| import gradio as gr | |
| import os | |
| from src.processing.gemini_processor import GeminiProcessor | |
| from src.analysis.coverage_generator import CoverageGenerator | |
| from src.analysis.creative_analyzer import CreativeAnalyzer | |
| from src.analysis.analysis_post_processor import AnalysisPostProcessor | |
| from src.analysis.market_analyzer import MarketAnalyzer | |
| from pathlib import Path | |
| import logging | |
| class ConsoleOutput: | |
| def __init__(self): | |
| self.messages = [] | |
| def write(self, message): | |
| self.messages.append(str(message)) | |
| return "\n".join(self.messages) | |
| def get_output(self): | |
| return "\n".join(self.messages) | |
| class GradioHandler(logging.Handler): | |
| def emit(self, record): | |
| msg = self.format(record) | |
| self.console.write(msg) | |
| def setup_logging(console_output): | |
| logging.basicConfig(level=logging.DEBUG) | |
| logger = logging.getLogger() | |
| logger.handlers = [] | |
| handler = GradioHandler() | |
| handler.console = console_output | |
| logger.addHandler(handler) | |
| return logger | |
| def process_screenplay(pdf_file, progress=gr.Progress()): | |
| if pdf_file is None: | |
| raise gr.Error("Please upload a PDF file") | |
| console = ConsoleOutput() | |
| logger = setup_logging(console) | |
| try: | |
| processor = GeminiProcessor() | |
| progress(0.5, desc="Processing screenplay...") | |
| cleaned_path = Path("cleaned_screenplay_long.txt") | |
| success = processor.process_screenplay(pdf_file.name, str(cleaned_path)) | |
| if not success: | |
| raise gr.Error("Failed to process screenplay") | |
| with open(cleaned_path, 'r') as f: | |
| cleaned_text = f.read() | |
| progress(1.0, desc="Complete!") | |
| return [cleaned_text, | |
| gr.update(value=str(cleaned_path), visible=True), | |
| gr.update(interactive=True, variant="primary"), | |
| gr.update(interactive=True, variant="primary"), | |
| gr.update(interactive=False, variant="secondary"), | |
| gr.update(interactive=False, variant="secondary"), | |
| console.get_output()] | |
| except Exception as e: | |
| error_msg = f"Error: {str(e)}" | |
| console.write(error_msg) | |
| raise gr.Error(error_msg) | |
| def generate_coverage(progress=gr.Progress()): | |
| console = ConsoleOutput() | |
| logger = setup_logging(console) | |
| try: | |
| coverage_gen = CoverageGenerator() | |
| progress(0.5, desc="Generating coverage...") | |
| cleaned_path = Path("cleaned_screenplay_long.txt") | |
| success = coverage_gen.generate_coverage(cleaned_path) | |
| if not success: | |
| raise gr.Error("Failed to generate coverage") | |
| coverage_path = Path("coverage.txt") | |
| with open(coverage_path, 'r') as f: | |
| coverage = f.read() | |
| progress(1.0, desc="Complete!") | |
| return [coverage, | |
| gr.update(value=str(coverage_path), visible=True), | |
| console.get_output()] | |
| except Exception as e: | |
| error_msg = f"Error: {str(e)}" | |
| console.write(error_msg) | |
| raise gr.Error(error_msg) | |
| def analyze_screenplay(progress=gr.Progress()): | |
| console = ConsoleOutput() | |
| logger = setup_logging(console) | |
| try: | |
| analyzer = CreativeAnalyzer() | |
| progress(0.5, desc="Performing creative analysis...") | |
| cleaned_path = Path("cleaned_screenplay_long.txt") | |
| success = analyzer.analyze_screenplay(cleaned_path) | |
| if not success: | |
| raise gr.Error("Failed to generate creative analysis") | |
| analysis_path = Path("creative_analysis.txt") | |
| with open(analysis_path, 'r') as f: | |
| analysis = f.read() | |
| progress(1.0, desc="Complete!") | |
| return [analysis, | |
| gr.update(value=str(analysis_path), visible=True), | |
| gr.update(interactive=True, variant="primary"), | |
| console.get_output()] | |
| except Exception as e: | |
| error_msg = f"Error: {str(e)}" | |
| console.write(error_msg) | |
| raise gr.Error(error_msg) | |
| def post_process_analysis(progress=gr.Progress()): | |
| console = ConsoleOutput() | |
| logger = setup_logging(console) | |
| try: | |
| post_processor = AnalysisPostProcessor() | |
| progress(0.5, desc="Post-processing analysis...") | |
| input_path = Path("creative_analysis.txt") | |
| output_path = Path("cleaned_creative_analysis.txt") | |
| success = post_processor.process_analysis(str(input_path), str(output_path)) | |
| if not success: | |
| raise gr.Error("Failed to post-process analysis") | |
| with open(output_path, 'r') as f: | |
| analysis = f.read() | |
| progress(1.0, desc="Complete!") | |
| return [analysis, | |
| gr.update(value=str(output_path), visible=True), | |
| gr.update(interactive=True, variant="primary"), | |
| console.get_output()] | |
| except Exception as e: | |
| error_msg = f"Error: {str(e)}" | |
| console.write(error_msg) | |
| raise gr.Error(error_msg) | |
| def generate_market_analysis(progress=gr.Progress()): | |
| console = ConsoleOutput() | |
| logger = setup_logging(console) | |
| try: | |
| market_analyzer = MarketAnalyzer() | |
| progress(0.5, desc="Generating market analysis...") | |
| coverage_path = Path("coverage.txt") | |
| creative_analysis_path = Path("cleaned_creative_analysis.txt") | |
| success = market_analyzer.generate_market_analysis(coverage_path, creative_analysis_path) | |
| if not success: | |
| raise gr.Error("Failed to generate market analysis") | |
| market_path = Path("market_analysis.txt") | |
| with open(market_path, 'r') as f: | |
| analysis = f.read() | |
| progress(1.0, desc="Complete!") | |
| return [analysis, | |
| gr.update(value=str(market_path), visible=True), | |
| console.get_output()] | |
| except Exception as e: | |
| error_msg = f"Error: {str(e)}" | |
| console.write(error_msg) | |
| raise gr.Error(error_msg) | |
| with gr.Blocks(title="Screenplay Coverage Generator") as demo: | |
| gr.Markdown("# Screenplay Coverage Generator") | |
| with gr.Row(): | |
| file_input = gr.File(label="Upload Screenplay PDF", file_types=[".pdf"]) | |
| with gr.Row(): | |
| process_btn = gr.Button("Process Screenplay", variant="primary") | |
| coverage_btn = gr.Button("Generate Coverage", interactive=False) | |
| analysis_btn = gr.Button("Creative Analysis", interactive=False) | |
| post_process_btn = gr.Button("Post-Process Analysis", interactive=False) | |
| market_btn = gr.Button("Market Analysis", interactive=False) | |
| with gr.Row(): | |
| console = gr.Textbox(label="Console Output", lines=10, max_lines=30, autoscroll=True, show_copy_button=True) | |
| with gr.Tabs(): | |
| with gr.TabItem("Cleaned Screenplay"): | |
| with gr.Row(): | |
| cleaned_output = gr.Textbox(label="Cleaned Screenplay", lines=10, show_copy_button=True) | |
| with gr.Row(): | |
| cleaned_file = gr.File(label="Download Cleaned Screenplay", interactive=True, visible=False) | |
| with gr.TabItem("Coverage"): | |
| with gr.Row(): | |
| coverage_output = gr.Textbox(label="Coverage Document", lines=10, show_copy_button=True) | |
| with gr.Row(): | |
| coverage_file = gr.File(label="Download Coverage", interactive=True, visible=False) | |
| with gr.TabItem("Creative Analysis"): | |
| with gr.Row(): | |
| analysis_output = gr.Textbox(label="Creative Analysis", lines=10, show_copy_button=True) | |
| with gr.Row(): | |
| analysis_file = gr.File(label="Download Creative Analysis", interactive=True, visible=False) | |
| with gr.TabItem("Post-Processed Analysis"): | |
| with gr.Row(): | |
| post_process_output = gr.Textbox(label="Post-Processed Analysis", lines=10, show_copy_button=True) | |
| with gr.Row(): | |
| post_process_file = gr.File(label="Download Post-Processed Analysis", interactive=True, visible=False) | |
| with gr.TabItem("Market Analysis"): | |
| with gr.Row(): | |
| market_output = gr.Textbox(label="Market Analysis", lines=10, show_copy_button=True) | |
| with gr.Row(): | |
| market_file = gr.File(label="Download Market Analysis", interactive=True, visible=False) | |
| process_btn.click( | |
| fn=process_screenplay, | |
| inputs=[file_input], | |
| outputs=[cleaned_output, cleaned_file, coverage_btn, analysis_btn, post_process_btn, market_btn, console] | |
| ) | |
| coverage_btn.click( | |
| fn=generate_coverage, | |
| outputs=[coverage_output, coverage_file, console] | |
| ) | |
| analysis_btn.click( | |
| fn=analyze_screenplay, | |
| outputs=[analysis_output, analysis_file, post_process_btn, console] | |
| ) | |
| post_process_btn.click( | |
| fn=post_process_analysis, | |
| outputs=[post_process_output, post_process_file, market_btn, console] | |
| ) | |
| market_btn.click( | |
| fn=generate_market_analysis, | |
| outputs=[market_output, market_file, console] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |