Wan2.2-Animate / app.py
kelseye's picture
Upload app.py
0b909d1 verified
raw
history blame
7.36 kB
# app.py
import os
import oss2
import sys
import uuid
import shutil
import time
import gradio as gr
import requests
os.system("pip install dashscope")
import dashscope
from dashscope.utils.oss_utils import check_and_upload_local
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
dashscope.api_key = DASHSCOPE_API_KEY
class WanAnimateApp:
def __init__(self, url, get_url):
self.url = url
self.get_url = get_url
def predict(
self,
ref_img,
video,
model_id,
model,
):
# Upload files to OSS if needed and get URLs
_, image_url = check_and_upload_local(model_id, ref_img, DASHSCOPE_API_KEY)
_, video_url = check_and_upload_local(model_id, video, DASHSCOPE_API_KEY)
# Prepare the request payload
payload = {
"model": model_id,
"input": {
"image_url": image_url,
"video_url": video_url
},
"parameters": {
"check_image": True,
"mode": model,
}
}
# Set up headers
headers = {
"X-DashScope-Async": "enable",
"X-DashScope-OssResourceResolve": "enable",
"Authorization": f"Bearer {DASHSCOPE_API_KEY}",
"Content-Type": "application/json"
}
# Make the initial API request
url = self.url
response = requests.post(url, json=payload, headers=headers)
# Check if request was successful
if response.status_code != 200:
raise Exception(f"Initial request failed with status code {response.status_code}: {response.text}")
# Get the task ID from response
result = response.json()
task_id = result.get("output", {}).get("task_id")
if not task_id:
raise Exception("Failed to get task ID from response")
# Poll for results
get_url = f"{self.get_url}/{task_id}"
headers = {
"Authorization": f"Bearer {DASHSCOPE_API_KEY}",
"Content-Type": "application/json"
}
while True:
response = requests.get(get_url, headers=headers)
if response.status_code != 200:
raise Exception(f"Failed to get task status: {response.status_code}: {response.text}")
result = response.json()
print(result)
task_status = result.get("output", {}).get("task_status")
if task_status == "SUCCEEDED":
# Task completed successfully, return video URL
video_url = result["output"]["results"]["video_url"]
return video_url, "SUCCEEDED"
elif task_status == "FAILED":
# Task failed, raise an exception with error message
error_msg = result.get("output", {}).get("message", "Unknown error")
code_msg = result.get("output", {}).get("code", "Unknown code")
print(f"\n\nTask failed: {error_msg} Code: {code_msg} TaskId: {task_id}\n\n")
return None, f"Task failed: {error_msg} Code: {code_msg} TaskId: {task_id}"
# raise Exception(f"Task failed: {error_msg} TaskId: {task_id}")
else:
# Task is still running, wait and retry
time.sleep(5) # Wait 5 seconds before polling again
def start_app():
import argparse
parser = argparse.ArgumentParser(description="Wan2.2-Animate 视频生成工具")
args = parser.parse_args()
url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/"
# url = "https://poc-dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis"
get_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/"
# get_url = f"https://poc-dashscope.aliyuncs.com/api/v1/tasks"
app = WanAnimateApp(url=url, get_url=get_url)
with gr.Blocks(title="Wan2.2-Animate 视频生成") as demo:
gr.HTML("""
<div style="text-align: center; font-size: 32px; font-weight: bold; margin-bottom: 20px;">
Wan2.2-Animate
</div>
""")
gr.Markdown("基于参考图像和骨骼序列的人物驱动和替换视频生成")
with gr.Row():
with gr.Column():
ref_img = gr.Image(
label="Reference Image(参考图像)",
type="filepath",
sources=["upload"],
)
video = gr.Video(
label="Template Video(模版视频)",
sources=["upload"],
)
with gr.Row():
model_id = gr.Dropdown(
label="模型名称",
choices=["wan2.2-animate-move", "wan2.2-animate-mix"],
value="wan2.2-animate-move",
info="支持mov和mix模型"
)
model = gr.Dropdown(
label="模式",
choices=["wan-pro", "wan-std"],
value="wan-pro",
info="支持标准模型std和专业模式pro两个版本"
)
run_button = gr.Button("Generate Video(生成视频)")
with gr.Column():
output_video = gr.Video(label="Output Video(输出视频)")
output_status = gr.Textbox(label="Status")
run_button.click(
fn=app.predict,
inputs=[
ref_img,
video,
model_id,
model,
],
outputs=[output_video, output_status],
)
# examples_dir = "examples"
# if os.path.exists(examples_dir):
# example_data = []
# files_dict = {}
# for file in os.listdir(examples_dir):
# file_path = os.path.join(examples_dir, file)
# name, ext = os.path.splitext(file)
# if ext.lower() in [".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"]:
# if name not in files_dict:
# files_dict[name] = {}
# files_dict[name]["image"] = file_path
# elif ext.lower() in [".mp3", ".wav"]:
# if name not in files_dict:
# files_dict[name] = {}
# files_dict[name]["audio"] = file_path
# for name, files in files_dict.items():
# if "image" in files and "audio" in files:
# example_data.append([
# files["image"],
# files["audio"],
# "480P"
# ])
# if example_data:
# gr.Examples(
# examples=example_data,
# inputs=[ref_img, video, resolution],
# outputs=output_video,
# fn=app.predict,
# cache_examples=False,
# )
demo.launch()
if __name__ == "__main__":
start_app()