Nymbo commited on
Commit
0c5241e
·
verified ·
1 Parent(s): 2af0f78

Update Modules/Generate_Video.py

Browse files
Files changed (1) hide show
  1. Modules/Generate_Video.py +175 -171
Modules/Generate_Video.py CHANGED
@@ -1,171 +1,175 @@
1
- from __future__ import annotations
2
-
3
- import os
4
- import random
5
- import tempfile
6
- from typing import Annotated
7
-
8
- import gradio as gr
9
- from huggingface_hub import InferenceClient
10
-
11
- from app import _log_call_end, _log_call_start, _truncate_for_log
12
-
13
- HF_VIDEO_TOKEN = os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")
14
-
15
-
16
- def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str:
17
- fd, fname = tempfile.mkstemp(suffix=suffix)
18
- try:
19
- with os.fdopen(fd, "wb") as file:
20
- if isinstance(data_iter_or_bytes, (bytes, bytearray)):
21
- file.write(data_iter_or_bytes)
22
- elif hasattr(data_iter_or_bytes, "read"):
23
- file.write(data_iter_or_bytes.read())
24
- elif hasattr(data_iter_or_bytes, "content"):
25
- file.write(data_iter_or_bytes.content) # type: ignore[attr-defined]
26
- elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)):
27
- for chunk in data_iter_or_bytes: # type: ignore[assignment]
28
- if chunk:
29
- file.write(chunk)
30
- else:
31
- raise gr.Error("Unsupported video data type returned by provider.")
32
- except Exception:
33
- try:
34
- os.remove(fname)
35
- except Exception:
36
- pass
37
- raise
38
- return fname
39
-
40
-
41
- def Generate_Video(
42
- prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."],
43
- model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B",
44
- negative_prompt: Annotated[str, "What should NOT appear in the video."] = "",
45
- steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25,
46
- cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5,
47
- seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
48
- width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768,
49
- height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768,
50
- fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24,
51
- duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0,
52
- ) -> str:
53
- _log_call_start(
54
- "Generate_Video",
55
- prompt=_truncate_for_log(prompt, 160),
56
- model_id=model_id,
57
- steps=steps,
58
- cfg_scale=cfg_scale,
59
- fps=fps,
60
- duration=duration,
61
- size=f"{width}x{height}",
62
- )
63
- if not prompt or not prompt.strip():
64
- _log_call_end("Generate_Video", "error=empty prompt")
65
- raise gr.Error("Please provide a non-empty prompt.")
66
- providers = ["auto", "replicate", "fal-ai"]
67
- last_error: Exception | None = None
68
- parameters = {
69
- "negative_prompt": negative_prompt or None,
70
- "num_inference_steps": steps,
71
- "guidance_scale": cfg_scale,
72
- "seed": seed if seed != -1 else random.randint(1, 1_000_000_000),
73
- "width": width,
74
- "height": height,
75
- "fps": fps,
76
- "duration": duration,
77
- }
78
- for provider in providers:
79
- try:
80
- client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider)
81
- if hasattr(client, "text_to_video"):
82
- num_frames = int(duration * fps) if duration and fps else None
83
- extra_body = {}
84
- if width:
85
- extra_body["width"] = width
86
- if height:
87
- extra_body["height"] = height
88
- if fps:
89
- extra_body["fps"] = fps
90
- if duration:
91
- extra_body["duration"] = duration
92
- result = client.text_to_video(
93
- prompt=prompt,
94
- model=model_id,
95
- guidance_scale=cfg_scale,
96
- negative_prompt=[negative_prompt] if negative_prompt else None,
97
- num_frames=num_frames,
98
- num_inference_steps=steps,
99
- seed=parameters["seed"],
100
- extra_body=extra_body if extra_body else None,
101
- )
102
- else:
103
- result = client.post(
104
- model=model_id,
105
- json={"inputs": prompt, "parameters": {k: v for k, v in parameters.items() if v is not None}},
106
- )
107
- path = _write_video_tmp(result, suffix=".mp4")
108
- try:
109
- size = os.path.getsize(path)
110
- except Exception:
111
- size = -1
112
- _log_call_end("Generate_Video", f"provider={provider} path={os.path.basename(path)} bytes={size}")
113
- return path
114
- except Exception as exc: # pylint: disable=broad-except
115
- last_error = exc
116
- continue
117
- msg = str(last_error) if last_error else "Unknown error"
118
- lowered = msg.lower()
119
- if "404" in msg:
120
- raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.")
121
- if "503" in msg:
122
- raise gr.Error("The model is warming up. Please try again shortly.")
123
- if "401" in msg or "403" in msg:
124
- raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.")
125
- if ("api_key" in lowered) or ("hf auth login" in lowered) or ("unauthorized" in lowered) or ("forbidden" in lowered):
126
- raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.")
127
- _log_call_end("Generate_Video", f"error={_truncate_for_log(msg, 200)}")
128
- raise gr.Error(f"Video generation failed: {msg}")
129
-
130
-
131
- def build_interface() -> gr.Interface:
132
- return gr.Interface(
133
- fn=Generate_Video,
134
- inputs=[
135
- gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2),
136
- gr.Textbox(
137
- label="Model",
138
- value="Wan-AI/Wan2.2-T2V-A14B",
139
- placeholder="creator/model-name",
140
- max_lines=1,
141
- info="<a href=\"https://huggingface.co/models?pipeline_tag=text-to-video&inference_provider=nebius,cerebras,novita,fireworks-ai,together,fal-ai,groq,featherless-ai,nscale,hyperbolic,sambanova,cohere,replicate,scaleway,publicai,hf-inference&sort=trending\" target=\"_blank\" rel=\"noopener noreferrer\">Browse models</a>",
142
- ),
143
- gr.Textbox(label="Negative Prompt", value="", lines=2),
144
- gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"),
145
- gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"),
146
- gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
147
- gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"),
148
- gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"),
149
- gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"),
150
- gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"),
151
- ],
152
- outputs=gr.Video(label="Generated Video", show_download_button=True, format="mp4"),
153
- title="Generate Video",
154
- description=(
155
- "<div style=\"text-align:center\">Generate short videos via Hugging Face serverless inference. "
156
- "Default model is Wan2.2-T2V-A14B.</div>"
157
- ),
158
- api_description=(
159
- "Generate a short video from a text prompt using a Hugging Face model via serverless inference. "
160
- "Create dynamic scenes like 'a red fox running through a snowy forest at sunrise', 'waves crashing on a rocky shore', "
161
- "'time-lapse of clouds moving across a blue sky'. Default model: Wan2.2-T2V-A14B (2-6 second videos). "
162
- "Parameters: prompt (str), model_id (str), negative_prompt (str), steps (int), cfg_scale (float), seed (int), "
163
- "width/height (int), fps (int), duration (float in seconds). Returns MP4 file path. "
164
- "Return the generated media to the user in this format `![Alt text](URL)`"
165
- ),
166
- flagging_mode="never",
167
- show_api=bool(os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")),
168
- )
169
-
170
-
171
- __all__ = ["Generate_Video", "build_interface"]
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import os
4
+ import random
5
+ import tempfile
6
+ from typing import Annotated
7
+
8
+ import gradio as gr
9
+ from huggingface_hub import InferenceClient
10
+
11
+ from app import _log_call_end, _log_call_start, _truncate_for_log
12
+ from ._docstrings import autodoc
13
+
14
+ HF_VIDEO_TOKEN = os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")
15
+
16
+ # Single source of truth for the LLM-facing tool description
17
+ TOOL_SUMMARY = (
18
+ "Generate a short MP4 video from a text prompt via Hugging Face serverless inference; "
19
+ "control model, steps, guidance, seed, size, fps, and duration; returns a temporary MP4 file path. "
20
+ "Return the generated media to the user in this format `![Alt text](URL)`"
21
+ )
22
+
23
+
24
+ def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str:
25
+ fd, fname = tempfile.mkstemp(suffix=suffix)
26
+ try:
27
+ with os.fdopen(fd, "wb") as file:
28
+ if isinstance(data_iter_or_bytes, (bytes, bytearray)):
29
+ file.write(data_iter_or_bytes)
30
+ elif hasattr(data_iter_or_bytes, "read"):
31
+ file.write(data_iter_or_bytes.read())
32
+ elif hasattr(data_iter_or_bytes, "content"):
33
+ file.write(data_iter_or_bytes.content) # type: ignore[attr-defined]
34
+ elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)):
35
+ for chunk in data_iter_or_bytes: # type: ignore[assignment]
36
+ if chunk:
37
+ file.write(chunk)
38
+ else:
39
+ raise gr.Error("Unsupported video data type returned by provider.")
40
+ except Exception:
41
+ try:
42
+ os.remove(fname)
43
+ except Exception:
44
+ pass
45
+ raise
46
+ return fname
47
+
48
+
49
+ @autodoc(
50
+ summary=TOOL_SUMMARY,
51
+ )
52
+ def Generate_Video(
53
+ prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."],
54
+ model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B",
55
+ negative_prompt: Annotated[str, "What should NOT appear in the video."] = "",
56
+ steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25,
57
+ cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5,
58
+ seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
59
+ width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768,
60
+ height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768,
61
+ fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24,
62
+ duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0,
63
+ ) -> str:
64
+ _log_call_start(
65
+ "Generate_Video",
66
+ prompt=_truncate_for_log(prompt, 160),
67
+ model_id=model_id,
68
+ steps=steps,
69
+ cfg_scale=cfg_scale,
70
+ fps=fps,
71
+ duration=duration,
72
+ size=f"{width}x{height}",
73
+ )
74
+ if not prompt or not prompt.strip():
75
+ _log_call_end("Generate_Video", "error=empty prompt")
76
+ raise gr.Error("Please provide a non-empty prompt.")
77
+ providers = ["auto", "replicate", "fal-ai"]
78
+ last_error: Exception | None = None
79
+ parameters = {
80
+ "negative_prompt": negative_prompt or None,
81
+ "num_inference_steps": steps,
82
+ "guidance_scale": cfg_scale,
83
+ "seed": seed if seed != -1 else random.randint(1, 1_000_000_000),
84
+ "width": width,
85
+ "height": height,
86
+ "fps": fps,
87
+ "duration": duration,
88
+ }
89
+ for provider in providers:
90
+ try:
91
+ client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider)
92
+ if hasattr(client, "text_to_video"):
93
+ num_frames = int(duration * fps) if duration and fps else None
94
+ extra_body = {}
95
+ if width:
96
+ extra_body["width"] = width
97
+ if height:
98
+ extra_body["height"] = height
99
+ if fps:
100
+ extra_body["fps"] = fps
101
+ if duration:
102
+ extra_body["duration"] = duration
103
+ result = client.text_to_video(
104
+ prompt=prompt,
105
+ model=model_id,
106
+ guidance_scale=cfg_scale,
107
+ negative_prompt=[negative_prompt] if negative_prompt else None,
108
+ num_frames=num_frames,
109
+ num_inference_steps=steps,
110
+ seed=parameters["seed"],
111
+ extra_body=extra_body if extra_body else None,
112
+ )
113
+ else:
114
+ result = client.post(
115
+ model=model_id,
116
+ json={"inputs": prompt, "parameters": {k: v for k, v in parameters.items() if v is not None}},
117
+ )
118
+ path = _write_video_tmp(result, suffix=".mp4")
119
+ try:
120
+ size = os.path.getsize(path)
121
+ except Exception:
122
+ size = -1
123
+ _log_call_end("Generate_Video", f"provider={provider} path={os.path.basename(path)} bytes={size}")
124
+ return path
125
+ except Exception as exc: # pylint: disable=broad-except
126
+ last_error = exc
127
+ continue
128
+ msg = str(last_error) if last_error else "Unknown error"
129
+ lowered = msg.lower()
130
+ if "404" in msg:
131
+ raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.")
132
+ if "503" in msg:
133
+ raise gr.Error("The model is warming up. Please try again shortly.")
134
+ if "401" in msg or "403" in msg:
135
+ raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.")
136
+ if ("api_key" in lowered) or ("hf auth login" in lowered) or ("unauthorized" in lowered) or ("forbidden" in lowered):
137
+ raise gr.Error("Please duplicate the space and provide a `HF_READ_TOKEN` to enable Image and Video Generation.")
138
+ _log_call_end("Generate_Video", f"error={_truncate_for_log(msg, 200)}")
139
+ raise gr.Error(f"Video generation failed: {msg}")
140
+
141
+
142
+ def build_interface() -> gr.Interface:
143
+ return gr.Interface(
144
+ fn=Generate_Video,
145
+ inputs=[
146
+ gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2),
147
+ gr.Textbox(
148
+ label="Model",
149
+ value="Wan-AI/Wan2.2-T2V-A14B",
150
+ placeholder="creator/model-name",
151
+ max_lines=1,
152
+ info="<a href=\"https://huggingface.co/models?pipeline_tag=text-to-video&inference_provider=nebius,cerebras,novita,fireworks-ai,together,fal-ai,groq,featherless-ai,nscale,hyperbolic,sambanova,cohere,replicate,scaleway,publicai,hf-inference&sort=trending\" target=\"_blank\" rel=\"noopener noreferrer\">Browse models</a>",
153
+ ),
154
+ gr.Textbox(label="Negative Prompt", value="", lines=2),
155
+ gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"),
156
+ gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"),
157
+ gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
158
+ gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"),
159
+ gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"),
160
+ gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"),
161
+ gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"),
162
+ ],
163
+ outputs=gr.Video(label="Generated Video", show_download_button=True, format="mp4"),
164
+ title="Generate Video",
165
+ description=(
166
+ "<div style=\"text-align:center\">Generate short videos via Hugging Face serverless inference. "
167
+ "Default model is Wan2.2-T2V-A14B.</div>"
168
+ ),
169
+ api_description=TOOL_SUMMARY,
170
+ flagging_mode="never",
171
+ show_api=bool(os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")),
172
+ )
173
+
174
+
175
+ __all__ = ["Generate_Video", "build_interface"]