# Spaces: Running  (page-status residue captured along with the file; not part of the program)
# app.py
# Module preamble: imports, on-demand installation of the `dashscope` SDK, and
# API-key configuration shared by the rest of the file.
import importlib
import os
import shutil
import subprocess
import sys
import time
import uuid

import gradio as gr
import oss2
import requests

try:
    import dashscope
except ImportError:
    # Install with the interpreter that is running this file so the package
    # lands in the environment we import from; a bare `pip` on PATH may belong
    # to a different Python. check_call raises if the installation fails
    # instead of letting the import below fail with a less helpful error.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "dashscope"])
    # Make the freshly installed package visible to the import system.
    importlib.invalidate_caches()
    import dashscope
from dashscope.utils.oss_utils import check_and_upload_local

# API key is read from the environment; it is None when the variable is unset.
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
if not DASHSCOPE_API_KEY:
    # Without a key every request would be sent as "Bearer None"; say so early.
    print(
        "Warning: DASHSCOPE_API_KEY is not set; DashScope requests will be rejected.",
        file=sys.stderr,
    )
dashscope.api_key = DASHSCOPE_API_KEY
class WanAnimateApp:
    """Submit Wan2.2-Animate jobs to DashScope and wait for the resulting video.

    Attributes:
        url: Endpoint that accepts the asynchronous video-synthesis request.
        get_url: Base endpoint for task-status queries; the task id is appended
            as the last path segment.
    """

    # Seconds allowed for each HTTP call, so a stalled connection cannot hang
    # the worker indefinitely.
    REQUEST_TIMEOUT = 60
    # Seconds to sleep between two task-status polls.
    POLL_INTERVAL = 5
    # Statuses after which the task can no longer succeed. Polling must stop on
    # all of them, otherwise a canceled or unknown task would loop forever.
    FAILED_STATUSES = frozenset({"FAILED", "CANCELED", "UNKNOWN"})

    def __init__(self, url, get_url):
        self.url = url
        self.get_url = get_url

    def _task_url(self, task_id):
        """Return the status URL for *task_id*, tolerating a trailing slash on get_url."""
        return f"{self.get_url.rstrip('/')}/{task_id}"

    def predict(
        self,
        ref_img,
        video,
        model_id,
        model,
    ):
        """Run one animation job and block until it reaches a final state.

        Args:
            ref_img: Local path (or URL) of the reference image.
            video: Local path (or URL) of the template video.
            model_id: DashScope model name placed in the request's "model" field.
            model: Value sent as the "mode" request parameter.

        Returns:
            A ``(video_url, status)`` tuple. ``video_url`` is ``None`` whenever
            no video was produced; ``status`` is ``"SUCCEEDED"`` or a
            human-readable explanation of what went wrong.

        Raises:
            Exception: If the submit or status request returns a non-200
                response, or the submit response carries no task id.
        """
        # Reject missing inputs before any upload or network traffic happens.
        if not ref_img:
            return None, "Please upload a reference image first."
        if not video:
            return None, "Please upload a template video first."

        # Upload files to OSS if needed and get URLs
        _, image_url = check_and_upload_local(model_id, ref_img, DASHSCOPE_API_KEY)
        _, video_url = check_and_upload_local(model_id, video, DASHSCOPE_API_KEY)

        # Prepare the request payload
        payload = {
            "model": model_id,
            "input": {
                "image_url": image_url,
                "video_url": video_url,
            },
            "parameters": {
                "check_image": True,
                "mode": model,
            },
        }
        submit_headers = {
            "X-DashScope-Async": "enable",
            "X-DashScope-OssResourceResolve": "enable",
            "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
            "Content-Type": "application/json",
        }

        # Make the initial API request
        response = requests.post(
            self.url,
            json=payload,
            headers=submit_headers,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(
                f"Initial request failed with status code {response.status_code}: {response.text}"
            )

        # `or {}` also covers an explicit `"output": null` in the response.
        task_id = (response.json().get("output") or {}).get("task_id")
        if not task_id:
            raise Exception("Failed to get task ID from response")

        # Poll for results
        status_url = self._task_url(task_id)
        poll_headers = {
            "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
            "Content-Type": "application/json",
        }
        while True:
            response = requests.get(
                status_url,
                headers=poll_headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                raise Exception(
                    f"Failed to get task status: {response.status_code}: {response.text}"
                )
            result = response.json()
            print(result)
            output = result.get("output") or {}
            task_status = output.get("task_status")

            if task_status == "SUCCEEDED":
                result_url = (output.get("results") or {}).get("video_url")
                if not result_url:
                    # Report a malformed success response instead of raising KeyError.
                    return None, f"Task succeeded but returned no video URL. TaskId: {task_id}"
                return result_url, "SUCCEEDED"

            if task_status in self.FAILED_STATUSES:
                error_msg = output.get("message", "Unknown error")
                code_msg = output.get("code", "Unknown code")
                # For FAILED this renders exactly as "Task failed: ...".
                failure = (
                    f"Task {task_status.lower()}: {error_msg} "
                    f"Code: {code_msg} TaskId: {task_id}"
                )
                print(f"\n\n{failure}\n\n")
                return None, failure

            # Task is still pending/running: wait before polling again.
            time.sleep(self.POLL_INTERVAL)
def start_app():
    """Build the Wan2.2-Animate Gradio interface and launch it.

    The page has an input column (reference image, template video, model and
    mode selectors, generate button) and an output column (result video and a
    status text box). Clicking the button calls ``WanAnimateApp.predict``.
    """
    import argparse

    # No options are defined; parsing is kept so `--help` works and unknown
    # command-line arguments are rejected, as before.
    parser = argparse.ArgumentParser(description="Wan2.2-Animate 视频生成工具")
    parser.parse_args()

    # Production endpoints. For the PoC environment use the host
    # poc-dashscope.aliyuncs.com with the same paths.
    url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/"
    # No trailing slash: predict() joins the task id with "/", so a trailing
    # slash here would produce ".../tasks//<task_id>".
    get_url = "https://dashscope.aliyuncs.com/api/v1/tasks"
    app = WanAnimateApp(url=url, get_url=get_url)

    # NOTE(review): widget nesting below is reconstructed from statement order
    # (the captured source carried no indentation) — confirm against the
    # deployed layout.
    with gr.Blocks(title="Wan2.2-Animate 视频生成") as demo:
        gr.HTML("""
            <div style="text-align: center; font-size: 32px; font-weight: bold; margin-bottom: 20px;">
                Wan2.2-Animate
            </div>
            """)
        gr.Markdown("基于参考图像和骨骼序列的人物驱动和替换视频生成")

        with gr.Row():
            # Input column.
            with gr.Column():
                ref_img = gr.Image(
                    label="Reference Image(参考图像)",
                    type="filepath",
                    sources=["upload"],
                )
                video = gr.Video(
                    label="Template Video(模版视频)",
                    sources=["upload"],
                )
                with gr.Row():
                    model_id = gr.Dropdown(
                        label="模型名称",
                        choices=["wan2.2-animate-move", "wan2.2-animate-mix"],
                        value="wan2.2-animate-move",
                        info="支持mov和mix模型",
                    )
                    model = gr.Dropdown(
                        label="模式",
                        choices=["wan-pro", "wan-std"],
                        value="wan-pro",
                        info="支持标准模型std和专业模式pro两个版本",
                    )
                run_button = gr.Button("Generate Video(生成视频)")

            # Output column.
            with gr.Column():
                output_video = gr.Video(label="Output Video(输出视频)")
                output_status = gr.Textbox(label="Status")

        # predict() returns (video_url, status), matching the two outputs.
        run_button.click(
            fn=app.predict,
            inputs=[
                ref_img,
                video,
                model_id,
                model,
            ],
            outputs=[output_video, output_status],
        )

    demo.launch()


if __name__ == "__main__":
    start_app()