# app.py
import logging
import os

import gradio as gr

from modules.input_handler import InputHandler
from modules.retriever import Retriever
from modules.analyzer import Analyzer
from modules.citation import CitationManager
from modules.formatter import OutputFormatter

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class ResearchOrchestrator:
    """Coordinates the query -> search -> analyze -> cite -> format pipeline."""

    def __init__(self, input_handler, retriever, analyzer, citation_manager, formatter):
        self.input_handler = input_handler
        self.retriever = retriever
        self.analyzer = analyzer
        self.citation_manager = citation_manager
        self.formatter = formatter
    def run(self, query, progress=gr.Progress()):
        """Execute the research pipeline with streaming updates."""
        try:
            progress(0.0, desc="Starting research...")
            logging.info(f"Starting research for query: {query}")

            # Step 1: Process input
            progress(0.1, desc="🔍 Processing your query...")
            processed_query = self.input_handler.process_query(query)
            logging.info("Query processed successfully")

            # Step 2: Retrieve data
            progress(0.3, desc="🌐 Searching for relevant information...")
            search_results = self.retriever.search(processed_query)
            if not search_results:
                result = "⚠️ No relevant information found for your query. Please try rephrasing."
                logging.warning("No search results found")
                progress(1.0, desc="⚠️ No results found")
                yield result
                return
            logging.info(f"Retrieved {len(search_results)} results")
            # Step 3: Analyze content with streaming
            progress(0.5, desc="🧠 Analyzing search results (streaming)...")
            header = "## 🧠 AI Analysis (Live Streaming)\n\n"
            yield header

            # Accumulate streamed chunks. Gradio replaces the output component's
            # value on every yield, so we yield the growing text (header plus
            # everything streamed so far) rather than each chunk on its own,
            # which would make earlier output disappear.
            full_analysis = ""
            for chunk in self.analyzer.analyze_stream(query, search_results):
                full_analysis += chunk
                yield header + full_analysis

            # Check if analysis was successful
            if full_analysis.startswith("⚠️") or full_analysis.startswith("Analysis failed"):
                logging.warning(f"Analysis failed: {full_analysis}")
                progress(0.8, desc="⚠️ Analysis failed")
                return
            logging.info("Analysis streaming completed successfully")
            # Step 4: Manage citations
            progress(0.8, desc="📎 Adding citations...")
            cited_analysis = self.citation_manager.add_citations(full_analysis, search_results)
            logging.info("Citations added")

            # Step 5: Format output
            progress(0.9, desc="✨ Formatting response...")
            formatted_output = self.formatter.format_response(cited_analysis, search_results)
            logging.info("Response formatted successfully")

            # Add completion notification
            progress(1.0, desc="✅ Research complete!")
            if len(search_results) >= 3:
                completion_message = "\n\n---\n[ANALYSIS COMPLETE] ✅ Research finished with sufficient sources."
            else:
                completion_message = "\n\n---\n[RECOMMEND FURTHER ANALYSIS] ⚠️ Limited sources found. Consider refining your query."
            yield formatted_output + completion_message
        except Exception as e:
            error_msg = f"❌ An error occurred: {str(e)}"
            logging.error(f"Error in research pipeline: {str(e)}", exc_info=True)
            progress(1.0, desc="❌ Error occurred")
            yield error_msg
# Configuration
CONFIG = {
    "hf_api_base": "https://zxzbfrlg3ssrk7d9.us-east-1.aws.endpoints.huggingface.cloud/v1/",
    "hf_api_key": os.getenv("HF_TOKEN"),
    "tavily_api_key": os.getenv("TAVILY_API_KEY"),
}
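# Both secrets must be present in the environment (e.g. as Space secrets, or
# exported locally) before launch; initialize_modules() below refuses to start
# without them. Example with hypothetical placeholder values:
#   export HF_TOKEN=hf_xxxxxxxx
#   export TAVILY_API_KEY=tvly-xxxxxxxx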
# Initialize modules with error handling
def initialize_modules():
    """Initialize all modules with proper error handling."""
    try:
        if not CONFIG["tavily_api_key"]:
            raise ValueError("TAVILY_API_KEY environment variable is not set")
        if not CONFIG["hf_api_key"]:
            raise ValueError("HF_TOKEN environment variable is not set")

        input_handler = InputHandler()
        retriever = Retriever(api_key=CONFIG["tavily_api_key"])
        analyzer = Analyzer(base_url=CONFIG["hf_api_base"], api_key=CONFIG["hf_api_key"])
        citation_manager = CitationManager()
        formatter = OutputFormatter()

        return ResearchOrchestrator(
            input_handler,
            retriever,
            analyzer,
            citation_manager,
            formatter,
        )
    except Exception as e:
        logging.error(f"Failed to initialize modules: {str(e)}")
        raise
# Initialize orchestrator
orchestrator = initialize_modules()
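# Initialization happens at import time, so a missing or invalid secret aborts
# startup immediately rather than surfacing as an error mid-query.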
# Custom CSS for spinner and streaming
custom_css = """
.spinner {
    border: 4px solid #f3f3f3;
    border-top: 4px solid #3498db;
    border-radius: 50%;
    width: 24px;
    height: 24px;
    animation: spin 1s linear infinite;
    display: inline-block;
    margin-right: 8px;
}
.streaming-content {
    white-space: pre-wrap;
    font-family: monospace;
    background-color: #f8f9fa;
    padding: 10px;
    border-radius: 5px;
    margin: 10px 0;
}
@keyframes spin {
    0% { transform: rotate(0deg); }
    100% { transform: rotate(360deg); }
}
"""
def research_assistant(query, progress=gr.Progress()):
    """Main entry point for the research assistant with streaming."""
    logging.info(f"Research assistant called with query: {query}")
    yield from orchestrator.run(query, progress)
# Create Gradio interface
with gr.Blocks(css=custom_css, title="Research Assistant") as demo:
    gr.Markdown("# 🧠 AI Research Assistant")
    gr.Markdown("Enter a research topic to get a structured analysis with sources")

    with gr.Row():
        with gr.Column():
            query_input = gr.Textbox(
                label="Research Query",
                placeholder="Enter your research question...",
                lines=3
            )
            submit_btn = gr.Button("Research", variant="primary")
        with gr.Column():
            output = gr.Markdown(label="Analysis Results", elem_classes=["streaming-content"])

    # Status indicator with spinner
    status_indicator = gr.HTML("<div id='status'><span class='spinner'></span> Ready for your research query</div>")
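    # Note: nothing updates this indicator after load; live progress feedback
    # comes from the gr.Progress() tracker inside research_assistant instead.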
    examples = gr.Examples(
        examples=[
            "Latest advancements in quantum computing",
            "Impact of climate change on global agriculture",
            "Recent developments in Alzheimer's treatment research"
        ],
        inputs=query_input
    )

    submit_btn.click(
        fn=research_assistant,
        inputs=query_input,
        outputs=output
    )
    query_input.submit(
        fn=research_assistant,
        inputs=query_input,
        outputs=output
    )
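    # Because research_assistant is a generator, Gradio streams each yielded
    # value into the Markdown output; click and submit share the same handler,
    # so pressing Enter and clicking "Research" behave identically.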
if __name__ == "__main__":
demo.launch()