import os
import json
import random
from typing import Union, List, Dict, Optional
import subprocess
import argparse
import glob
from PIL import Image
import re
from dotenv import load_dotenv
import asyncio
import uuid  # Import uuid for generating trace_id

from mllm_tools.litellm import LiteLLMWrapper
from mllm_tools.utils import _prepare_text_inputs  # Keep _prepare_text_inputs if still used directly in main
# Import new modules
from src.core.video_planner import VideoPlanner
from src.core.code_generator import CodeGenerator
from src.core.video_renderer import VideoRenderer
from src.utils.utils import _print_response, _extract_code, extract_xml  # Import utility functions
from src.config.config import Config  # Import Config class
# Video parsing
from src.core.parse_video import (
    get_images_from_video,
    image_with_most_non_black_space
)
from task_generator import get_banned_reasonings
from task_generator.prompts_raw import (_code_font_size, _code_disable, _code_limit, _prompt_manim_cheatsheet)

# Load allowed models list from JSON file
allowed_models_path = os.path.join(os.path.dirname(__file__), 'src', 'utils', 'allowed_models.json')
with open(allowed_models_path, 'r') as f:
    allowed_models = json.load(f).get("allowed_models", [])

load_dotenv(override=True)
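# For reference, the allowed-models file read above is assumed to be a JSON object
# with a single "allowed_models" list, roughly like (illustrative):
#   {"allowed_models": ["gemini/gemini-1.5-pro-002", "..."]}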
class VideoGenerator:
    """
    A class for generating Manim videos using AI models.

    This class coordinates the video generation pipeline by managing scene planning,
    code generation, and video rendering. It supports concurrent scene processing,
    visual code fixing, and RAG (Retrieval Augmented Generation).

    Args:
        planner_model: Model used for scene planning and high-level decisions
        scene_model: Model used specifically for scene generation (defaults to planner_model)
        helper_model: Helper model for additional tasks (defaults to planner_model)
        output_dir (str): Directory to store generated files and videos
        verbose (bool): Whether to print detailed output
        use_rag (bool): Whether to use Retrieval Augmented Generation
        use_context_learning (bool): Whether to use context learning with example code
        context_learning_path (str): Path to context learning examples
        chroma_db_path (str): Path to ChromaDB for RAG
        manim_docs_path (str): Path to Manim documentation for RAG
        embedding_model (str): Model to use for embeddings
        use_visual_fix_code (bool): Whether to use visual feedback for code fixing
        use_langfuse (bool): Whether to enable Langfuse logging
        trace_id (str, optional): Trace ID for logging
        max_scene_concurrency (int): Maximum number of scenes to process concurrently

    Attributes:
        output_dir (str): Directory for output files
        verbose (bool): Verbosity flag
        use_visual_fix_code (bool): Visual code fixing flag
        session_id (str): Unique session identifier
        scene_semaphore (asyncio.Semaphore): Controls concurrent scene processing
        banned_reasonings (list): List of banned reasoning patterns
        planner (VideoPlanner): Handles scene planning
        code_generator (CodeGenerator): Handles code generation
        video_renderer (VideoRenderer): Handles video rendering
    """
    def __init__(self,
                 planner_model,
                 scene_model=None,
                 helper_model=None,
                 output_dir="output",
                 verbose=False,
                 use_rag=False,
                 use_context_learning=False,
                 context_learning_path="data/context_learning",
                 chroma_db_path="data/rag/chroma_db",
                 manim_docs_path="data/rag/manim_docs",
                 embedding_model="azure/text-embedding-3-large",
                 use_visual_fix_code=False,
                 use_langfuse=True,
                 trace_id=None,
                 max_scene_concurrency: int = 5):
        self.output_dir = output_dir
        self.verbose = verbose
        self.use_visual_fix_code = use_visual_fix_code
        self.session_id = self._load_or_create_session_id()  # Load an existing session ID or create a new one
        self.scene_semaphore = asyncio.Semaphore(max_scene_concurrency)
        self.banned_reasonings = get_banned_reasonings()

        # Initialize separate modules
        self.planner = VideoPlanner(
            planner_model=planner_model,
            helper_model=helper_model,
            output_dir=output_dir,
            print_response=verbose,
            use_context_learning=use_context_learning,
            context_learning_path=context_learning_path,
            use_rag=use_rag,
            session_id=self.session_id,
            chroma_db_path=chroma_db_path,
            manim_docs_path=manim_docs_path,
            embedding_model=embedding_model,
            use_langfuse=use_langfuse
        )
        self.code_generator = CodeGenerator(
            scene_model=scene_model if scene_model is not None else planner_model,
            helper_model=helper_model if helper_model is not None else planner_model,
            output_dir=output_dir,
            print_response=verbose,
            use_rag=use_rag,
            use_context_learning=use_context_learning,
            context_learning_path=context_learning_path,
            chroma_db_path=chroma_db_path,
            manim_docs_path=manim_docs_path,
            embedding_model=embedding_model,
            use_visual_fix_code=use_visual_fix_code,
            use_langfuse=use_langfuse,
            session_id=self.session_id
        )
        self.video_renderer = VideoRenderer(
            output_dir=output_dir,
            print_response=verbose,
            use_visual_fix_code=use_visual_fix_code
        )
    def _load_or_create_session_id(self) -> str:
        """
        Load existing session ID from file or create a new one.

        Returns:
            str: The session ID either loaded from file or newly created.
        """
        session_file = os.path.join(self.output_dir, "session_id.txt")
        if os.path.exists(session_file):
            with open(session_file, 'r') as f:
                session_id = f.read().strip()
            print(f"Loaded existing session ID: {session_id}")
            return session_id

        # Create new session ID if none exists
        session_id = str(uuid.uuid4())
        os.makedirs(self.output_dir, exist_ok=True)
        with open(session_file, 'w') as f:
            f.write(session_id)
        print(f"Created new session ID: {session_id}")
        return session_id
    def _save_topic_session_id(self, topic: str, session_id: str) -> None:
        """
        Save session ID for a specific topic.

        Args:
            topic (str): The topic to save the session ID for
            session_id (str): The session ID to save
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
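        # For example, a topic such as "Chen's Theorem" maps to the directory prefix
        # "chen_s_theorem" (illustrative; any run of characters outside [a-z0-9_] collapses to "_").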
        topic_dir = os.path.join(self.output_dir, file_prefix)
        os.makedirs(topic_dir, exist_ok=True)
        session_file = os.path.join(topic_dir, "session_id.txt")
        with open(session_file, 'w') as f:
            f.write(session_id)
    def _load_topic_session_id(self, topic: str) -> Optional[str]:
        """
        Load session ID for a specific topic if it exists.

        Args:
            topic (str): The topic to load the session ID for

        Returns:
            Optional[str]: The session ID if found, None otherwise
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
        session_file = os.path.join(self.output_dir, file_prefix, "session_id.txt")
        if os.path.exists(session_file):
            with open(session_file, 'r') as f:
                return f.read().strip()
        return None
    def generate_scene_outline(self,
                               topic: str,
                               description: str,
                               session_id: str) -> str:
        """
        Generate scene outline using VideoPlanner.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            session_id (str): Session identifier for tracking

        Returns:
            str: Generated scene outline
        """
        return self.planner.generate_scene_outline(topic, description, session_id)
    async def generate_scene_implementation(self,
                                            topic: str,
                                            description: str,
                                            plan: str,
                                            session_id: str) -> List[str]:
        """
        Generate scene implementations using VideoPlanner.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            plan (str): The scene plan to implement
            session_id (str): Session identifier for tracking

        Returns:
            List[str]: List of generated scene implementations
        """
        return await self.planner.generate_scene_implementation(topic, description, plan, session_id)
    async def generate_scene_implementation_concurrently(self,
                                                         topic: str,
                                                         description: str,
                                                         plan: str,
                                                         session_id: str) -> List[str]:
        """
        Generate scene implementations concurrently using VideoPlanner.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            plan (str): The scene plan to implement
            session_id (str): Session identifier for tracking

        Returns:
            List[str]: List of generated scene implementations
        """
        return await self.planner.generate_scene_implementation_concurrently(topic, description, plan, session_id, self.scene_semaphore)  # Pass semaphore
    def load_implementation_plans(self, topic: str) -> Dict[int, Optional[str]]:
        """
        Load implementation plans for each scene.

        Args:
            topic (str): The topic to load implementation plans for

        Returns:
            Dict[int, Optional[str]]: Dictionary mapping scene numbers to their plans.
                If a scene's plan is missing, its value will be None.
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)

        # Load scene outline from file
        scene_outline_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt")
        if not os.path.exists(scene_outline_path):
            return {}
        with open(scene_outline_path, "r") as f:
            scene_outline = f.read()

        # Extract scene outline to get number of scenes
        scene_outline_content = extract_xml(scene_outline)
        scene_number = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline_content))
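        # The planner's outline is expected to wrap each scene in <SCENE_n>...</SCENE_n>
        # tags (illustrative), so counting the opening tags above gives the number of scenes.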
| print(f"Number of scenes: {scene_number}") | |
| implementation_plans = {} | |
| # Check each scene's implementation plan | |
| for i in range(1, scene_number + 1): | |
| plan_path = os.path.join(self.output_dir, file_prefix, f"scene{i}", f"{file_prefix}_scene{i}_implementation_plan.txt") | |
| if os.path.exists(plan_path): | |
| with open(plan_path, "r") as f: | |
| implementation_plans[i] = f.read() | |
| print(f"Found existing implementation plan for scene {i}") | |
| else: | |
| implementation_plans[i] = None | |
| print(f"Missing implementation plan for scene {i}") | |
| return implementation_plans | |
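    # For orientation, the per-topic directory layout used throughout this class looks
    # roughly like this (assembled from the os.path.join calls above and below):
    #   <output_dir>/<file_prefix>/<file_prefix>_scene_outline.txt
    #   <output_dir>/<file_prefix>/scene<i>/<file_prefix>_scene<i>_implementation_plan.txt
    #   <output_dir>/<file_prefix>/scene<i>/subplans/scene_trace_id.txt
    #   <output_dir>/<file_prefix>/scene<i>/code/<file_prefix>_scene<i>_v<k>.py
    #   <output_dir>/<file_prefix>/scene<i>/succ_rendered.txt
    #   <output_dir>/<file_prefix>/<file_prefix>_combined.mp4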
    async def render_video_fix_code(self,
                                    topic: str,
                                    description: str,
                                    scene_outline: str,
                                    implementation_plans: List,
                                    max_retries=3,
                                    session_id: str = None) -> None:
        """
        Render the video for all scenes with code fixing capability.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            scene_outline (str): The overall scene outline
            implementation_plans (List): List of implementation plans for each scene
            max_retries (int, optional): Maximum number of code fix attempts. Defaults to 3.
            session_id (str, optional): Session identifier for tracking
        """
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)

        # Create tasks for each scene
        tasks = []
        for i, implementation_plan in enumerate(implementation_plans):
            # Try to load the scene trace id, or generate a new one if it doesn't exist
            scene_dir = os.path.join(self.output_dir, file_prefix, f"scene{i+1}")
            subplan_dir = os.path.join(scene_dir, "subplans")
            os.makedirs(subplan_dir, exist_ok=True)  # Create directories if they don't exist
            scene_trace_id_path = os.path.join(subplan_dir, "scene_trace_id.txt")
            try:
                with open(scene_trace_id_path, 'r') as f:
                    scene_trace_id = f.read().strip()
            except FileNotFoundError:
                scene_trace_id = str(uuid.uuid4())
                with open(scene_trace_id_path, 'w') as f:
                    f.write(scene_trace_id)
            task = self.process_scene(i, scene_outline, implementation_plan, topic, description, max_retries, file_prefix, session_id, scene_trace_id)
            tasks.append(task)

        # Execute all tasks concurrently
        await asyncio.gather(*tasks)
    async def process_scene(self, i: int, scene_outline: str, scene_implementation: str, topic: str, description: str, max_retries: int, file_prefix: str, session_id: str, scene_trace_id: str):
        """
        Process a single scene using CodeGenerator and VideoRenderer.

        Args:
            i (int): Scene index
            scene_outline (str): Overall scene outline
            scene_implementation (str): Implementation plan for this scene
            topic (str): The topic of the video
            description (str): Description of the video content
            max_retries (int): Maximum number of code fix attempts
            file_prefix (str): Prefix for file naming
            session_id (str): Session identifier for tracking
            scene_trace_id (str): Trace identifier for this scene
        """
        curr_scene = i + 1
        curr_version = 0
        rag_queries_cache = {}  # Initialize RAG queries cache

        # Create necessary directories
        code_dir = os.path.join(self.output_dir, file_prefix, f"scene{curr_scene}", "code")
        os.makedirs(code_dir, exist_ok=True)
        media_dir = os.path.join(self.output_dir, file_prefix, "media")  # Define media_dir here

        async with self.scene_semaphore:
            # Step 3A: Generate initial Manim code
            code, log = self.code_generator.generate_manim_code(
                topic=topic,
                description=description,
                scene_outline=scene_outline,
                scene_implementation=scene_implementation,
                scene_number=curr_scene,
                additional_context=[_prompt_manim_cheatsheet, _code_font_size, _code_limit, _code_disable],
                scene_trace_id=scene_trace_id,  # Use the scene_trace_id passed in by render_video_fix_code
                session_id=session_id,
                rag_queries_cache=rag_queries_cache  # Pass the cache
            )

            # Save initial code and log (file operations can be offloaded if needed)
            with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_init_log.txt"), "w") as f:
                f.write(log)
            with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py"), "w") as f:
                f.write(code)
            print(f"Code saved to {code_dir}/{file_prefix}_scene{curr_scene}_v{curr_version}.py")

            # Step 3B: Compile and fix code if needed
            error_message = None
            while True:  # Retry loop controlled by the break statements below
                code, error_message = await self.video_renderer.render_scene(
                    code=code,
                    file_prefix=file_prefix,
                    curr_scene=curr_scene,
                    curr_version=curr_version,
                    code_dir=code_dir,
                    media_dir=media_dir,
                    max_retries=max_retries,  # Pass max_retries here if needed in render_scene
                    use_visual_fix_code=self.use_visual_fix_code,
                    visual_self_reflection_func=self.code_generator.visual_self_reflection,  # Pass the visual_self_reflection function
                    banned_reasonings=self.banned_reasonings,  # Pass banned reasonings
                    scene_trace_id=scene_trace_id,
                    topic=topic,
                    session_id=session_id
                )
                if error_message is None:  # Rendering succeeded
                    break
                if curr_version >= max_retries:  # Max retries reached
                    print(f"Max retries reached for scene {curr_scene}, error: {error_message}")
                    break  # Exit retry loop
                curr_version += 1
                # Reaching this point means the render failed, so ask the code generator to fix the errors
                code, log = self.code_generator.fix_code_errors(
                    implementation_plan=scene_implementation,
                    code=code,
                    error=error_message,
                    scene_trace_id=scene_trace_id,
                    topic=topic,
                    scene_number=curr_scene,
                    session_id=session_id,
                    rag_queries_cache=rag_queries_cache
                )
                with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_fix_log.txt"), "w") as f:
                    f.write(log)
                with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py"), "w") as f:
                    f.write(code)
                print(f"Code saved to {code_dir}/{file_prefix}_scene{curr_scene}_v{curr_version}.py")
    def run_manim_process(self,
                          topic: str):
        """
        Run Manim on all generated Manim code for a specific topic using VideoRenderer.

        Args:
            topic (str): The topic to render videos for
        """
        return self.video_renderer.run_manim_process(topic)
    def create_snapshot_scene(self, topic: str, scene_number: int, version_number: int, return_type: str = "image"):
        """
        Create a snapshot of the video for a specific topic and scene using VideoRenderer.

        Args:
            topic (str): The topic of the video
            scene_number (int): Scene number to snapshot
            version_number (int): Version number to snapshot
            return_type (str, optional): Type of snapshot to return. Defaults to "image".

        Returns:
            The snapshot in the specified format
        """
        return self.video_renderer.create_snapshot_scene(topic, scene_number, version_number, return_type)
    def combine_videos(self, topic: str):
        """
        Combine all videos and subtitle files for a specific topic using VideoRenderer.

        Args:
            topic (str): The topic to combine videos for
        """
        self.video_renderer.combine_videos(topic)
    async def _generate_scene_implementation_single(self, topic: str, description: str, scene_outline_i: str, i: int, file_prefix: str, session_id: str, scene_trace_id: str) -> str:
        """
        Generate detailed implementation plan for a single scene using VideoPlanner.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            scene_outline_i (str): Outline for this specific scene
            i (int): Scene index
            file_prefix (str): Prefix for file naming
            session_id (str): Session identifier for tracking
            scene_trace_id (str): Trace identifier for this scene

        Returns:
            str: Generated implementation plan
        """
        return await self.planner._generate_scene_implementation_single(topic, description, scene_outline_i, i, file_prefix, session_id, scene_trace_id)
    async def generate_video_pipeline(self, topic: str, description: str, max_retries: int, only_plan: bool = False, specific_scenes: List[int] = None):
        """
        Run the full pipeline, handling partial scene completions and optionally only generating plans for specific scenes.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            max_retries (int): Maximum number of code fix attempts
            only_plan (bool, optional): Whether to only generate plans without rendering. Defaults to False.
            specific_scenes (List[int], optional): List of specific scenes to process. Defaults to None.
        """
        session_id = self._load_or_create_session_id()
        self._save_topic_session_id(topic, session_id)
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)

        # Load or generate scene outline
        scene_outline_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt")
        if os.path.exists(scene_outline_path):
            with open(scene_outline_path, "r") as f:
                scene_outline = f.read()
            print(f"Loaded existing scene outline for topic: {topic}")
            if self.planner.use_rag:
                self.planner.relevant_plugins = self.planner.rag_integration.detect_relevant_plugins(topic, description) or []
                self.planner.rag_integration.set_relevant_plugins(self.planner.relevant_plugins)
                print(f"Detected relevant plugins: {self.planner.relevant_plugins}")
        else:
            print(f"Generating new scene outline for topic: {topic}")
            scene_outline = self.planner.generate_scene_outline(topic, description, session_id)
            os.makedirs(os.path.join(self.output_dir, file_prefix), exist_ok=True)
            with open(scene_outline_path, "w") as f:
                f.write(scene_outline)

        # Load or generate implementation plans
        implementation_plans_dict = self.load_implementation_plans(topic)
        if not implementation_plans_dict:
            scene_outline_content = extract_xml(scene_outline)
            num_scenes = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline_content))
            implementation_plans_dict = {i: None for i in range(1, num_scenes + 1)}

        # Generate missing implementation plans for the specified scenes, or for all missing scenes
        missing_scenes = []
        for scene_num, plan in implementation_plans_dict.items():
            if plan is None and (specific_scenes is None or scene_num in specific_scenes):
                missing_scenes.append(scene_num)
        if missing_scenes:
            print(f"Generating implementation plans for missing scenes: {missing_scenes}")
            for scene_num in missing_scenes:
                scene_outline_content = extract_xml(scene_outline)
                scene_match = re.search(f'<SCENE_{scene_num}>(.*?)</SCENE_{scene_num}>', scene_outline_content, re.DOTALL)
                if scene_match:
                    scene_outline_i = scene_match.group(1)
                    scene_trace_id = str(uuid.uuid4())
                    implementation_plan = await self._generate_scene_implementation_single(
                        topic, description, scene_outline_i, scene_num, file_prefix, session_id, scene_trace_id)
                    implementation_plans_dict[scene_num] = implementation_plan

        if only_plan:
            print(f"Only generating plans - skipping code generation and video rendering for topic: {topic}")
            return

        # Convert the dictionary to a list, preserving scene order
        sorted_scene_numbers = sorted(implementation_plans_dict.keys())
        implementation_plans = [implementation_plans_dict[i] for i in sorted_scene_numbers]

        # Render scenes
        print(f"Starting video rendering for topic: {topic}")

        # Check which scenes need processing
        scenes_to_process = []
        for i, implementation_plan in enumerate(implementation_plans):
            scene_dir = os.path.join(self.output_dir, file_prefix, f"scene{i+1}")
            code_dir = os.path.join(scene_dir, "code")
            # Check whether the scene already has any code files
            has_code = False
            if os.path.exists(code_dir):
                if any(f.endswith('.py') for f in os.listdir(code_dir)):
                    has_code = True
            # In only_render mode, only process scenes without code
            # (note: this relies on the module-level `args` parsed in the __main__ block below)
            if args.only_render:
                if not has_code:
                    scenes_to_process.append((i+1, implementation_plan))
                    print(f"Scene {i+1} has no code, will process")
                else:
                    print(f"Scene {i+1} already has code, skipping")
            # In normal mode, process scenes that haven't been successfully rendered
            elif not os.path.exists(os.path.join(scene_dir, "succ_rendered.txt")):
                scenes_to_process.append((i+1, implementation_plan))

        if not scenes_to_process:
            print(f"No scenes need processing for topic '{topic}'.")
        else:
            print(f"Rendering {len(scenes_to_process)} scenes that need processing...")
            # Sort by scene number to ensure correct order, then extract just the plans
            scene_plans = sorted(scenes_to_process, key=lambda x: x[0])
            filtered_implementation_plans = [plan for _, plan in scene_plans]
            await self.render_video_fix_code(topic, description, scene_outline, filtered_implementation_plans,
                                             max_retries=max_retries, session_id=session_id)

        if not args.only_render:  # Skip the completion message in only_render mode
            print(f"Video rendering completed for topic '{topic}'.")
    def check_theorem_status(self, theorem: Dict) -> Dict[str, bool]:
        """
        Check if a theorem has its plan, code files, and rendered videos with detailed scene status.

        Args:
            theorem (Dict): Dictionary containing theorem information

        Returns:
            Dict[str, bool]: Dictionary containing status information for the theorem
        """
        topic = theorem['theorem']
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)

        # Check scene outline
        scene_outline_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt")
        has_scene_outline = os.path.exists(scene_outline_path)

        # Get number of scenes if outline exists
        num_scenes = 0
        if has_scene_outline:
            with open(scene_outline_path, "r") as f:
                scene_outline = f.read()
            scene_outline_content = extract_xml(scene_outline)
            num_scenes = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline_content))

        # Check implementation plans, code files, and rendered videos
        implementation_plans = 0
        code_files = 0
        rendered_scenes = 0

        # Track status of individual scenes
        scene_status = []
        for i in range(1, num_scenes + 1):
            scene_dir = os.path.join(self.output_dir, file_prefix, f"scene{i}")

            # Check implementation plan
            plan_path = os.path.join(scene_dir, f"{file_prefix}_scene{i}_implementation_plan.txt")
            has_plan = os.path.exists(plan_path)
            if has_plan:
                implementation_plans += 1

            # Check code files
            code_dir = os.path.join(scene_dir, "code")
            has_code = False
            if os.path.exists(code_dir):
                if any(f.endswith('.py') for f in os.listdir(code_dir)):
                    has_code = True
                    code_files += 1

            # Check rendered scene video
            has_render = False
            if os.path.exists(scene_dir):
                succ_rendered_path = os.path.join(scene_dir, "succ_rendered.txt")
                if os.path.exists(succ_rendered_path):
                    has_render = True
                    rendered_scenes += 1

            scene_status.append({
                'scene_number': i,
                'has_plan': has_plan,
                'has_code': has_code,
                'has_render': has_render
            })

        # Check combined video
        combined_video_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_combined.mp4")
        has_combined_video = os.path.exists(combined_video_path)

        return {
            'topic': topic,
            'has_scene_outline': has_scene_outline,
            'total_scenes': num_scenes,
            'implementation_plans': implementation_plans,
            'code_files': code_files,
            'rendered_scenes': rendered_scenes,
            'has_combined_video': has_combined_video,
            'scene_status': scene_status
        }
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Generate Manim videos using AI')
    parser.add_argument('--model', type=str, choices=allowed_models,
                        default='gemini/gemini-1.5-pro-002', help='Select the AI model to use')
    parser.add_argument('--topic', type=str, default=None, help='Topic to generate videos for')
    parser.add_argument('--context', type=str, default=None, help='Context of the topic')
    parser.add_argument('--helper_model', type=str, choices=allowed_models,
                        default=None, help='Select the helper model to use')
    parser.add_argument('--only_gen_vid', action='store_true', help='Only generate videos from existing plans')
    parser.add_argument('--only_combine', action='store_true', help='Only combine videos')
    parser.add_argument('--peek_existing_videos', '--peek', action='store_true', help='Peek at existing videos')
    parser.add_argument('--output_dir', type=str, default=Config.OUTPUT_DIR, help='Output directory')  # Use Config
    parser.add_argument('--theorems_path', type=str, default=None, help='Path to theorems json file')
    parser.add_argument('--sample_size', '--sample', type=int, default=None, help='Number of theorems to sample')
    parser.add_argument('--verbose', action='store_true', help='Print verbose output')
    parser.add_argument('--max_retries', type=int, default=5, help='Maximum number of retries for code generation')
    parser.add_argument('--use_rag', '--rag', action='store_true', help='Use Retrieval Augmented Generation')
    parser.add_argument('--use_visual_fix_code', '--visual_fix_code', action='store_true', help='Use VLM to fix code with rendered visuals')
    parser.add_argument('--chroma_db_path', type=str, default=Config.CHROMA_DB_PATH, help="Path to Chroma DB")  # Use Config
    parser.add_argument('--manim_docs_path', type=str, default=Config.MANIM_DOCS_PATH, help="Path to manim docs")  # Use Config
    parser.add_argument('--embedding_model', type=str,
                        default=Config.EMBEDDING_MODEL,  # Use Config
                        choices=["azure/text-embedding-3-large", "vertex_ai/text-embedding-005"],
                        help='Select the embedding model to use')
    parser.add_argument('--use_context_learning', action='store_true',
                        help='Use context learning with example Manim code')
    parser.add_argument('--context_learning_path', type=str,
                        default=Config.CONTEXT_LEARNING_PATH,  # Use Config
                        help='Path to context learning examples')
    parser.add_argument('--use_langfuse', action='store_true',
                        help='Enable Langfuse logging')
    parser.add_argument('--max_scene_concurrency', type=int, default=1, help='Maximum number of scenes to process concurrently')
    parser.add_argument('--max_topic_concurrency', type=int, default=1,
                        help='Maximum number of topics to process concurrently')
    parser.add_argument('--debug_combine_topic', type=str, help='Debug combine videos', default=None)
    parser.add_argument('--only_plan', action='store_true', help='Only generate scene outline and implementation plans')
    parser.add_argument('--check_status', action='store_true',
                        help='Check planning and code status for all theorems')
    parser.add_argument('--only_render', action='store_true', help='Only render scenes without combining videos')
    parser.add_argument('--scenes', nargs='+', type=int, help='Specific scenes to process (if theorems_path is provided)')
    args = parser.parse_args()
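    # Illustrative invocations (the script name, paths, and topics below are placeholders,
    # not taken from this repository):
    #   python generate_video.py --topic "Pythagorean theorem" --context "Prove it visually" --use_rag
    #   python generate_video.py --theorems_path data/theorems.json --sample_size 5 --only_plan
    #   python generate_video.py --theorems_path data/theorems.json --check_status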
    # Initialize the planner, helper, and scene models using LiteLLM
    verbose = args.verbose
    planner_model = LiteLLMWrapper(
        model_name=args.model,
        temperature=0.7,
        print_cost=True,
        verbose=verbose,
        use_langfuse=args.use_langfuse
    )
    helper_model = LiteLLMWrapper(
        model_name=args.helper_model if args.helper_model else args.model,  # Use helper_model if provided, else fall back to the planner model
        temperature=0.7,
        print_cost=True,
        verbose=verbose,
        use_langfuse=args.use_langfuse
    )
    scene_model = LiteLLMWrapper(  # Initialize scene_model separately
        model_name=args.model,
        temperature=0.7,
        print_cost=True,
        verbose=verbose,
        use_langfuse=args.use_langfuse
    )
    print(f"Planner model: {args.model}, Helper model: {args.helper_model if args.helper_model else args.model}, Scene model: {args.model}")  # Print all models
    if args.theorems_path:
        # Load the sample theorems
        with open(args.theorems_path, "r") as f:
            theorems = json.load(f)
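        # The theorems file is assumed to be a JSON list in which each entry provides at
        # least 'theorem' and 'description' keys, e.g. (illustrative):
        #   [{"theorem": "Pythagorean theorem", "description": "..."}]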
        if args.sample_size:
            theorems = theorems[:args.sample_size]

        if args.peek_existing_videos:
            print(f"Checking which videos rendered successfully in {args.output_dir}:")
            # In output_dir, count topic folders that contain a combined .mp4 and report successes out of the total number of folders
            successful_rendered_videos = 0
            total_folders = 0
            for item in os.listdir(args.output_dir):
                if os.path.isdir(os.path.join(args.output_dir, item)):
                    total_folders += 1
                    if os.path.exists(os.path.join(args.output_dir, item, f"{item}_combined.mp4")):
                        successful_rendered_videos += 1
            print(f"Number of successfully rendered videos: {successful_rendered_videos}/{total_folders}")
            # Also count scene folders containing succ_rendered.txt to report per-scene successes
            successful_rendered_videos = 0
            total_scenes = 0
            for item in os.listdir(args.output_dir):
                if os.path.isdir(os.path.join(args.output_dir, item)):
                    for scene_folder in os.listdir(os.path.join(args.output_dir, item)):
                        if "scene" in scene_folder and os.path.isdir(os.path.join(args.output_dir, item, scene_folder)):
                            total_scenes += 1
                            if os.path.exists(os.path.join(args.output_dir, item, scene_folder, "succ_rendered.txt")):
                                successful_rendered_videos += 1
            print(f"Number of successfully rendered scenes: {successful_rendered_videos}/{total_scenes}")
            exit()
        video_generator = VideoGenerator(
            planner_model=planner_model,
            scene_model=scene_model,  # Pass scene_model
            helper_model=helper_model,  # Pass helper_model
            output_dir=args.output_dir,
            verbose=args.verbose,
            use_rag=args.use_rag,
            use_context_learning=args.use_context_learning,
            context_learning_path=args.context_learning_path,
            chroma_db_path=args.chroma_db_path,
            manim_docs_path=args.manim_docs_path,
            embedding_model=args.embedding_model,
            use_visual_fix_code=args.use_visual_fix_code,
            use_langfuse=args.use_langfuse,
            max_scene_concurrency=args.max_scene_concurrency
        )

        if args.debug_combine_topic is not None:
            video_generator.combine_videos(args.debug_combine_topic)
            exit()
        if args.only_gen_vid:
            # Generate videos for existing plans
            print("Generating videos for existing plans...")

            async def process_theorem(theorem, topic_semaphore):
                async with topic_semaphore:
                    topic = theorem['theorem']
                    print(f"Processing topic: {topic}")
                    # Load the previously generated outline and per-scene plans before rendering
                    file_prefix = re.sub(r'[^a-z0-9_]+', '_', topic.lower())
                    with open(os.path.join(args.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt"), "r") as f:
                        scene_outline = f.read()
                    plans = video_generator.load_implementation_plans(topic)
                    await video_generator.render_video_fix_code(
                        topic, theorem['description'], scene_outline,
                        [plans[k] for k in sorted(plans)], max_retries=args.max_retries)

            async def main():
                # Use the command-line argument for topic concurrency
                topic_semaphore = asyncio.Semaphore(args.max_topic_concurrency)
                tasks = [process_theorem(theorem, topic_semaphore) for theorem in theorems]
                await asyncio.gather(*tasks)

            asyncio.run(main())
        elif args.check_status:
            print("\nChecking theorem status...")
            video_generator = VideoGenerator(
                planner_model=planner_model,
                scene_model=scene_model,
                helper_model=helper_model,
                output_dir=args.output_dir,
                verbose=args.verbose,
                use_rag=args.use_rag,
                use_context_learning=args.use_context_learning,
                context_learning_path=args.context_learning_path,
                chroma_db_path=args.chroma_db_path,
                manim_docs_path=args.manim_docs_path,
                embedding_model=args.embedding_model,
                use_visual_fix_code=args.use_visual_fix_code,
                use_langfuse=args.use_langfuse,
                max_scene_concurrency=args.max_scene_concurrency
            )
            all_statuses = [video_generator.check_theorem_status(theorem) for theorem in theorems]

            # Print combined status table
            print("\nTheorem Status:")
            print("-" * 160)
            print(f"{'Topic':<40} {'Outline':<8} {'Total':<8} {'Status (Plan/Code/Render)':<50} {'Combined':<10} {'Missing Components':<40}")
            print("-" * 160)
            for status in all_statuses:
                # Create status string showing plan/code/render completion for each scene
                scene_status_str = ""
                for scene in status['scene_status']:
                    scene_str = (
                        ("P" if scene['has_plan'] else "-") +
                        ("C" if scene['has_code'] else "-") +
                        ("R" if scene['has_render'] else "-") + " "
                    )
                    scene_status_str += scene_str
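                # For instance, a status string of "PCR PC- P-- " would mean scene 1 is complete,
                # scene 2 still lacks a render, and scene 3 only has a plan (illustrative).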
                # Collect missing components
                missing_plans = []
                missing_code = []
                missing_renders = []
                for scene in status['scene_status']:
                    if not scene['has_plan']:
                        missing_plans.append(str(scene['scene_number']))
                    if not scene['has_code']:
                        missing_code.append(str(scene['scene_number']))
                    if not scene['has_render']:
                        missing_renders.append(str(scene['scene_number']))

                # Format the missing-components string
                missing_str = []
                if missing_plans:
                    missing_str.append(f"P:{','.join(missing_plans)}")
                if missing_code:
                    missing_str.append(f"C:{','.join(missing_code)}")
                if missing_renders:
                    missing_str.append(f"R:{','.join(missing_renders)}")
                missing_str = ' '.join(missing_str)

                print(f"{status['topic'][:37]+'...' if len(status['topic'])>37 else status['topic']:<40} "
                      f"{'✓' if status['has_scene_outline'] else '✗':<8} "
                      f"{status['total_scenes']:<8} "
                      f"{scene_status_str[:47]+'...' if len(scene_status_str)>47 else scene_status_str:<50} "
                      f"{'✓' if status['has_combined_video'] else '✗':<10} "
                      f"{missing_str[:37]+'...' if len(missing_str)>37 else missing_str:<40}")

            # Print summary
            print("\nSummary:")
            print(f"Total theorems: {len(theorems)}")
            print(f"Total scenes: {sum(status['total_scenes'] for status in all_statuses)}")
            print(f"Scene completion status:")
            print(f" Plans: {sum(status['implementation_plans'] for status in all_statuses)} scenes")
            print(f" Code: {sum(status['code_files'] for status in all_statuses)} scenes")
            print(f" Renders: {sum(status['rendered_scenes'] for status in all_statuses)} scenes")
            print(f"Combined videos: {sum(1 for status in all_statuses if status['has_combined_video'])}/{len(theorems)}")
            exit()
        else:
            # Generate video pipeline from scratch
            print("Generating video pipeline from scratch...")

            async def process_theorem(theorem, topic_semaphore):
                async with topic_semaphore:
                    topic = theorem['theorem']
                    description = theorem['description']
                    print(f"Processing topic: {topic}")
                    if args.only_combine:
                        video_generator.combine_videos(topic)
                    else:
                        await video_generator.generate_video_pipeline(
                            topic,
                            description,
                            max_retries=args.max_retries,
                            only_plan=args.only_plan,
                            specific_scenes=args.scenes
                        )
                        if not args.only_plan and not args.only_render:  # Skip combination when only planning or only rendering
                            video_generator.combine_videos(topic)

            async def main():
                # Use the command-line argument for topic concurrency
                topic_semaphore = asyncio.Semaphore(args.max_topic_concurrency)
                tasks = [process_theorem(theorem, topic_semaphore) for theorem in theorems]
                await asyncio.gather(*tasks)

            asyncio.run(main())
    elif args.topic and args.context:
        video_generator = VideoGenerator(
            planner_model=planner_model,
            scene_model=scene_model,  # Pass scene_model
            helper_model=helper_model,  # Pass helper_model
            output_dir=args.output_dir,
            verbose=args.verbose,
            use_rag=args.use_rag,
            use_context_learning=args.use_context_learning,
            context_learning_path=args.context_learning_path,
            chroma_db_path=args.chroma_db_path,
            manim_docs_path=args.manim_docs_path,
            embedding_model=args.embedding_model,
            use_visual_fix_code=args.use_visual_fix_code,
            use_langfuse=args.use_langfuse,
            max_scene_concurrency=args.max_scene_concurrency
        )

        # Process a single topic with context
        print(f"Processing topic: {args.topic}")
        if args.only_gen_vid:
            # Render from existing plans: load the outline and per-scene plans, then run the async renderer
            file_prefix = re.sub(r'[^a-z0-9_]+', '_', args.topic.lower())
            with open(os.path.join(args.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt"), "r") as f:
                scene_outline = f.read()
            plans = video_generator.load_implementation_plans(args.topic)
            asyncio.run(video_generator.render_video_fix_code(
                args.topic, args.context, scene_outline,
                [plans[k] for k in sorted(plans)], max_retries=args.max_retries))
            exit()
        if args.only_combine:
            video_generator.combine_videos(args.topic)
        else:
            asyncio.run(video_generator.generate_video_pipeline(
                args.topic,
                args.context,
                max_retries=args.max_retries,
                only_plan=args.only_plan,
            ))
            if not args.only_plan and not args.only_render:
                video_generator.combine_videos(args.topic)
    else:
        print("Please provide either (--theorems_path) or (--topic and --context)")
        exit()