# app.py
"""Gradio front end for Wan2.2-Animate video generation via the DashScope HTTP API.

The app uploads a reference image and a template video, submits an
asynchronous video-synthesis task, polls the task endpoint until the task
reaches a terminal state, and shows the resulting video URL (or a status
message) in the UI.
"""
import importlib
import os
import shutil
import subprocess
import sys
import time
import uuid

import gradio as gr
import oss2
import requests

# NOTE(review): oss2, shutil and uuid are not referenced in the visible code;
# they are kept because other tooling may rely on them being imported.

try:
    import dashscope
except ImportError:
    # Install on demand with the interpreter that is running this script, so
    # the package lands in the right environment (a bare ``pip`` on PATH may
    # belong to a different Python). If the install fails, the import below
    # raises ImportError just as the original unconditional install would.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "dashscope"],
        check=False,
    )
    importlib.invalidate_caches()
    import dashscope
from dashscope.utils.oss_utils import check_and_upload_local

DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
dashscope.api_key = DASHSCOPE_API_KEY

# Per-request network timeout; without one a stalled connection would block
# the Gradio callback forever.
REQUEST_TIMEOUT_SECONDS = 60
# Delay between two task-status polls.
POLL_INTERVAL_SECONDS = 5
# Statuses other than FAILED after which polling must stop.
# NOTE(review): CANCELED / UNKNOWN are taken from the DashScope async-task
# documentation — confirm the exact spelling against the current API.
OTHER_TERMINAL_STATUSES = frozenset({"CANCELED", "UNKNOWN"})


class WanAnimateApp:
    """Client for submitting a Wan2.2-Animate task and waiting for its result."""

    def __init__(self, url, get_url):
        """Store the endpoints.

        Args:
            url: Endpoint that accepts the task-creation POST request.
            get_url: Base endpoint for task-status queries; the task id is
                appended as the last path segment.
        """
        self.url = url
        self.get_url = get_url

    def predict(
        self,
        ref_img,
        video,
        model_id,
        model,
    ):
        """Generate a video from a reference image and a template video.

        Args:
            ref_img: Local path of the reference image (Gradio ``filepath``).
            video: Local path of the template video.
            model_id: Model name sent as ``"model"`` in the request payload.
            model: Mode sent as ``parameters.mode`` in the request payload.

        Returns:
            A ``(video_url, status)`` tuple. ``video_url`` is ``None`` when the
            task did not succeed; ``status`` is ``"SUCCEEDED"`` or a
            human-readable failure description.

        Raises:
            Exception: If the submit or status request returns a non-200
                response, or the submit response carries no task id.
            requests.exceptions.RequestException: On network errors/timeouts.
        """
        # Clicking "Generate" without uploads passes None; report that in the
        # status box instead of failing inside the upload helper.
        if not ref_img or not video:
            return None, "Please upload both a reference image and a template video."

        # Upload files to OSS if needed and get URLs
        _, image_url = check_and_upload_local(model_id, ref_img, DASHSCOPE_API_KEY)
        _, source_video_url = check_and_upload_local(model_id, video, DASHSCOPE_API_KEY)

        task_id = self._submit_task(image_url, source_video_url, model_id, model)
        return self._wait_for_task(task_id)

    def _submit_task(self, image_url, video_url, model_id, model):
        """POST the asynchronous generation request and return its task id."""
        payload = {
            "model": model_id,
            "input": {
                "image_url": image_url,
                "video_url": video_url,
            },
            "parameters": {
                "check_image": True,
                "mode": model,
            },
        }
        headers = {
            "X-DashScope-Async": "enable",
            "X-DashScope-OssResourceResolve": "enable",
            "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
            "Content-Type": "application/json",
        }

        response = requests.post(
            self.url,
            json=payload,
            headers=headers,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise Exception(f"Initial request failed with status code {response.status_code}: {response.text}")

        # ``or {}`` also covers a response whose "output" is JSON null.
        task_id = (response.json().get("output") or {}).get("task_id")
        if not task_id:
            raise Exception("Failed to get task ID from response")
        return task_id

    def _wait_for_task(self, task_id):
        """Poll the task endpoint until a terminal status; return (url, status)."""
        # Strip a trailing slash so the path never contains "//".
        base_url = self.get_url.rstrip("/")
        status_url = f"{base_url}/{task_id}"
        headers = {
            "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
            "Content-Type": "application/json",
        }

        while True:
            response = requests.get(
                status_url,
                headers=headers,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                raise Exception(f"Failed to get task status: {response.status_code}: {response.text}")

            result = response.json()
            print(result)
            output = result.get("output") or {}
            task_status = output.get("task_status")

            if task_status == "SUCCEEDED":
                # Task completed successfully, return video URL
                return output["results"]["video_url"], "SUCCEEDED"

            if task_status == "FAILED":
                # Report the failure through the status box rather than raising.
                error_msg = output.get("message", "Unknown error")
                code_msg = output.get("code", "Unknown code")
                print(f"\n\nTask failed: {error_msg} Code: {code_msg} TaskId: {task_id}\n\n")
                return None, f"Task failed: {error_msg} Code: {code_msg} TaskId: {task_id}"
                # raise Exception(f"Task failed: {error_msg} TaskId: {task_id}")

            if task_status in OTHER_TERMINAL_STATUSES:
                # These will never turn into SUCCEEDED; stop instead of
                # polling forever.
                return None, f"Task ended with status {task_status}. TaskId: {task_id}"

            # Task is still running, wait and retry
            time.sleep(POLL_INTERVAL_SECONDS)


def start_app():
    """Build the Gradio UI, wire it to ``WanAnimateApp.predict`` and launch it."""
    import argparse

    parser = argparse.ArgumentParser(description="Wan2.2-Animate 视频生成工具")
    # No options are defined; parsing still provides --help and rejects
    # unexpected command-line arguments.
    parser.parse_args()

    url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/"
    # url = "https://poc-dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis"
    get_url = "https://dashscope.aliyuncs.com/api/v1/tasks/"
    # get_url = f"https://poc-dashscope.aliyuncs.com/api/v1/tasks"

    app = WanAnimateApp(url=url, get_url=get_url)

    # NOTE(review): the widget nesting below was reconstructed from a
    # whitespace-collapsed source — confirm it matches the intended layout.
    with gr.Blocks(title="Wan2.2-Animate 视频生成") as demo:
        gr.HTML("""
Wan2.2-Animate
""")
        gr.Markdown("基于参考图像和骨骼序列的人物驱动和替换视频生成")

        with gr.Row():
            with gr.Column():
                ref_img = gr.Image(
                    label="Reference Image(参考图像)",
                    type="filepath",
                    sources=["upload"],
                )
                video = gr.Video(
                    label="Template Video(模版视频)",
                    sources=["upload"],
                )
                with gr.Row():
                    model_id = gr.Dropdown(
                        label="模型名称",
                        choices=["wan2.2-animate-move", "wan2.2-animate-mix"],
                        value="wan2.2-animate-move",
                        info="支持mov和mix模型"
                    )
                    model = gr.Dropdown(
                        label="模式",
                        choices=["wan-pro", "wan-std"],
                        value="wan-pro",
                        info="支持标准模型std和专业模式pro两个版本"
                    )
                run_button = gr.Button("Generate Video(生成视频)")

            with gr.Column():
                output_video = gr.Video(label="Output Video(输出视频)")
                output_status = gr.Textbox(label="Status")

        run_button.click(
            fn=app.predict,
            inputs=[
                ref_img,
                video,
                model_id,
                model,
            ],
            outputs=[output_video, output_status],
        )

        # NOTE(review): the disabled example loader below pairs images with
        # audio files and references `resolution`, which is not defined in the
        # visible code; it needs rework before being re-enabled.
        # examples_dir = "examples"
        # if os.path.exists(examples_dir):
        #     example_data = []
        #     files_dict = {}
        #     for file in os.listdir(examples_dir):
        #         file_path = os.path.join(examples_dir, file)
        #         name, ext = os.path.splitext(file)
        #         if ext.lower() in [".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"]:
        #             if name not in files_dict:
        #                 files_dict[name] = {}
        #             files_dict[name]["image"] = file_path
        #         elif ext.lower() in [".mp3", ".wav"]:
        #             if name not in files_dict:
        #                 files_dict[name] = {}
        #             files_dict[name]["audio"] = file_path
        #     for name, files in files_dict.items():
        #         if "image" in files and "audio" in files:
        #             example_data.append([
        #                 files["image"],
        #                 files["audio"],
        #                 "480P"
        #             ])
        #     if example_data:
        #         gr.Examples(
        #             examples=example_data,
        #             inputs=[ref_img, video, resolution],
        #             outputs=output_video,
        #             fn=app.predict,
        #             cache_examples=False,
        #         )

    demo.launch()


if __name__ == "__main__":
    start_app()