# NOTE: removed paste artifacts that preceded this module ("Spaces:" / "Runtime error" x2);
# they were export residue, not part of the source.
import asyncio
import os
import re
import subprocess
import sys
import traceback
from typing import List, Optional

from PIL import Image

from mllm_tools.gemini import GeminiWrapper
from mllm_tools.vertex_ai import VertexAIWrapper
from src.core.parse_video import (
    get_images_from_video,
    image_with_most_non_black_space,
)
class VideoRenderer:
    """Renders individual Manim scenes and combines them into a final video."""

    def __init__(self, output_dir="output", print_response=False, use_visual_fix_code=False):
        """Set up the renderer's configuration.

        Args:
            output_dir (str, optional): Directory for output files. Defaults to "output".
            print_response (bool, optional): Whether to print responses. Defaults to False.
            use_visual_fix_code (bool, optional): Whether to use visual fix code. Defaults to False.
        """
        self.use_visual_fix_code = use_visual_fix_code
        self.print_response = print_response
        self.output_dir = output_dir
    async def render_scene(self, code: str, file_prefix: str, curr_scene: int, curr_version: int, code_dir: str, media_dir: str, max_retries: int = 3, use_visual_fix_code=False, visual_self_reflection_func=None, banned_reasonings=None, scene_trace_id=None, topic=None, session_id=None):
        """Render a single scene and handle error retries and visual fixes.

        Args:
            code (str): The Manim code to render
            file_prefix (str): Prefix for output files
            curr_scene (int): Current scene number
            curr_version (int): Current version number
            code_dir (str): Directory for code files
            media_dir (str): Directory for media output
            max_retries (int, optional): Maximum retry attempts. Defaults to 3.
            use_visual_fix_code (bool, optional): Whether to use visual fix code. Defaults to False.
            visual_self_reflection_func (callable, optional): Function for visual self-reflection. Defaults to None.
            banned_reasonings (list, optional): List of banned reasoning strings. Defaults to None.
            scene_trace_id (str, optional): Scene trace identifier. Defaults to None.
            topic (str, optional): Topic name. Defaults to None.
            session_id (str, optional): Session identifier. Defaults to None.

        Returns:
            tuple: (code, error_message) where error_message is None on success

        Note:
            The scene file ``{file_prefix}_scene{curr_scene}_v{curr_version}.py`` is
            never written by this method before rendering — it presumably exists in
            ``code_dir`` already; only subsequent visual-fix versions are written here.
        """
        retries = 0
        while retries < max_retries:
            try:
                # Execute manim in a thread to prevent blocking the event loop
                file_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
                result = await asyncio.to_thread(
                    subprocess.run,
                    ["manim", "-qh", file_path, "--media_dir", media_dir, "--progress_bar", "none"],
                    capture_output=True,
                    text=True
                )
                # if result.returncode != 0, it means that the code is not rendered successfully
                # so we need to fix the code by returning the code and the error message
                if result.returncode != 0:
                    raise Exception(result.stderr)
                if use_visual_fix_code and visual_self_reflection_func and banned_reasonings:
                    # Get the rendered video path
                    video_path = os.path.join(
                        media_dir,
                        "videos",
                        f"{file_prefix}_scene{curr_scene}_v{curr_version}.mp4"
                    )
                    # For Gemini/Vertex AI models, pass the video directly.
                    # NOTE(review): self.scene_model is not assigned in __init__ — it is
                    # presumably set on the instance by a caller before this runs; confirm.
                    if self.scene_model.model_name.startswith(('gemini/', 'vertex_ai/')):
                        media_input = video_path
                    else:
                        # For other models, use image snapshot
                        media_input = self.create_snapshot_scene(
                            topic, curr_scene, curr_version, return_type="path"
                        )
                    new_code, log = visual_self_reflection_func(
                        code,
                        media_input,
                        scene_trace_id=scene_trace_id,
                        topic=topic,
                        scene_number=curr_scene,
                        session_id=session_id
                    )
                    with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_vfix_log.txt"), "w") as f:
                        f.write(log)
                    # Check for termination markers: the reflection approved the code
                    # (<LGTM>) or used a banned reasoning phrase — stop iterating.
                    if "<LGTM>" in new_code or any(word in new_code for word in banned_reasonings):
                        break
                    # Accept the revised code, bump the version, and re-render it.
                    code = new_code
                    curr_version += 1
                    with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py"), "w") as f:
                        f.write(code)
                    print(f"Code saved to scene{curr_scene}/code/{file_prefix}_scene{curr_scene}_v{curr_version}.py")
                    # Fresh retry budget for the new version of the code.
                    retries = 0
                    continue
                break  # Exit retry loop on success
            except Exception as e:
                print(f"Error: {e}")
                print(f"Retrying {retries+1} of {max_retries}...")
                with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_error.log"), "a") as f:
                    f.write(f"\nError in attempt {retries}:\n{str(e)}\n")
                retries += 1
                # NOTE(review): returning here means the while-loop never actually
                # retries after an exception — the caller is expected to repair
                # `code` using the returned error and call again. Confirm intended.
                return code, str(e)  # Indicate failure and return error message
        print(f"Successfully rendered {file_path}")
        with open(os.path.join(self.output_dir, file_prefix, f"scene{curr_scene}", "succ_rendered.txt"), "w") as f:
            f.write("")
        return code, None  # Indicate success
| def run_manim_process(self, | |
| topic: str): | |
| """Run manim on all generated manim code for a specific topic. | |
| Args: | |
| topic (str): Topic name to process | |
| Returns: | |
| subprocess.CompletedProcess: Result of the final manim process | |
| """ | |
| file_prefix = topic.lower() | |
| file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix) | |
| search_path = os.path.join(self.output_dir, file_prefix) | |
| # Iterate through scene folders | |
| scene_folders = [f for f in os.listdir(search_path) if os.path.isdir(os.path.join(search_path, f))] | |
| scene_folders.sort() # Sort to process scenes in order | |
| for folder in scene_folders: | |
| folder_path = os.path.join(search_path, folder) | |
| # Get all Python files in version order | |
| py_files = [f for f in os.listdir(folder_path) if f.endswith('.py')] | |
| py_files.sort(key=lambda x: int(x.split('_v')[-1].split('.')[0])) # Sort by version number | |
| for file in py_files: | |
| file_path = os.path.join(folder_path, file) | |
| try: | |
| media_dir = os.path.join(self.output_dir, file_prefix, "media") | |
| result = subprocess.run( | |
| f"manim -qh {file_path} --media_dir {media_dir}", | |
| shell=True, | |
| capture_output=True, | |
| text=True | |
| ) | |
| if result.returncode != 0: | |
| raise Exception(result.stderr) | |
| print(f"Successfully rendered {file}") | |
| break # Move to next scene folder if successful | |
| except Exception as e: | |
| print(f"Error rendering {file}: {e}") | |
| error_log_path = os.path.join(folder_path, f"{file.split('.')[0]}_error.log") # drop the extra py | |
| with open(error_log_path, "w") as f: | |
| f.write(f"Error:\n{str(e)}\n") | |
| print(f"Error log saved to {error_log_path}") | |
| return result | |
| def create_snapshot_scene(self, topic: str, scene_number: int, version_number: int, return_type: str = "image"): | |
| """Create a snapshot of the video for a specific topic and scene. | |
| Args: | |
| topic (str): Topic name | |
| scene_number (int): Scene number | |
| version_number (int): Version number | |
| return_type (str, optional): Type of return value - "path" or "image". Defaults to "image". | |
| Returns: | |
| Union[str, PIL.Image]: Path to saved image or PIL Image object | |
| Raises: | |
| FileNotFoundError: If no mp4 files found in video folder | |
| """ | |
| file_prefix = topic.lower() | |
| file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix) | |
| search_path = os.path.join(self.output_dir, file_prefix) | |
| video_folder_path = os.path.join(search_path, "media", "videos", f"{file_prefix}_scene{scene_number}_v{version_number}", "1080p60") | |
| os.makedirs(video_folder_path, exist_ok=True) | |
| snapshot_path = os.path.join(video_folder_path, "snapshot.png") | |
| # Get the mp4 video file from the video folder path | |
| video_files = [f for f in os.listdir(video_folder_path) if f.endswith('.mp4')] | |
| if not video_files: | |
| raise FileNotFoundError(f"No mp4 files found in {video_folder_path}") | |
| video_path = os.path.join(video_folder_path, video_files[0]) | |
| saved_image = image_with_most_non_black_space(get_images_from_video(video_path), snapshot_path, return_type=return_type) | |
| return saved_image | |
    def combine_videos(self, topic: str):
        """Combine all videos and subtitle files for a specific topic using ffmpeg.

        Args:
            topic (str): Topic name to combine videos for

        This function will:
        - Find all scene videos and subtitles (latest version per scene)
        - Combine videos with or without audio
        - Merge subtitle files with correct timing
        - Save combined video and subtitles to output directory
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
        search_path = os.path.join(self.output_dir, file_prefix, "media", "videos")
        # Create output directory if it doesn't exist
        video_output_dir = os.path.join(self.output_dir, file_prefix)
        os.makedirs(video_output_dir, exist_ok=True)
        output_video_path = os.path.join(video_output_dir, f"{file_prefix}_combined.mp4")
        output_srt_path = os.path.join(video_output_dir, f"{file_prefix}_combined.srt")
        # Idempotency guard: skip the expensive combine when outputs already exist.
        if os.path.exists(output_video_path) and os.path.exists(output_srt_path):
            print(f"Combined video and subtitles already exist at {output_video_path}, not combining again.")
            return
        # Get scene count from outline
        scene_outline_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt")
        if not os.path.exists(scene_outline_path):
            print(f"Warning: Scene outline file not found at {scene_outline_path}. Cannot determine scene count.")
            return
        with open(scene_outline_path) as f:
            plan = f.read()
        # NOTE(review): raises AttributeError if the file has no <SCENE_OUTLINE>
        # block (re.search returns None) — it would be caught nowhere; confirm
        # the outline file is always well-formed at this point.
        scene_outline = re.search(r'(<SCENE_OUTLINE>.*?</SCENE_OUTLINE>)', plan, re.DOTALL).group(1)
        scene_count = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline))
        # Find all scene folders and videos
        scene_folders = []
        for root, dirs, files in os.walk(search_path):
            for dir in dirs:
                if dir.startswith(file_prefix + "_scene"):
                    scene_folders.append(os.path.join(root, dir))
        scene_videos = []
        scene_subtitles = []
        for scene_num in range(1, scene_count + 1):
            # All version folders belonging to this scene number.
            folders = [f for f in scene_folders if int(f.split("scene")[-1].split("_")[0]) == scene_num]
            if not folders:
                print(f"Warning: Missing scene {scene_num}")
                continue
            # Use only the highest version of each scene.
            folders.sort(key=lambda f: int(f.split("_v")[-1]))
            folder = folders[-1]
            video_found = False
            subtitles_found = False
            for filename in os.listdir(os.path.join(folder, "1080p60")):
                if filename.endswith('.mp4'):
                    scene_videos.append(os.path.join(folder, "1080p60", filename))
                    video_found = True
                elif filename.endswith('.srt'):
                    scene_subtitles.append(os.path.join(folder, "1080p60", filename))
                    subtitles_found = True
            if not video_found:
                print(f"Warning: Missing video for scene {scene_num}")
            if not subtitles_found:
                # Placeholder keeps scene_subtitles aligned with scene_videos for zip().
                scene_subtitles.append(None)
        if len(scene_videos) != scene_count:
            print("Not all videos/subtitles are found, aborting video combination.")
            return
        try:
            import ffmpeg  # You might need to install ffmpeg-python package: pip install ffmpeg-python
            from tqdm import tqdm
            print("Analyzing video streams...")
            # Check if videos have audio streams
            has_audio = []
            for video in tqdm(scene_videos, desc="Checking audio streams"):
                probe = ffmpeg.probe(video)
                audio_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'audio']
                has_audio.append(len(audio_streams) > 0)
            print("Preparing video combination...")
            # If any video has audio, we need to ensure all videos have audio streams
            # (ffmpeg concat requires every segment to carry the same stream layout).
            if any(has_audio):
                # Create list to store video and audio streams
                streams = []
                for video, has_aud in tqdm(list(zip(scene_videos, has_audio)), desc="Processing videos"):
                    if has_aud:
                        # Video has audio, use as is
                        input_vid = ffmpeg.input(video)
                        streams.extend([input_vid['v'], input_vid['a']])
                    else:
                        # Video lacks audio, add silent audio
                        input_vid = ffmpeg.input(video)
                        # Generate silent audio for the duration of the video
                        probe = ffmpeg.probe(video)
                        duration = float(probe['streams'][0]['duration'])
                        silent_audio = ffmpeg.input(f'anullsrc=channel_layout=stereo:sample_rate=44100',
                                                    f='lavfi', t=duration)['a']
                        streams.extend([input_vid['v'], silent_audio])
                print("Combining videos with audio...")
                try:
                    # Concatenate all streams using optimized CPU encoding settings
                    concat = ffmpeg.concat(*streams, v=1, a=1, unsafe=True)
                    process = (
                        concat
                        .output(output_video_path,
                                **{'c:v': 'libx264',
                                   'c:a': 'aac',
                                   'preset': 'veryfast',  # Changed from ultrafast for better speed/quality balance
                                   'crf': '28',  # Same quality setting
                                   'threads': '0',  # Use all CPU threads
                                   'tune': 'fastdecode',  # Optimize for decoding speed
                                   'profile:v': 'baseline',  # Simpler profile for faster encoding
                                   'level': '4.0',
                                   'x264-params': 'aq-mode=0:no-deblock:no-cabac:ref=1:subme=0:trellis=0:weightp=0',  # Added aggressive speed optimizations
                                   'movflags': '+faststart',
                                   'stats': None,
                                   'progress': 'pipe:1'})
                        .overwrite_output()
                        .run_async(pipe_stdout=True, pipe_stderr=True)
                    )
                    # Process progress output
                    while True:
                        line = process.stdout.readline().decode('utf-8')
                        if not line:
                            break
                        if 'frame=' in line:
                            sys.stdout.write('\rProcessing: ' + line.strip())
                            sys.stdout.flush()
                    # Wait for the process to complete and capture output
                    stdout, stderr = process.communicate()
                    print("\nEncoding complete!")
                except ffmpeg.Error as e:
                    print(f"FFmpeg stdout:\n{e.stdout.decode('utf8')}")
                    print(f"FFmpeg stderr:\n{e.stderr.decode('utf8')}")
                    raise
            else:
                # No videos have audio, concatenate video streams only
                streams = []
                for video in tqdm(scene_videos, desc="Processing videos"):
                    streams.append(ffmpeg.input(video)['v'])
                print("Combining videos without audio...")
                try:
                    concat = ffmpeg.concat(*streams, v=1, unsafe=True)
                    process = (
                        concat
                        .output(output_video_path,
                                **{'c:v': 'libx264',
                                   'preset': 'medium',
                                   'crf': '23',
                                   'stats': None,  # Enable progress stats
                                   'progress': 'pipe:1'})  # Output progress to pipe
                        .overwrite_output()
                        .run_async(pipe_stdout=True, pipe_stderr=True)
                    )
                    # Process progress output
                    while True:
                        line = process.stdout.readline().decode('utf-8')
                        if not line:
                            break
                        if 'frame=' in line:
                            sys.stdout.write('\rProcessing: ' + line.strip())
                            sys.stdout.flush()
                    # Wait for the process to complete and capture output
                    stdout, stderr = process.communicate()
                    print("\nEncoding complete!")
                except ffmpeg.Error as e:
                    print(f"FFmpeg stdout:\n{e.stdout.decode('utf8')}")
                    print(f"FFmpeg stderr:\n{e.stderr.decode('utf8')}")
                    raise
            print(f"Successfully combined videos into {output_video_path}")
            # Handle subtitle combination (existing subtitle code remains the same)
            if scene_subtitles:
                with open(output_srt_path, 'w', encoding='utf-8') as outfile:
                    current_time_offset = 0
                    subtitle_index = 1
                    for srt_file, video_file in zip(scene_subtitles, scene_videos):
                        if srt_file is None:
                            # Scene without subtitles still advances the time offset below.
                            continue
                        with open(srt_file, 'r', encoding='utf-8') as infile:
                            lines = infile.readlines()
                            i = 0
                            while i < len(lines):
                                line = lines[i].strip()
                                if line.isdigit():  # Subtitle index
                                    outfile.write(f"{subtitle_index}\n")
                                    subtitle_index += 1
                                    i += 1
                                    # Time codes line
                                    time_line = lines[i].strip()
                                    start_time, end_time = time_line.split(' --> ')
                                    # Convert time codes and add offset
                                    def adjust_time(time_str, offset):
                                        h, m, s = time_str.replace(',', '.').split(':')
                                        total_seconds = float(h) * 3600 + float(m) * 60 + float(s) + offset
                                        h = int(total_seconds // 3600)
                                        m = int((total_seconds % 3600) // 60)
                                        s = total_seconds % 60
                                        return f"{h:02d}:{m:02d}:{s:06.3f}".replace('.', ',')
                                    new_start = adjust_time(start_time, current_time_offset)
                                    new_end = adjust_time(end_time, current_time_offset)
                                    outfile.write(f"{new_start} --> {new_end}\n")
                                    i += 1
                                    # Subtitle text (could be multiple lines)
                                    while i < len(lines) and lines[i].strip():
                                        outfile.write(lines[i])
                                        i += 1
                                    outfile.write('\n')
                                else:
                                    i += 1
                        # Update time offset using ffprobe
                        probe = ffmpeg.probe(video_file)
                        duration = float(probe['streams'][0]['duration'])
                        current_time_offset += duration
            print(f"Successfully combined videos into {output_video_path}")
            if scene_subtitles:
                print(f"Successfully combined subtitles into {output_srt_path}")
        except Exception as e:
            print(f"Error combining videos and subtitles: {e}")
            traceback.print_exc()