| import os | |
| import google.generativeai as genai | |
| from pathlib import Path | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class CoverageGenerator: | |
| def __init__(self): | |
| api_key = os.getenv("GOOGLE_API_KEY") | |
| if not api_key: | |
| raise ValueError("GOOGLE_API_KEY not found") | |
| genai.configure(api_key=api_key) | |
| self.model = genai.GenerativeModel('gemini-pro') | |
| self.chunk_size = 8000 | |
| def count_tokens(self, text: str) -> int: | |
| """Estimate token count using simple word-based estimation""" | |
| words = text.split() | |
| return int(len(words) * 1.3) | |
| def chunk_screenplay(self, text: str) -> list: | |
| """Split screenplay into chunks with overlap for context""" | |
| logger.info("Chunking screenplay...") | |
| scenes = text.split("\n\n") | |
| chunks = [] | |
| current_chunk = [] | |
| current_size = 0 | |
| overlap_scenes = 2 | |
| for i, scene in enumerate(scenes): | |
| scene_size = self.count_tokens(scene) | |
| if current_size + scene_size > self.chunk_size and current_chunk: | |
| overlap = current_chunk[-overlap_scenes:] if len(current_chunk) > overlap_scenes else current_chunk | |
| chunks.append("\n\n".join(current_chunk)) | |
| current_chunk = overlap + [scene] | |
| current_size = sum(self.count_tokens(s) for s in current_chunk) | |
| else: | |
| current_chunk.append(scene) | |
| current_size += scene_size | |
| if current_chunk: | |
| chunks.append("\n\n".join(current_chunk)) | |
| logger.info(f"Split screenplay into {len(chunks)} chunks with context overlap") | |
| return chunks | |
| def generate_synopsis(self, chunk: str, chunk_num: int = 1, total_chunks: int = 1) -> str: | |
| """Generate synopsis for a single chunk""" | |
| logger.debug(f"Generating synopsis for chunk {chunk_num}/{total_chunks}") | |
| prompt = f"""As an experienced script analyst, analyze this section ({chunk_num}/{total_chunks}) of the screenplay. | |
| Focus on: plot developments, character development, narrative connections, themes | |
| Screenplay section: | |
| {chunk}""" | |
| try: | |
| response = self.model.generate_content(prompt) | |
| logger.debug(f"Generated synopsis for chunk {chunk_num}") | |
| return response.text | |
| except Exception as e: | |
| logger.error(f"Error processing chunk {chunk_num}: {str(e)}") | |
| return None | |
| def generate_final_synopsis(self, chunk_synopses: list) -> str: | |
| """Combine chunk synopses into final coverage""" | |
| logger.info("Generating final synopsis") | |
| combined_text = "\n\n".join([f"Section {i+1}:\n{synopsis}" | |
| for i, synopsis in enumerate(chunk_synopses)]) | |
| prompt = f"""Synthesize these section summaries into a comprehensive coverage document with: | |
| 1. Complete narrative arc | |
| 2. Character development | |
| 3. Major themes | |
| 4. Key turning points | |
| 5. Core conflict and resolution | |
| Section summaries: | |
| {combined_text}""" | |
| try: | |
| response = self.model.generate_content(prompt) | |
| logger.info("Final synopsis generated") | |
| return response.text | |
| except Exception as e: | |
| logger.error(f"Error generating final synopsis: {str(e)}") | |
| return None | |
| def generate_coverage(self, screenplay_path: Path) -> bool: | |
| """Main method to generate coverage document""" | |
| logger.info("Starting coverage generation") | |
| try: | |
| with open(screenplay_path, 'r', encoding='utf-8') as f: | |
| screenplay_text = f.read() | |
| chunks = self.chunk_screenplay(screenplay_text) | |
| chunk_synopses = [] | |
| for i, chunk in enumerate(chunks, 1): | |
| logger.info(f"Processing chunk {i}/{len(chunks)}") | |
| synopsis = self.generate_synopsis(chunk, i, len(chunks)) | |
| if synopsis: | |
| chunk_synopses.append(synopsis) | |
| else: | |
| logger.error(f"Failed to process chunk {i}") | |
| return False | |
| final_synopsis = self.generate_final_synopsis(chunk_synopses) | |
| if not final_synopsis: | |
| return False | |
| output_path = screenplay_path.parent / "coverage.txt" | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write("SCREENPLAY COVERAGE\n\n") | |
| f.write(final_synopsis) | |
| logger.info("Coverage generation complete") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error in coverage generation: {str(e)}") | |
| return False |