#!/usr/bin/env python3
"""
QUADRANT RAG System - Enhanced UI v2 with Document Library
Professional chat interface with persistent document storage
"""

import json
import os
import uuid
import time
from typing import List, Dict, Any, Optional
from pathlib import Path
from datetime import datetime, timezone
import tempfile
import base64  # NOTE(review): unused in this file — possibly a remnant; confirm before removing

import streamlit as st

# Load environment variables before anything reads them.
from dotenv import load_dotenv
load_dotenv()

# Import RAG components
from rag_core import DynamicRAG, extract_pdf_pages, create_chunks

# Page configuration
st.set_page_config(
    page_title="QUADRANT RAG - AI Document Assistant",
    page_icon="πŸ€–",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Enhanced CSS for modern UI
st.markdown(""" """, unsafe_allow_html=True)

# Initialize session state with all keys the UI reads, so first render is safe.
if 'rag_system' not in st.session_state:
    st.session_state.rag_system = None          # DynamicRAG instance (None until init)
if 'current_doc' not in st.session_state:
    st.session_state.current_doc = None         # dict describing the selected document
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []          # list of {'type', 'content', 'citations'}
if 'all_documents' not in st.session_state:
    st.session_state.all_documents = []         # library listing pulled from the RAG backend
if 'processing' not in st.session_state:
    st.session_state.processing = False         # True while a PDF upload is being ingested
if 'waiting_for_response' not in st.session_state:
    st.session_state.waiting_for_response = False


def init_rag_system():
    """Initialize the RAG system and load the document library.

    Returns False only when the OpenAI key is clearly missing/placeholder;
    otherwise returns True even after an internal failure so the app can
    still render an error state instead of dying (deliberate best-effort).
    """
    try:
        # Reload environment variables in case the user set them after startup.
        load_dotenv()
        openai_key = os.environ.get('OPENAI_API_KEY', '')
        qdrant_url = os.environ.get('QDRANT_URL', '')
        qdrant_key = os.environ.get('QDRANT_API_KEY', '')

        if not openai_key or openai_key == 'your-openai-api-key-here':
            st.error("❌ OpenAI API key not configured. Please set OPENAI_API_KEY in your environment.")
            return False

        if not qdrant_url or not qdrant_key:
            st.warning("⚠️ Qdrant Cloud credentials not found. Using local file storage.")

        # Show initialization progress
        progress_placeholder = st.empty()
        with progress_placeholder:
            with st.spinner("πŸ”„ Initializing RAG System..."):
                try:
                    st.session_state.rag_system = DynamicRAG()
                    # Load all documents from Qdrant
                    st.session_state.all_documents = st.session_state.rag_system.get_all_documents()
                except Exception as init_error:
                    st.error(f"❌ RAG System initialization failed: {str(init_error)}")
                    # Continue anyway for basic functionality
                    st.session_state.all_documents = []
        progress_placeholder.success("βœ… RAG System initialized successfully!")
        return True
    except Exception as e:
        st.error(f"❌ Failed to initialize RAG system: {str(e)}")
        # Don't fail completely - allow app to show error state
        return True


def process_pdf_upload(uploaded_file) -> Optional[Dict[str, Any]]:
    """Ingest an uploaded PDF: extract pages, chunk, and store in the vector DB.

    Returns a document-info dict on success, or None on failure (the error is
    surfaced to the user via st.error). The temp file is always cleaned up.
    """
    temp_path = None
    try:
        st.session_state.processing = True

        # Persist the upload to a uniquely-named temp file for the extractor.
        temp_path = Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.pdf"
        with open(temp_path, "wb") as f:
            f.write(uploaded_file.getvalue())

        # Extract text
        pages = extract_pdf_pages(str(temp_path))

        # Create chunks
        chunks = create_chunks(pages, chunk_size=3000, overlap=200)

        # Document ID: filename stem plus a timestamp to keep re-uploads distinct.
        doc_id = f"{uploaded_file.name.replace('.pdf', '')}_{int(time.time())}"

        # Store in Qdrant
        st.session_state.rag_system.store_document(doc_id, chunks)

        doc_info = {
            'doc_id': doc_id,
            'title': uploaded_file.name,
            'pages': len(pages),
            'chunks': len(chunks),
            'upload_time': datetime.now(timezone.utc).isoformat()
        }

        # Refresh the library listing from the backend (source of truth).
        st.session_state.all_documents = st.session_state.rag_system.get_all_documents()

        return doc_info
    except Exception as e:
        st.error(f"Error processing PDF: {str(e)}")
        return None
    finally:
        # Fix: clean up the temp file even when extraction/storage raises.
        if temp_path is not None:
            temp_path.unlink(missing_ok=True)
        st.session_state.processing = False


def query_document(question: str) -> tuple[str, List[Dict[str, Any]]]:
    """Answer a question against the currently selected document.

    Returns (answer_text, citations); citations is empty when no document is
    selected, nothing relevant is found, or the answer signals low evidence.
    """
    try:
        if not st.session_state.current_doc:
            return "Please select a document first.", []

        # Search in current document - top_k increased for better coverage.
        search_results = st.session_state.rag_system.search(
            query=question,
            doc_id=st.session_state.current_doc['doc_id'],
            top_k=10
        )

        if not search_results:
            return "I couldn't find relevant information about that in the document.", []

        # Generate answer
        answer = st.session_state.rag_system.generate_answer(question, search_results)

        # Heuristic: suppress citations when the model admits it lacks evidence.
        insufficient_keywords = ["insufficient evidence", "couldn't find",
                                 "no relevant information", "cannot answer"]

        citations = []
        if not any(keyword in answer.lower() for keyword in insufficient_keywords):
            for i, result in enumerate(search_results[:3]):
                citations.append({
                    'page': result['page'],
                    'text': result['text'][:150] + "..." if len(result['text']) > 150 else result['text'],
                    'score': round(result['score'], 3)
                })

        return answer, citations
    except Exception as e:
        return f"Sorry, I encountered an error: {str(e)}", []


def render_sidebar():
    """Render the document library sidebar: upload widget, doc list, delete."""
    with st.sidebar:
        # Header
        st.markdown("""
        πŸ“š Document Library

        {} documents stored
        """.format(len(st.session_state.all_documents)), unsafe_allow_html=True)

        # Upload new document
        with st.expander("πŸ“€ Upload New Document", expanded=False):
            uploaded_file = st.file_uploader(
                "Choose a PDF file",
                type=['pdf'],
                label_visibility="collapsed",
                disabled=st.session_state.processing
            )
            if uploaded_file and st.button("Upload", type="primary", use_container_width=True):
                with st.spinner("Processing..."):
                    doc = process_pdf_upload(uploaded_file)
                    if doc:
                        st.success("βœ… Document uploaded successfully!")
                        st.rerun()

        # Document list
        if st.session_state.all_documents:
            st.markdown("### Your Documents")
            for doc in st.session_state.all_documents:
                # Check if this is the current document
                is_active = (st.session_state.current_doc and
                             doc['doc_id'] == st.session_state.current_doc['doc_id'])

                # NOTE(review): card_class is never consumed — likely a remnant
                # of an HTML card layout whose markup was removed; confirm.
                card_class = "doc-card active" if is_active else "doc-card"

                col1, col2 = st.columns([5, 1])
                with col1:
                    if st.button(
                        f"πŸ“„ **{doc['title'][:30]}{'...' if len(doc['title']) > 30 else ''}**\n\n"
                        f"πŸ“Š {doc['pages']} pages β€’ {doc['chunks']} chunks",
                        key=f"doc_{doc['doc_id']}",
                        use_container_width=True
                    ):
                        # Selecting a document starts a fresh conversation.
                        st.session_state.current_doc = doc
                        st.session_state.chat_history = []
                        st.rerun()
                with col2:
                    if st.button("πŸ—‘οΈ", key=f"del_{doc['doc_id']}", help="Delete this document"):
                        if st.session_state.rag_system.delete_document(doc['doc_id']):
                            st.session_state.all_documents = st.session_state.rag_system.get_all_documents()
                            # Deselect if the active document was just deleted.
                            if (st.session_state.current_doc and
                                    doc['doc_id'] == st.session_state.current_doc['doc_id']):
                                st.session_state.current_doc = None
                                st.session_state.chat_history = []
                            st.rerun()
        else:
            st.markdown("""
            πŸ“­

            No documents yet

            Upload your first PDF to get started

            """, unsafe_allow_html=True)


def render_chat_interface():
    """Render the main panel: welcome screen or per-document chat."""
    if not st.session_state.current_doc:
        # No document selected
        st.markdown("""
        πŸ“š

        Welcome to QUADRANT RAG Medical Assistant

        Upload medical documents or select from your library to start AI-powered medical Q&A

        ✨ Powered by OpenAI GPT-5-mini & Qdrant Cloud β€’ Optimized for Medical Education

        """, unsafe_allow_html=True)
    else:
        # Chat interface
        title = st.session_state.current_doc['title']
        # Truncate overly long titles for cleaner UI
        display_title = (title[:100] + "…") if len(title) > 100 else title
        pages = st.session_state.current_doc['pages']
        chunks = st.session_state.current_doc['chunks']
        st.markdown(
            f"""
            πŸ’¬ Chatting with: {display_title}
            {pages} pages β€’ {chunks} chunks β€’ Ask anything about this document
            """,
            unsafe_allow_html=True,
        )

        # New chat UI using Streamlit's native components
        if not st.session_state.chat_history:
            st.info("Start a conversation about your document. Ask me to explain, summarize, or find specifics.")

        for msg in st.session_state.chat_history:
            if msg['type'] == 'user':
                with st.chat_message("user"):
                    st.markdown(msg['content'])
            else:
                with st.chat_message("assistant"):
                    st.markdown(msg['content'])
                    if msg.get('citations'):
                        with st.expander(f"πŸ“š {len(msg['citations'])} Sources"):
                            for i, cite in enumerate(msg['citations'], 1):
                                st.markdown(f"**[{i}] Page {cite['page']}** (Relevance: {cite['score']:.3f})")
                                st.text(cite['text'][:200] + "..." if len(cite['text']) > 200 else cite['text'])
                                st.divider()

        # Chat input and immediate handling
        if prompt := st.chat_input("Ask anything about this document…"):
            st.session_state.chat_history.append({'type': 'user', 'content': prompt})
            with st.chat_message("assistant"):
                with st.spinner("Thinking..."):
                    answer, citations = query_document(prompt)
                st.session_state.chat_history.append({
                    'type': 'assistant',
                    'content': answer,
                    'citations': citations if citations else None
                })
                st.markdown(answer)
                if citations:
                    with st.expander(f"πŸ“š {len(citations)} Sources"):
                        for i, cite in enumerate(citations, 1):
                            st.markdown(f"**[{i}] Page {cite['page']}** (Relevance: {cite['score']:.3f})")
                            st.text(cite['text'][:200] + "..." if len(cite['text']) > 200 else cite['text'])
                            st.divider()

        # Prevent legacy UI from rendering below
        return


def main():
    """App entry point: key check, lazy init, header, sidebar, chat panel."""
    # Configuration section for missing environment variables
    openai_key = os.environ.get('OPENAI_API_KEY', '')

    # Check if we're in Hugging Face Spaces environment
    is_hf_spaces = os.environ.get('SPACE_ID') is not None

    if not openai_key or openai_key == 'your-openai-api-key-here':
        if is_hf_spaces:
            st.error("πŸ”‘ **OpenAI API Key Required for Hugging Face Spaces**")
            st.markdown("""
            To use this app on Hugging Face Spaces:
            1. Go to your Space Settings
            2. Add a new secret named `OPENAI_API_KEY`
            3. Enter your OpenAI API key as the value
            4. Restart the Space

            You can get an API key from: https://platform.openai.com/api-keys
            """)
        else:
            st.error("πŸ”‘ **OpenAI API Key Required**")
            st.markdown("""
            Please set your OpenAI API key:
            1. Add `OPENAI_API_KEY=your-key-here` to the `.env` file, OR
            2. Set it as an environment variable in your deployment platform
            """)

            # Quick input for testing (only in local environment)
            with st.expander("πŸ’‘ Quick Setup (for testing)"):
                key_input = st.text_input("Enter OpenAI API Key:", type="password")
                if st.button("Set API Key") and key_input:
                    os.environ['OPENAI_API_KEY'] = key_input
                    st.success("βœ… API Key set! Initializing system...")
                    st.rerun()
        st.stop()

    # Initialize system (non-blocking for faster health check)
    if not st.session_state.rag_system:
        init_rag_system()  # This now doesn't block the app even if it fails

    # Header
    # NOTE(review): model name here ("GPT-4o-mini") disagrees with the welcome
    # screen ("GPT-5-mini") — confirm which is correct and unify.
    st.markdown("""
    πŸ€– QUADRANT RAG - Document AI Assistant

    Powered by Qdrant Vector Database & OpenAI GPT-4o-mini

    """, unsafe_allow_html=True)

    # Sidebar
    render_sidebar()

    # Main content
    render_chat_interface()


if __name__ == "__main__":
    main()