# app.py import gradio as gr import logging from modules.input_handler import InputHandler from modules.retriever import Retriever from modules.analyzer import Analyzer from modules.citation import CitationManager from modules.formatter import OutputFormatter import os # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class ResearchOrchestrator: def __init__(self, input_handler, retriever, analyzer, citation_manager, formatter): self.input_handler = input_handler self.retriever = retriever self.analyzer = analyzer self.citation_manager = citation_manager self.formatter = formatter def run(self, query, progress=gr.Progress()): """Execute the research pipeline with streaming updates""" try: progress(0.0, desc="Starting research...") logging.info(f"Starting research for query: {query}") # Step 1: Process input progress(0.1, desc="🔍 Processing your query...") processed_query = self.input_handler.process_query(query) logging.info("Query processed successfully") # Step 2: Retrieve data progress(0.3, desc="🌐 Searching for relevant information...") search_results = self.retriever.search(processed_query) if not search_results: result = "⚠️ No relevant information found for your query. Please try rephrasing." logging.warning("No search results found") progress(1.0, desc="⚠️ No results found") yield result return logging.info(f"Retrieved {len(search_results)} results") # Step 3: Analyze content with streaming progress(0.5, desc="🧠 Analyzing search results (streaming)...") yield "## 🧠 AI Analysis (Live Streaming)\n\n" # Collect all streamed content full_analysis = "" for chunk in self.analyzer.analyze_stream(query, search_results): full_analysis += chunk yield chunk # Check if analysis was successful if full_analysis.startswith("⚠️") or full_analysis.startswith("Analysis failed"): logging.warning(f"Analysis failed: {full_analysis}") progress(0.8, desc="⚠️ Analysis failed") return logging.info("Analysis streaming completed successfully") # Step 4: Manage citations progress(0.8, desc="📎 Adding citations...") cited_analysis = self.citation_manager.add_citations(full_analysis, search_results) logging.info("Citations added") # Step 5: Format output progress(0.9, desc="✨ Formatting response...") formatted_output = self.formatter.format_response(cited_analysis, search_results) logging.info("Response formatted successfully") # Add completion notification progress(1.0, desc="✅ Research complete!") if len(search_results) >= 3: completion_message = "\n\n---\n[ANALYSIS COMPLETE] ✅ Research finished with sufficient sources." else: completion_message = "\n\n---\n[RECOMMEND FURTHER ANALYSIS] ⚠️ Limited sources found. Consider refining your query." yield formatted_output + completion_message except Exception as e: error_msg = f"❌ An error occurred: {str(e)}" logging.error(f"Error in research pipeline: {str(e)}", exc_info=True) progress(1.0, desc="❌ Error occurred") yield error_msg # Configuration CONFIG = { "hf_api_base": "https://zxzbfrlg3ssrk7d9.us-east-1.aws.endpoints.huggingface.cloud/v1/", "hf_api_key": os.getenv("HF_TOKEN"), "tavily_api_key": os.getenv("TAVILY_API_KEY"), } # Initialize modules with error handling def initialize_modules(): """Initialize all modules with proper error handling""" try: if not CONFIG["tavily_api_key"]: raise ValueError("TAVILY_API_KEY environment variable is not set") if not CONFIG["hf_api_key"]: raise ValueError("HF_TOKEN environment variable is not set") input_handler = InputHandler() retriever = Retriever(api_key=CONFIG["tavily_api_key"]) analyzer = Analyzer(base_url=CONFIG["hf_api_base"], api_key=CONFIG["hf_api_key"]) citation_manager = CitationManager() formatter = OutputFormatter() return ResearchOrchestrator( input_handler, retriever, analyzer, citation_manager, formatter ) except Exception as e: logging.error(f"Failed to initialize modules: {str(e)}") raise # Initialize orchestrator orchestrator = initialize_modules() # Custom CSS for spinner and streaming custom_css = """ .spinner { border: 4px solid #f3f3f3; border-top: 4px solid #3498db; border-radius: 50%; width: 24px; height: 24px; animation: spin 1s linear infinite; display: inline-block; margin-right: 8px; } .streaming-content { white-space: pre-wrap; font-family: monospace; background-color: #f8f9fa; padding: 10px; border-radius: 5px; margin: 10px 0; } @keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } } """ def research_assistant(query, progress=gr.Progress()): """Main entry point for the research assistant with streaming""" logging.info(f"Research assistant called with query: {query}") for step in orchestrator.run(query, progress): yield step # Create Gradio interface with gr.Blocks(css=custom_css, title="Research Assistant") as demo: gr.Markdown("# 🧠 AI Research Assistant") gr.Markdown("Enter a research topic to get a structured analysis with sources") with gr.Row(): with gr.Column(): query_input = gr.Textbox( label="Research Query", placeholder="Enter your research question...", lines=3 ) submit_btn = gr.Button("Research", variant="primary") with gr.Column(): output = gr.Markdown(label="Analysis Results", elem_classes=["streaming-content"]) # Status indicator with spinner status_indicator = gr.HTML("