import gradio as gr
import asyncio
import threading
import queue
import os
import time
import json
from datetime import datetime
from modules.input_handler import validate_input
from modules.retriever import perform_search
from modules.context_enhancer import add_weather_context, add_space_weather_context
from modules.analyzer import analyze_with_model
from modules.formatter import format_output
from modules.citation import generate_citations, format_citations
from modules.server_cache import get_cached_result, cache_result
from modules.status_logger import log_request
from modules.server_monitor import ServerMonitor
from modules.rag.rag_chain import RAGChain
from modules.rag.vector_store import VectorStore
from langchain.docstore.document import Document
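# The project-local modules above are assumed to expose the signatures used
# below, e.g. perform_search(query) -> list[dict], analyze_with_model(prompt)
# -> iterable of text chunks, and add_weather_context() -> awaitable str.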
server_monitor = ServerMonitor()
rag_chain = RAGChain()
vector_store = VectorStore()
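# Module-level singletons: one monitor, RAG chain, and vector store are shared
# by every Gradio session served from this process.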
# Cat-themed greeting function
def get_cat_greeting():
    """Generate a cat-themed greeting to test if the AI is operational"""
    return (
        "Hello there! I'm a sophisticated AI research assistant, but right now I'm just a random cat preparing to make biscuits "
        "(that's cat slang for getting ready to do something awesome!). Today is " + datetime.now().strftime("%A, %B %d, %Y") + ". "
        "I'm purring with excitement to help you with your research questions! "
        "Meow... what delicious knowledge shall we hunt down today? "
        "Please ask me anything, and I'll pounce on the best information for you!"
    )
# Startup check function
async def perform_startup_check():
    """Perform startup checks to verify server status"""
    try:
        # Check 1: Verify server is not returning 503
        test_prompt = "Hello, this is a startup check. Please respond with 'OK' if you're operational."
        # Use a short timeout for the startup check
        stream = analyze_with_model(test_prompt)
        response_parts = []
        # Collect first few chunks to verify operation
        chunks_received = 0
        for chunk in stream:
            response_parts.append(chunk)
            chunks_received += 1
            if chunks_received >= 3:  # Just need a few chunks to confirm operation
                break
        full_response = "".join(response_parts)
        # If we got a response, server is likely operational
        if full_response:
            return {
                "status": "operational",
                "message": "✅ Server is operational and ready to assist!",
                "details": f"Received response: {full_response[:50]}..."
            }
        else:
            return {
                "status": "warning",
                "message": "⚠️ Server responded but with empty content. May need attention.",
                "details": "Server connection established but no content returned."
            }
    except Exception as e:
        error_msg = str(e)
        if "503" in error_msg:
            return {
                "status": "initializing",
                "message": "⏳ Server is currently initializing (503 error detected)",
                "details": "The AI model server is warming up. Please wait approximately 5 minutes before asking questions."
            }
        elif "timeout" in error_msg.lower():
            return {
                "status": "timeout",
                "message": "⏰ Server connection timed out",
                "details": "Connection to the AI model timed out. This may indicate server initialization."
            }
        else:
            return {
                "status": "error",
                "message": "❌ Server check failed",
                "details": f"Error during startup check: {error_msg}"
            }
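# Note: Hugging Face inference endpoints commonly return a 503 while a model
# is still loading, which is why a 503 here is reported as "initializing"
# rather than as a hard failure.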
# Thread-safe wrapper for startup check
class StartupCheckWrapper:
    def __init__(self, coroutine):
        self.coroutine = coroutine
        self.result = None
        self.exception = None
        self.completed = False
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.thread.start()

    def _run(self):
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self.result = loop.run_until_complete(self.coroutine)
        except Exception as e:
            self.exception = e
        finally:
            self.completed = True

    def get_result(self):
        if not self.completed:
            return {"status": "checking", "message": "🔄 Performing startup checks...", "details": "Please wait while we verify system status."}
        if self.exception:
            return {"status": "error", "message": "❌ Startup check failed", "details": str(self.exception)}
        return self.result
def run_startup_check():
    """Run the startup check asynchronously"""
    coroutine = perform_startup_check()
    wrapper = StartupCheckWrapper(coroutine)
    return wrapper
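# run_startup_check() returns immediately; callers poll wrapper.get_result(),
# which is non-blocking and reports "checking" until the daemon thread is done.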
# Enhanced streaming with markdown support
async def research_assistant(query, history, use_rag=False):
    log_request("Research started", query=query, use_rag=use_rag)
    # Add typing indicator
    history.append((query, "🔍 Searching for information..."))
    yield history
    cached = get_cached_result(query)
    if cached:
        log_request("Cache hit", query=query)
        history[-1] = (query, cached)
        yield history
        return
    try:
        validated_query = validate_input(query)
    except ValueError as e:
        error_msg = f"⚠️ Input Error: {str(e)}"
        history[-1] = (query, error_msg)
        yield history
        return
    # Run context enhancement and search in parallel
    history[-1] = (query, "🌐 Gathering context...")
    yield history
    # Get weather and space weather context (but don't include in prompt yet)
    weather_task = asyncio.create_task(add_weather_context())
    space_weather_task = asyncio.create_task(add_space_weather_context())
    search_task = asyncio.create_task(asyncio.to_thread(perform_search, validated_query))
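    # The three tasks start running as soon as they are created, so awaiting
    # them one after another below still overlaps the network calls.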
    weather_data = await weather_task
    space_weather_data = await space_weather_task
    search_results = await search_task
    # Handle search errors
    if isinstance(search_results, list) and len(search_results) > 0 and "error" in search_results[0]:
        error_msg = f"🔍 Search Error: {search_results[0]['error']}"
        history[-1] = (query, error_msg)
        yield history
        return
    # Format search content for LLM
    search_content = ""
    answer_content = ""
    for result in search_results:
        if result.get("type") == "answer":
            answer_content = f"Direct Answer: {result['content']}\n\n"
        elif result.get("type") == "source":
            search_content += f"Source: {result['content']}\n\n"
    # Only include context if it seems relevant to the query
    context_section = ""
    lower_query = validated_query.lower()
    # Check if weather might be relevant
    weather_keywords = ["weather", "temperature", "climate", "rain", "snow", "sun", "storm", "wind", "humidity"]
    if any(keyword in lower_query for keyword in weather_keywords):
        context_section += f"\nCurrent Weather Context: {weather_data}"
    # Check if space weather might be relevant
    space_keywords = ["space", "solar", "sun", "satellite", "astronomy", "cosmic", "radiation", "flare"]
    if any(keyword in lower_query for keyword in space_keywords):
        context_section += f"\nSpace Weather Context: {space_weather_data}"
    # Build the enriched input
    enriched_input = f"{validated_query}\n\n{answer_content}Search Results:\n{search_content}{context_section}"
    # If RAG is enabled, use it
    if use_rag:
        history[-1] = (query, "📚 Searching document database...")
        yield history
        rag_result = rag_chain.query(validated_query)
        if rag_result["status"] == "success":
            enriched_input = rag_result["prompt"]
            # Fold the top retrieved excerpts into the prompt so the model sees them
            doc_context = "\n\n".join(doc.page_content for doc in rag_result["context_docs"][:2])
            enriched_input += f"\n\nDocument Context:\n{doc_context}"
    server_status = server_monitor.check_server_status()
    if not server_status["available"]:
        wait_time = server_status["estimated_wait"]
        response = (
            f"⏳ **Server Initializing** ⏳\n\n"
            f"The AI model server is currently starting up. This happens automatically after periods of inactivity.\n\n"
            f"**Estimated wait time: {wait_time} minutes**\n\n"
            f"**What you can do:**\n"
            f"- Wait for {wait_time} minutes and try again\n"
            f"- Try a simpler query which might process faster\n"
            f"- Check back shortly - the server will be ready soon!\n\n"
            f"*Technical Details: {server_status['message']}*"
        )
        history[-1] = (query, response)
        yield history
        return
    try:
        history[-1] = (query, "🧠 Analyzing information...")
        yield history
        stream = analyze_with_model(enriched_input)
        full_response = ""
        # Buffer for smoother streaming
        buffer = ""
        buffer_threshold = 20  # Characters before yielding
        for chunk in stream:
            buffer += chunk
            # Yield when buffer is large enough or we have a complete line
            if len(buffer) > buffer_threshold or '\n' in buffer:
                full_response += buffer
                history[-1] = (query, full_response)
                yield history
                buffer = ""
                # Small delay for smoother streaming
                await asyncio.sleep(0.01)
        # Flush remaining buffer
        if buffer:
            full_response += buffer
            history[-1] = (query, full_response)
            yield history
        citations = generate_citations(search_results)
        citation_text = format_citations(citations)
        full_output = full_response + citation_text
        cache_result(query, full_output)
        server_monitor.report_success()
        log_request("Research completed", result_length=len(full_output))
        history[-1] = (query, full_output)
        yield history
    except Exception as e:
        server_monitor.report_failure()
        error_response = f"🤔 **Unexpected Error** 🤔\n\nAn unexpected error occurred:\n\n{str(e)}"
        history[-1] = (query, error_response)
        yield history
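# Gradio's generator-based event handlers here are synchronous, while
# research_assistant is an async generator. The wrapper below drains it on a
# daemon thread (running its own event loop) and re-yields each update through
# a queue, so the UI can consume it with a plain for-loop.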
# Thread-safe wrapper for async generator
class AsyncGeneratorWrapper:
    def __init__(self, async_gen):
        self.async_gen = async_gen
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.thread.start()

    def _run(self):
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            async def consume():
                try:
                    async for item in self.async_gen:
                        self.queue.put(("item", item))
                except Exception as e:
                    self.queue.put(("error", e))
                finally:
                    self.queue.put(("done", None))

            loop.run_until_complete(consume())
        except Exception as e:
            self.queue.put(("error", e))
            # Guarantee a terminating sentinel even when consume() never ran,
            # so __next__ cannot block forever on an empty queue
            self.queue.put(("done", None))

    def __iter__(self):
        return self

    def __next__(self):
        item_type, item = self.queue.get()
        if item_type == "item":
            return item
        if item_type == "error":
            raise item
        raise StopIteration
def research_assistant_wrapper(query, history, use_rag):
    async_gen = research_assistant(query, history, use_rag)
    wrapper = AsyncGeneratorWrapper(async_gen)
    return wrapper
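# Usage sketch (hypothetical query):
#   for updated_history in research_assistant_wrapper("What is RAG?", [], False):
#       ...  # each item is the full chat history, ready for gr.Chatbot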
# Document upload function
def upload_documents(files):
    """Upload and process documents for RAG"""
    try:
        documents = []
        for file in files:
            # For PDF files
            if file.name.endswith('.pdf'):
                from PyPDF2 import PdfReader
                reader = PdfReader(file.name)
                text = ""
                for page in reader.pages:
                    # extract_text() can come back empty for image-only pages
                    text += page.extract_text() or ""
                documents.append(Document(page_content=text, metadata={"source": file.name}))
            # For text files
            else:
                with open(file.name, 'r', encoding='utf-8', errors='replace') as f:
                    text = f.read()
                documents.append(Document(page_content=text, metadata={"source": file.name}))
        result = vector_store.add_documents(documents)
        if result["status"] == "success":
            return f"✅ Successfully added {result['count']} document chunks to the knowledge base!"
        else:
            return f"❌ Error adding documents: {result['message']}"
    except Exception as e:
        return f"❌ Error processing documents: {str(e)}"
# Performance dashboard data
def get_performance_stats():
    """Get performance statistics from the server monitor"""
    try:
        stats = server_monitor.get_system_stats()
        if "error" in stats:
            return {"status": "error", "message": stats["error"]}
        # Add more detailed stats
        stats["current_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        stats["uptime"] = "Calculating..."
        return stats
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Global variable to store startup check result
startup_check_result = None
# Gradio Interface with all enhancements
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="amber", secondary_hue="orange"),
    title="AI Research Assistant"
) as demo:
    # State management
    chat_history = gr.State([])

    gr.Markdown("# 🧠 AI Research Assistant")
    gr.Markdown("This advanced AI assistant combines web search with contextual awareness to answer complex questions. "
                "It provides weather and space weather context only when relevant to your query.")

    with gr.Tabs():
        with gr.TabItem("💬 Chat"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("## System Status")
                    status_display = gr.Markdown("🔄 Checking system status...")
                    check_btn = gr.Button("🔄 Refresh Status")
                    gr.Markdown("## How to Use")
                    gr.Markdown("""
                    1. Enter a research question in the input box
                    2. Toggle 'Use Document Knowledge' to enable RAG
                    3. Click Submit or press Enter
                    4. Watch as the response streams in real-time
                    5. Review sources at the end of each response

                    ## Features
                    - 🔍 Web search integration
                    - 🌤️ Context-aware weather data (only when relevant)
                    - 🌌 Context-aware space weather data (only when relevant)
                    - 📚 RAG (Retrieval-Augmented Generation) with document database
                    - 📝 Real-time citations
                    - ⚡ Streaming output
                    """)
                with gr.Column(scale=2):
                    chatbot = gr.Chatbot(
                        height=500,
                        label="Research Conversation",
                        latex_delimiters=[{"left": "$$", "right": "$$", "display": True}],
                        bubble_full_width=False
                    )
                    msg = gr.Textbox(
                        label="Research Question",
                        placeholder="Ask a complex research question...",
                        lines=3
                    )
                    use_rag = gr.Checkbox(
                        label="📚 Use Document Knowledge (RAG)",
                        value=False,
                        info="Enable to search uploaded documents for context"
                    )
                    with gr.Row():
                        submit_btn = gr.Button("Submit Research Query", variant="primary")
                        clear_btn = gr.Button("Clear Conversation")
                    examples = gr.Examples(
                        examples=[
                            "What are the latest developments in quantum computing?",
                            "How does climate change affect ocean currents?",
                            "Explain the significance of the James Webb Space Telescope findings",
                            "What are the economic implications of renewable energy adoption?",
                            "How do solar flares affect satellite communications?"
                        ],
                        inputs=msg,
                        label="Example Questions"
                    )
        with gr.TabItem("📄 Document Management"):
            gr.Markdown("## Upload Documents for RAG")
            gr.Markdown("Upload PDF or text files to add them to the knowledge base for document-based queries.")
            file_upload = gr.File(
                file_types=[".pdf", ".txt"],
                file_count="multiple",
                label="Upload Documents"
            )
            upload_btn = gr.Button("📤 Upload Documents")
            upload_output = gr.Textbox(label="Upload Status", interactive=False)
            clear_docs_btn = gr.Button("🗑️ Clear All Documents")
            gr.Markdown("## Current Documents")
            doc_list = gr.Textbox(
                label="Document List",
                value="No documents uploaded yet",
                interactive=False
            )
        with gr.TabItem("📊 Performance"):
            perf_refresh_btn = gr.Button("🔄 Refresh Stats")
            perf_display = gr.JSON(label="System Statistics")
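    # Callback functions wired to the components above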
    def update_status():
        """Update the system status display"""
        global startup_check_result
        if startup_check_result is None:
            startup_check_result = run_startup_check()
        result = startup_check_result.get_result()
        # Format status display based on result
        if result["status"] == "operational":
            cat_greeting = get_cat_greeting()
            status_md = f"""
✅ **Server is operational and ready to assist!**

🐾 **Cat Greeting:**

*{cat_greeting}*

✅ **Ready for your questions!** Ask anything and I'll pounce on the best information for you.
"""
        elif result["status"] == "initializing":
            status_md = """
⏳ **Server is currently initializing (503 error detected)**

⏳ **Estimated wait time:** 5 minutes

While you wait, why not prepare some treats? I'll be ready to hunt for knowledge soon!
"""
        elif result["status"] == "checking":
            status_md = "🔄 Performing startup checks..."
        else:
            status_md = f"""
❌ **Server check failed**

📋 **Details:** {result["details"]}
"""
        return status_md

    def refresh_status():
        """Refresh the startup check"""
        global startup_check_result
        startup_check_result = run_startup_check()
        return update_status()

    def respond(message, history, use_rag_flag):
        # Get streaming response
        for updated_history in research_assistant_wrapper(message, history, use_rag_flag):
            yield updated_history, update_status()

    def clear_conversation():
        return [], []

    def update_performance_stats():
        stats = get_performance_stats()
        return stats
    # Set initial status on load
    demo.load(update_status, outputs=status_display)
    demo.load(update_performance_stats, outputs=perf_display)
    # Button interactions
    check_btn.click(refresh_status, outputs=status_display)
    submit_btn.click(
        respond,
        [msg, chat_history, use_rag],
        [chatbot, status_display]
    )
    msg.submit(
        respond,
        [msg, chat_history, use_rag],
        [chatbot, status_display]
    )
    clear_btn.click(clear_conversation, outputs=[chat_history, chatbot])
    # Document management
    upload_btn.click(upload_documents, file_upload, upload_output)
    clear_docs_btn.click(lambda: vector_store.delete_collection(), None, upload_output)
    # Performance dashboard
    perf_refresh_btn.click(update_performance_stats, outputs=perf_display)
if __name__ == "__main__":
    # Print public link information to logs
    print("===== Application Starting =====")
    print("Creating public link for Hugging Face Space...")
    print("Once the app launches, a public link will be available")
    print("================================")
    # Launch with public sharing enabled
    demo.launch(share=True)