import torch from diffusers import AutoencoderKLWan, WanImageToVideoPipeline, UniPCMultistepScheduler from diffusers.utils import export_to_video from transformers import CLIPVisionModel import gradio as gr import tempfile import spaces from huggingface_hub import hf_hub_download import numpy as np from PIL import Image import random import logging import gc import time import hashlib from dataclasses import dataclass from typing import Optional, Tuple from functools import wraps import threading import os # GPU 메모리 관리 설정 os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:256' # 더 작은 청크 사용 # 로깅 설정 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 설정 관리 @dataclass class VideoGenerationConfig: model_id: str = "Wan-AI/Wan2.1-I2V-14B-480P-Diffusers" lora_repo_id: str = "Kijai/WanVideo_comfy" lora_filename: str = "Wan21_CausVid_14B_T2V_lora_rank32.safetensors" mod_value: int = 32 # Zero GPU를 위한 매우 보수적인 기본값 default_height: int = 320 default_width: int = 320 max_area: float = 320.0 * 320.0 # Zero GPU에 최적화 slider_min_h: int = 128 slider_max_h: int = 512 # 더 낮은 최대값 slider_min_w: int = 128 slider_max_w: int = 512 # 더 낮은 최대값 fixed_fps: int = 24 min_frames: int = 8 max_frames: int = 30 # 더 낮은 최대 프레임 (1.25초) default_prompt: str = "make this image move, smooth motion" default_negative_prompt: str = "static, blur" # GPU 메모리 최적화 설정 enable_model_cpu_offload: bool = True enable_vae_slicing: bool = True enable_vae_tiling: bool = True @property def max_duration(self): """최대 허용 duration (초)""" return self.max_frames / self.fixed_fps @property def min_duration(self): """최소 허용 duration (초)""" return self.min_frames / self.fixed_fps config = VideoGenerationConfig() MAX_SEED = np.iinfo(np.int32).max # 글로벌 변수 pipe = None generation_lock = threading.Lock() # 성능 측정 데코레이터 def measure_time(func): @wraps(func) def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) logger.info(f"{func.__name__} took {time.time()-start:.2f}s") return result return wrapper # GPU 메모리 정리 함수 def clear_gpu_memory(): """메모리 정리 (Zero GPU 안전)""" gc.collect() if torch.cuda.is_available(): try: torch.cuda.empty_cache() torch.cuda.synchronize() except: pass # 비디오 생성기 클래스 class VideoGenerator: def __init__(self, config: VideoGenerationConfig): self.config = config def calculate_dimensions(self, image: Image.Image) -> Tuple[int, int]: orig_w, orig_h = image.size if orig_w <= 0 or orig_h <= 0: return self.config.default_height, self.config.default_width aspect_ratio = orig_h / orig_w # Zero GPU에 최적화된 매우 작은 해상도 max_area = 320.0 * 320.0 # 102,400 픽셀 # 종횡비가 너무 극단적인 경우 조정 if aspect_ratio > 2.0: aspect_ratio = 2.0 elif aspect_ratio < 0.5: aspect_ratio = 0.5 calc_h = round(np.sqrt(max_area * aspect_ratio)) calc_w = round(np.sqrt(max_area / aspect_ratio)) # mod_value에 맞춤 calc_h = max(self.config.mod_value, (calc_h // self.config.mod_value) * self.config.mod_value) calc_w = max(self.config.mod_value, (calc_w // self.config.mod_value) * self.config.mod_value) # 최대 512로 제한 new_h = int(np.clip(calc_h, self.config.slider_min_h, 512)) new_w = int(np.clip(calc_w, self.config.slider_min_w, 512)) # mod_value에 맞춤 new_h = (new_h // self.config.mod_value) * self.config.mod_value new_w = (new_w // self.config.mod_value) * self.config.mod_value # 최종 픽셀 수 확인 if new_h * new_w > 102400: # 320x320 # 비율을 유지하면서 축소 scale = np.sqrt(102400 / (new_h * new_w)) new_h = int((new_h * scale) // self.config.mod_value) * self.config.mod_value new_w = int((new_w * scale) // self.config.mod_value) * self.config.mod_value return new_h, new_w def validate_inputs(self, image: Image.Image, prompt: str, height: int, width: int, duration: float, steps: int) -> Tuple[bool, Optional[str]]: if image is None: return False, "🖼️ Please upload an input image" if not prompt or len(prompt.strip()) == 0: return False, "✍️ Please provide a prompt" if len(prompt) > 200: # 더 짧은 프롬프트 제한 return False, "⚠️ Prompt is too long (max 200 characters)" # Zero GPU에 최적화된 제한 if duration < 0.3: return False, "⏱️ Duration too short (min 0.3s)" if duration > 1.2: # 더 짧은 최대 duration return False, "⏱️ Duration too long (max 1.2s for stability)" # 픽셀 수 제한 (더 보수적으로) max_pixels = 320 * 320 # 102,400 픽셀 if height * width > max_pixels: return False, f"📐 Total pixels limited to {max_pixels:,} (e.g., 320×320, 256×384)" if height > 512 or width > 512: # 더 낮은 최대값 return False, "📐 Maximum dimension is 512 pixels" # 종횡비 체크 aspect_ratio = max(height/width, width/height) if aspect_ratio > 2.0: return False, "📐 Aspect ratio too extreme (max 2:1 or 1:2)" if steps > 5: # 더 낮은 최대 스텝 return False, "🔧 Maximum 5 steps in Zero GPU environment" return True, None def generate_unique_filename(self, seed: int) -> str: timestamp = int(time.time()) unique_str = f"{timestamp}_{seed}_{random.randint(1000, 9999)}" hash_obj = hashlib.md5(unique_str.encode()) return f"video_{hash_obj.hexdigest()[:8]}.mp4" video_generator = VideoGenerator(config) # Gradio 함수들 def handle_image_upload(image): if image is None: return gr.update(value=config.default_height), gr.update(value=config.default_width) try: if not isinstance(image, Image.Image): raise ValueError("Invalid image format") new_h, new_w = video_generator.calculate_dimensions(image) return gr.update(value=new_h), gr.update(value=new_w) except Exception as e: logger.error(f"Error processing image: {e}") gr.Warning("⚠️ Error processing image") return gr.update(value=config.default_height), gr.update(value=config.default_width) def get_duration(input_image, prompt, height, width, negative_prompt, duration_seconds, guidance_scale, steps, seed, randomize_seed, progress): # Zero GPU 환경에서 매우 보수적인 시간 할당 base_duration = 50 # 기본 50초로 증가 # 픽셀 수에 따른 추가 시간 pixels = height * width if pixels > 147456: # 384x384 이상 base_duration += 20 elif pixels > 100000: # ~316x316 이상 base_duration += 10 # 스텝 수에 따른 추가 시간 if steps > 4: base_duration += 15 elif steps > 2: base_duration += 10 # 종횡비가 극단적인 경우 추가 시간 aspect_ratio = max(height/width, width/height) if aspect_ratio > 1.5: # 3:2 이상의 비율 base_duration += 10 # 최대 90초로 제한 return min(base_duration, 90) @spaces.GPU(duration=get_duration) @measure_time def generate_video(input_image, prompt, height, width, negative_prompt=config.default_negative_prompt, duration_seconds=0.8, guidance_scale=1, steps=3, seed=42, randomize_seed=False, progress=gr.Progress(track_tqdm=True)): global pipe # 동시 실행 방지 if not generation_lock.acquire(blocking=False): raise gr.Error("⏳ Another video is being generated. Please wait...") try: progress(0.05, desc="🔍 Validating inputs...") logger.info(f"Starting generation - Resolution: {height}x{width}, Duration: {duration_seconds}s, Steps: {steps}") # 입력 검증 is_valid, error_msg = video_generator.validate_inputs( input_image, prompt, height, width, duration_seconds, steps ) if not is_valid: logger.warning(f"Validation failed: {error_msg}") raise gr.Error(error_msg) # 메모리 정리 clear_gpu_memory() progress(0.1, desc="🚀 Loading model...") # 모델 로딩 (GPU 함수 내에서) if pipe is None: try: logger.info("Loading model components...") # 컴포넌트 로드 image_encoder = CLIPVisionModel.from_pretrained( config.model_id, subfolder="image_encoder", torch_dtype=torch.float16, low_cpu_mem_usage=True ) vae = AutoencoderKLWan.from_pretrained( config.model_id, subfolder="vae", torch_dtype=torch.float16, low_cpu_mem_usage=True ) pipe = WanImageToVideoPipeline.from_pretrained( config.model_id, vae=vae, image_encoder=image_encoder, torch_dtype=torch.bfloat16, low_cpu_mem_usage=True, use_safetensors=True ) # 스케줄러 설정 pipe.scheduler = UniPCMultistepScheduler.from_config( pipe.scheduler.config, flow_shift=8.0 ) # LoRA 로드 건너뛰기 (안정성을 위해) logger.info("Skipping LoRA for stability") # GPU로 이동 pipe.to("cuda") # 최적화 활성화 pipe.enable_vae_slicing() pipe.enable_vae_tiling() # 모델 CPU 오프로드 활성화 (메모리 절약) pipe.enable_model_cpu_offload() logger.info("Model loaded successfully") except Exception as e: logger.error(f"Model loading failed: {e}") raise gr.Error("Failed to load model") progress(0.3, desc="🎯 Preparing image...") # 이미지 준비 target_h = max(config.mod_value, (int(height) // config.mod_value) * config.mod_value) target_w = max(config.mod_value, (int(width) // config.mod_value) * config.mod_value) # 프레임 수 계산 (매우 보수적) num_frames = min( int(round(duration_seconds * config.fixed_fps)), 24 # 최대 24프레임 (1초) ) num_frames = max(8, num_frames) # 최소 8프레임 logger.info(f"Generating {num_frames} frames at {target_h}x{target_w}") current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) # 이미지 리사이즈 resized_image = input_image.resize((target_w, target_h), Image.Resampling.LANCZOS) progress(0.4, desc="🎬 Generating video...") # 비디오 생성 with torch.inference_mode(), torch.amp.autocast('cuda', enabled=True, dtype=torch.float16): try: # 메모리 효율을 위한 설정 torch.cuda.empty_cache() # 생성 파라미터 최적화 output_frames_list = pipe( image=resized_image, prompt=prompt[:150], # 프롬프트 길이 제한 negative_prompt=negative_prompt[:50] if negative_prompt else "", height=target_h, width=target_w, num_frames=num_frames, guidance_scale=float(guidance_scale), num_inference_steps=int(steps), generator=torch.Generator(device="cuda").manual_seed(current_seed), return_dict=True, # 추가 최적화 파라미터 output_type="pil" ).frames[0] logger.info("Video generation completed successfully") except torch.cuda.OutOfMemoryError: logger.error("GPU OOM error") clear_gpu_memory() raise gr.Error("💾 GPU out of memory. Try smaller dimensions (256x256 recommended).") except RuntimeError as e: if "out of memory" in str(e).lower(): logger.error("Runtime OOM error") clear_gpu_memory() raise gr.Error("💾 GPU memory error. Please try again with smaller settings.") else: logger.error(f"Runtime error: {e}") raise gr.Error(f"❌ Generation failed: {str(e)[:50]}") except Exception as e: logger.error(f"Generation error: {type(e).__name__}: {e}") raise gr.Error(f"❌ Generation failed. Try reducing resolution or steps.") progress(0.9, desc="💾 Saving video...") # 비디오 저장 try: filename = video_generator.generate_unique_filename(current_seed) with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile: video_path = tmpfile.name export_to_video(output_frames_list, video_path, fps=config.fixed_fps) logger.info(f"Video saved: {video_path}") except Exception as e: logger.error(f"Save error: {e}") raise gr.Error("Failed to save video") progress(1.0, desc="✨ Complete!") logger.info(f"Video generated: {num_frames} frames, {target_h}x{target_w}") # 메모리 정리 del output_frames_list del resized_image torch.cuda.empty_cache() gc.collect() return video_path, current_seed except gr.Error: raise except Exception as e: logger.error(f"Unexpected error: {type(e).__name__}: {e}") raise gr.Error(f"❌ Unexpected error. Please try again with smaller settings.") finally: generation_lock.release() clear_gpu_memory() # CSS css = """ .container { max-width: 1000px; margin: auto; padding: 20px; } .header { text-align: center; margin-bottom: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 30px; border-radius: 15px; color: white; box-shadow: 0 5px 15px rgba(0,0,0,0.2); } .header h1 { font-size: 2.5em; margin-bottom: 10px; } .warning-box { background: #fff3cd; border: 1px solid #ffeaa7; border-radius: 8px; padding: 12px; margin: 10px 0; color: #856404; font-size: 0.9em; } .generate-btn { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; font-size: 1.2em; padding: 12px 30px; border-radius: 25px; border: none; cursor: pointer; width: 100%; margin-top: 15px; } .generate-btn:hover { transform: translateY(-2px); box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4); } """ # Gradio UI with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo: with gr.Column(elem_classes="container"): # Header gr.HTML("""
Transform images into videos with Wan 2.1 (Zero GPU Optimized)