""" Multi-Agent RAG Chatbot using LangGraph This system implements a 3-agent architecture: 1. Main Agent: Handles conversation flow, follow-ups, and determines when to call RAG 2. RAG Agent: Rewrites queries and applies filters for document retrieval 3. Response Agent: Generates final answers from retrieved documents Each agent has specialized prompts and responsibilities. """ import re import json import time import logging import traceback from pathlib import Path from datetime import datetime from dataclasses import dataclass from typing import Dict, List, Any, Optional, TypedDict from langchain_core.tools import tool from langgraph.graph import StateGraph, END from langchain_core.prompts import ChatPromptTemplate from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from src.pipeline import PipelineManager from src.llm.adapters import get_llm_client from src.config.paths import PROJECT_DIR, CONVERSATIONS_DIR from src.config.loader import load_config, get_embedding_model_for_collection logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) @dataclass class QueryContext: """Context extracted from conversation""" has_district: bool = False has_source: bool = False has_year: bool = False extracted_district: Optional[str] = None extracted_source: Optional[str] = None extracted_year: Optional[str] = None ui_filters: Dict[str, List[str]] = None confidence_score: float = 0.0 needs_follow_up: bool = False follow_up_question: Optional[str] = None class MultiAgentState(TypedDict): """State for the multi-agent conversation flow""" conversation_id: str messages: List[Any] current_query: str query_context: Optional[QueryContext] rag_query: Optional[str] rag_filters: Optional[Dict[str, Any]] retrieved_documents: Optional[List[Any]] final_response: Optional[str] agent_logs: List[str] conversation_context: Dict[str, Any] session_start_time: float last_ai_message_time: float class MultiAgentRAGChatbot: """Multi-agent RAG chatbot with specialized agents""" def __init__(self, config_path: str = "src/config/settings.yaml"): """Initialize the multi-agent chatbot""" self.config = load_config(config_path) # Get LLM provider from config reader_config = self.config.get("reader", {}) default_type = reader_config.get("default_type", "INF_PROVIDERS") provider_name = default_type.lower() self.llm_adapter = get_llm_client(provider_name, self.config) # Create a simple wrapper for LangChain compatibility class LLMWrapper: def __init__(self, adapter): self.adapter = adapter def invoke(self, messages): # Convert LangChain messages to the format expected by the adapter if isinstance(messages, list): formatted_messages = [] for msg in messages: if hasattr(msg, 'content'): role = "user" if msg.__class__.__name__ == "HumanMessage" else "assistant" formatted_messages.append({"role": role, "content": msg.content}) else: formatted_messages.append({"role": "user", "content": str(msg)}) else: formatted_messages = [{"role": "user", "content": str(messages)}] # Use the adapter to get response response = self.adapter.generate(formatted_messages) # Return a mock response object class MockResponse: def __init__(self, content): self.content = content return MockResponse(response.content) self.llm = LLMWrapper(self.llm_adapter) # Initialize pipeline manager early to load models logger.info("🔄 Initializing pipeline manager and loading models...") try: self.pipeline_manager = PipelineManager(self.config) logger.info("✅ Pipeline manager initialized and models loaded") except Exception as e: logger.error(f"❌ Failed to initialize pipeline manager: {e}") traceback.print_exc() raise RuntimeError(f"Pipeline manager initialization failed: {e}") # Connect to vector store logger.info("🔄 Connecting to vector store...") try: if not self.pipeline_manager.connect_vectorstore(): logger.error("❌ Failed to connect to vector store") logger.error("💡 Check that QDRANT_API_KEY environment variable is set") logger.error("💡 Check that Qdrant URL and collection name are correct in config") raise RuntimeError("Vector store connection failed") logger.info("✅ Vector store connected successfully") except RuntimeError: raise # Re-raise RuntimeError as-is except Exception as e: logger.error(f"❌ Error during vector store connection: {e}") traceback.print_exc() raise RuntimeError(f"Vector store connection failed: {e}") # Load dynamic data self._load_dynamic_data() # Build the multi-agent graph self.graph = self._build_graph() # Conversations directory - use PROJECT_DIR for local vs deployed compatibility self.conversations_dir = CONVERSATIONS_DIR try: # Use 777 permissions for maximum compatibility (HF Spaces runs as different user) self.conversations_dir.mkdir(parents=True, mode=0o777, exist_ok=True) except (PermissionError, OSError) as e: logger.warning(f"Could not create conversations directory at {self.conversations_dir}: {e}") # Fallback to a relative path (current directory) self.conversations_dir = Path("conversations") try: self.conversations_dir.mkdir(parents=True, mode=0o777, exist_ok=True) except (PermissionError, OSError) as e2: logger.error(f"Could not create conversations directory at {self.conversations_dir}: {e2}") raise RuntimeError(f"Failed to create conversations directory: {e2}") logger.info("🤖 Multi-Agent RAG Chatbot initialized") def _load_dynamic_data(self): """Load dynamic data from filter_options.json and add_district_metadata.py""" # Load filter options - use PROJECT_DIR relative path try: fo = PROJECT_DIR / "src" / "config" / "filter_options.json" if fo.exists(): with open(fo) as f: data = json.load(f) self.year_whitelist = [str(y).strip() for y in data.get("years", [])] self.source_whitelist = [str(s).strip() for s in data.get("sources", [])] self.district_whitelist = [str(d).strip() for d in data.get("districts", [])] else: # Fallback to default values self.year_whitelist = ['2018', '2019', '2020', '2021', '2022', '2023', '2024'] self.source_whitelist = ['Consolidated', 'Local Government', 'Ministry, Department and Agency'] self.district_whitelist = ['Kampala', 'Gulu', 'Kalangala'] except Exception as e: logger.warning(f"Could not load filter options: {e}") self.year_whitelist = ['2018', '2019', '2020', '2021', '2022', '2023', '2024'] self.source_whitelist = ['Consolidated', 'Local Government', 'Ministry, Department and Agency'] self.district_whitelist = ['Kampala', 'Gulu', 'Kalangala'] # Enrich district list from add_district_metadata.py (if available) try: from add_district_metadata import DistrictMetadataProcessor proc = DistrictMetadataProcessor() names = set() for key, mapping in proc.district_mappings.items(): if getattr(mapping, 'is_district', True): names.add(mapping.name) if names: merged = list(self.district_whitelist) for n in sorted(names): if n not in merged: merged.append(n) self.district_whitelist = merged logger.info(f"🧭 District whitelist enriched: {len(self.district_whitelist)} entries") except Exception as e: logger.info(f"â„šī¸ Could not enrich districts: {e}") # Calculate current year dynamically self.current_year = str(datetime.now().year) self.previous_year = str(datetime.now().year - 1) # Log the actual filter values for debugging logger.info(f"📊 ACTUAL FILTER VALUES:") logger.info(f" Years: {self.year_whitelist}") logger.info(f" Sources: {self.source_whitelist}") logger.info(f" Districts: {len(self.district_whitelist)} districts (first 10: {self.district_whitelist[:10]})") def _build_graph(self) -> StateGraph: """Build the multi-agent LangGraph""" graph = StateGraph(MultiAgentState) # Add nodes for each agent graph.add_node("main_agent", self._main_agent) graph.add_node("rag_agent", self._rag_agent) graph.add_node("response_agent", self._response_agent) # Define the flow graph.set_entry_point("main_agent") # Main agent decides next step graph.add_conditional_edges( "main_agent", self._should_call_rag, { "follow_up": END, "call_rag": "rag_agent" } ) # RAG agent calls response agent graph.add_edge("rag_agent", "response_agent") # Response agent returns to main agent for potential follow-ups graph.add_edge("response_agent", "main_agent") return graph.compile() def _should_call_rag(self, state: MultiAgentState) -> str: """Determine if we should call RAG or ask follow-up""" # If we already have a final response (from response agent), end if state.get("final_response"): return "follow_up" context = state["query_context"] if context and context.needs_follow_up: return "follow_up" return "call_rag" def _main_agent(self, state: MultiAgentState) -> MultiAgentState: """Main Agent: Handles conversation flow and follow-ups""" logger.info("đŸŽ¯ MAIN AGENT: Starting analysis") # If we already have a final response from response agent, end gracefully if state.get("final_response"): logger.info("đŸŽ¯ MAIN AGENT: Final response already exists, ending conversation flow") return state query = state["current_query"] messages = state["messages"] logger.info(f"đŸŽ¯ MAIN AGENT: Extracting UI filters from query") ui_filters = self._extract_ui_filters(query) logger.info(f"đŸŽ¯ MAIN AGENT: UI filters extracted: {ui_filters}") # Analyze query context logger.info(f"đŸŽ¯ MAIN AGENT: Analyzing query context") context = self._analyze_query_context(query, messages, ui_filters) # Log agent decision state["agent_logs"].append(f"MAIN AGENT: Context analyzed - district={context.has_district}, source={context.has_source}, year={context.has_year}") logger.info(f"đŸŽ¯ MAIN AGENT: Context analysis complete - district={context.has_district}, source={context.has_source}, year={context.has_year}") # Store context state["query_context"] = context # If follow-up needed, generate response if context.needs_follow_up: logger.info(f"đŸŽ¯ MAIN AGENT: Follow-up needed, generating question") response = context.follow_up_question state["final_response"] = response state["last_ai_message_time"] = time.time() logger.info(f"đŸŽ¯ MAIN AGENT: Follow-up question generated: {response[:100]}...") else: logger.info("đŸŽ¯ MAIN AGENT: No follow-up needed, proceeding to RAG") return state def _rag_agent(self, state: MultiAgentState) -> MultiAgentState: """RAG Agent: Rewrites queries and applies filters""" logger.info("🔍 RAG AGENT: Starting query rewriting and filter preparation") context = state["query_context"] messages = state["messages"] logger.info(f"🔍 RAG AGENT: Context received - district={context.has_district}, source={context.has_source}, year={context.has_year}") # Rewrite query for RAG logger.info(f"🔍 RAG AGENT: Rewriting query for optimal retrieval") rag_query = self._rewrite_query_for_rag(messages, context) logger.info(f"🔍 RAG AGENT: Query rewritten: '{rag_query}'") # Build filters logger.info(f"🔍 RAG AGENT: Building filters from context") filters = self._build_filters(context) logger.info(f"🔍 RAG AGENT: Filters built: {filters}") # Log RAG preparation state["agent_logs"].append(f"RAG AGENT: Query='{rag_query}', Filters={filters}") # Store for response agent state["rag_query"] = rag_query state["rag_filters"] = filters logger.info(f"🔍 RAG AGENT: Preparation complete, ready for retrieval") return state def _response_agent(self, state: MultiAgentState) -> MultiAgentState: """Response Agent: Generates final answer from retrieved documents""" logger.info("📝 RESPONSE AGENT: Starting document retrieval and answer generation") rag_query = state["rag_query"] filters = state["rag_filters"] logger.info(f"📝 RESPONSE AGENT: Starting RAG retrieval with query: '{rag_query}'") logger.info(f"📝 RESPONSE AGENT: Using filters: {filters}") # Perform RAG retrieval logger.info(f"📝 RESPONSE AGENT: Calling pipeline manager for retrieval") logger.info(f"🔍 ACTUAL RAG QUERY: '{rag_query}'") logger.info(f"🔍 ACTUAL FILTERS: {filters}") try: # Extract filenames from filters if present filenames = filters.get("filenames") if filters else None result = self.pipeline_manager.run( query=rag_query, sources=filters.get("sources") if filters else None, auto_infer_filters=False, filters=filters if filters else None ) logger.info(f"📝 RESPONSE AGENT: RAG retrieval completed - {len(result.sources)} documents retrieved") logger.info(f"🔍 RETRIEVAL DEBUG: Result type: {type(result)}") logger.info(f"🔍 RETRIEVAL DEBUG: Result sources type: {type(result.sources)}") # logger.info(f"🔍 RETRIEVAL DEBUG: Result metadata: {getattr(result, 'metadata', 'No metadata')}") if len(result.sources) == 0: logger.warning(f"âš ī¸ NO DOCUMENTS RETRIEVED: Query='{rag_query}', Filters={filters}") logger.warning(f"âš ī¸ RETRIEVAL DEBUG: This could be due to:") logger.warning(f" - Query too specific for available documents") logger.warning(f" - Filters too restrictive") logger.warning(f" - Vector store connection issues") logger.warning(f" - Embedding model issues") else: logger.info(f"✅ DOCUMENTS RETRIEVED: {len(result.sources)} documents found") for i, doc in enumerate(result.sources[:3]): # Log first 3 docs logger.info(f" Doc {i+1}: {getattr(doc, 'metadata', {}).get('filename', 'Unknown')[:50]}...") state["retrieved_documents"] = result.sources state["agent_logs"].append(f"RESPONSE AGENT: Retrieved {len(result.sources)} documents") # Check highest similarity score highest_score = 0.0 if result.sources: # Check reranked_score first (more accurate), fallback to original_score for doc in result.sources: score = doc.metadata.get('reranked_score') or doc.metadata.get('original_score', 0.0) if score > highest_score: highest_score = score logger.info(f"📝 RESPONSE AGENT: Highest similarity score: {highest_score:.4f}") # If highest score is too low, don't use retrieved documents if highest_score <= 0.15: logger.warning(f"âš ī¸ RESPONSE AGENT: Low similarity score ({highest_score:.4f} <= 0.15), using LLM knowledge only") response = self._generate_conversational_response_without_docs( state["current_query"], state["messages"] ) else: # Generate conversational response with documents logger.info(f"📝 RESPONSE AGENT: Generating conversational response from {len(result.sources)} documents") response = self._generate_conversational_response( state["current_query"], result.sources, result.answer, state["messages"] ) logger.info(f"📝 RESPONSE AGENT: Response generated: {response[:100]}...") state["final_response"] = response state["last_ai_message_time"] = time.time() logger.info(f"📝 RESPONSE AGENT: Answer generation complete") except Exception as e: logger.error(f"❌ RESPONSE AGENT ERROR: {e}") state["final_response"] = "I apologize, but I encountered an error while retrieving information. Please try again." state["last_ai_message_time"] = time.time() return state def _extract_ui_filters(self, query: str) -> Dict[str, List[str]]: """Extract UI filters from query""" filters = {} # Look for FILTER CONTEXT in query if "FILTER CONTEXT:" in query: # Extract the entire filter section (until USER QUERY: or end of query) filter_section = query.split("FILTER CONTEXT:")[1] if "USER QUERY:" in filter_section: filter_section = filter_section.split("USER QUERY:")[0] filter_section = filter_section.strip() # Parse sources if "Sources:" in filter_section: sources_line = [line for line in filter_section.split('\n') if line.strip().startswith('Sources:')][0] sources_str = sources_line.split("Sources:")[1].strip() if sources_str and sources_str != "None": filters["sources"] = [s.strip() for s in sources_str.split(",")] # Parse years if "Years:" in filter_section: years_line = [line for line in filter_section.split('\n') if line.strip().startswith('Years:')][0] years_str = years_line.split("Years:")[1].strip() if years_str and years_str != "None": filters["years"] = [y.strip() for y in years_str.split(",")] # Parse districts if "Districts:" in filter_section: districts_line = [line for line in filter_section.split('\n') if line.strip().startswith('Districts:')][0] districts_str = districts_line.split("Districts:")[1].strip() if districts_str and districts_str != "None": filters["districts"] = [d.strip() for d in districts_str.split(",")] # Parse filenames if "Filenames:" in filter_section: filenames_line = [line for line in filter_section.split('\n') if line.strip().startswith('Filenames:')][0] filenames_str = filenames_line.split("Filenames:")[1].strip() if filenames_str and filenames_str != "None": filters["filenames"] = [f.strip() for f in filenames_str.split(",")] return filters def _analyze_query_context(self, query: str, messages: List[Any], ui_filters: Dict[str, List[str]]) -> QueryContext: """Analyze query context using LLM""" logger.info(f"🔍 QUERY ANALYSIS: '{query[:50]}...' | UI filters: {ui_filters} | Messages: {len(messages)}") # Build conversation context conversation_context = "" for i, msg in enumerate(messages[-6:]): # Last 6 messages if isinstance(msg, HumanMessage): conversation_context += f"User: {msg.content}\n" elif isinstance(msg, AIMessage): conversation_context += f"Assistant: {msg.content}\n" # Create analysis prompt analysis_prompt = ChatPromptTemplate.from_messages([ SystemMessage(content=f"""You are the Main Agent in an advanced multi-agent RAG system for audit report analysis. đŸŽ¯ PRIMARY GOAL: Intelligently analyze user queries and determine the optimal conversation flow, whether that's answering directly, asking follow-ups, or proceeding to RAG retrieval. 🧠 INTELLIGENCE LEVEL: You are a sophisticated conversational AI that can handle any type of user interaction - from greetings to complex audit queries. 📊 YOUR EXPERTISE: You specialize in analyzing audit reports from various sources (Local Government, Ministry, Hospital, etc.) across different years and districts in Uganda. 🔍 AVAILABLE FILTERS: - Years: {', '.join(self.year_whitelist)} - Current year: {self.current_year}, Previous year: {self.previous_year} - Sources: {', '.join(self.source_whitelist)} - Districts: {', '.join(self.district_whitelist[:50])}... (and {len(self.district_whitelist)-50} more) đŸŽ›ī¸ UI FILTERS PROVIDED: {ui_filters} 📋 UI FILTER HANDLING: - If UI filters contain multiple values (e.g., districts: ['Lwengo', 'Kiboga']), extract ALL values - For multiple districts: extract each district separately and validate each one - For multiple years: extract each year separately and validate each one - For multiple sources: extract each source separately and validate each one - UI filters take PRIORITY over conversation context - use them first 🧭 CONVERSATION FLOW INTELLIGENCE: 1. **GREETINGS & GENERAL CHAT**: - If user greets you ("Hi", "Hello", "How are you"), respond warmly and guide them to audit-related questions - Example: "Hello! I'm here to help you analyze audit reports. What would you like to know about budget allocations, expenditures, or audit findings?" 2. **EDGE CASES**: - Handle "What can you do?", "Help", "I don't know what to ask" with helpful guidance - Example: "I can help you analyze audit reports! Try asking about budget allocations, salary management, PDM implementation, or any specific audit findings." 3. **AUDIT QUERIES**: - Extract ONLY values that EXACTLY match the available lists above - DO NOT hallucinate or infer values not in the lists - If user mentions "salary payroll management" - this is NOT a valid source filter **YEAR EXTRACTION**: - If user mentions "2023" and it's in the years list - extract "2023" - If user mentions "2022 / 23" - extract ["2022", "2023"] (as a JSON array) - If user mentions "2022-2023" - extract ["2022", "2023"] (as a JSON array) - If user mentions "latest couple of years" - extract the 2 most recent years from available data as JSON array - Always return years as JSON arrays when multiple years are mentioned **DISTRICT EXTRACTION**: - If user mentions "Kampala" and it's in the districts list - extract "Kampala" - If user mentions "Pader District" - extract "Pader" (remove "District" suffix) - If user mentions "Lwengo, Kiboga and Namutumba" - extract ["Lwengo", "Kiboga", "Namutumba"] (as JSON array) - If user mentions "Lwengo District and Kiboga District" - extract ["Lwengo", "Kiboga"] (as JSON array, remove "District" suffix) - Always return districts as JSON arrays when multiple districts are mentioned - If no exact matches found, set extracted values to null 4. **FILENAME FILTERING (MUTUALLY EXCLUSIVE)**: - If UI provides filenames filter - ONLY use that, ignore all other filters (year, district, source) - With filenames filter, no follow-ups needed - proceed directly to RAG - When filenames are specified, skip filter inference entirely 5. **HALLUCINATION PREVENTION**: - If user asks about a specific report but NO filename is selected in UI and NONE is extracted from conversation - DO NOT hallucinate - Clearly state: "I don't have any specific report selected. Could you please select a report from the list or tell me which report you'd like to analyze?" - DO NOT pretend to know which report they mean - DO NOT infer reports from context alone - only use explicitly mentioned reports 6. **CONVERSATION CONTEXT AWARENESS**: - ALWAYS consider the full conversation context when extracting filters - If district was mentioned in previous messages, include it in current analysis - If year was mentioned in previous messages, include it in current analysis - If source was mentioned in previous messages, include it in current analysis - Example: If conversation shows "User: Tell me about Pader District" then "User: 2023", extract both: district="Pader" and year="2023" 5. **SMART FOLLOW-UP STRATEGY**: - NEVER ask the same question twice in a row - If user provides source info, ask for year or district next - If user provides year info, ask for source or district next - If user provides district info, ask for year or source next - If user provides 2+ pieces of info, proceed to RAG instead of asking more - Make follow-ups conversational and contextual, not robotic 5. **DYNAMIC FOLLOW-UP EXAMPLES**: - Budget queries: "What year are you interested in?" or "Which department - Local Government or Ministry?" - PDM queries: "Which district are you interested in?" or "What year?" - General queries: "Could you be more specific about what you'd like to know?" đŸŽ¯ DECISION LOGIC: - If query is a greeting/general chat → needs_follow_up: true, provide helpful guidance - If query has 2+ pieces of info → needs_follow_up: false, proceed to RAG - If query has 1 piece of info → needs_follow_up: true, ask for missing piece - If query has 0 pieces of info → needs_follow_up: true, ask for clarification RESPOND WITH JSON ONLY: {{ "has_district": boolean, "has_source": boolean, "has_year": boolean, "extracted_district": "single district name or JSON array of districts or null", "extracted_source": "single source name or JSON array of sources or null", "extracted_year": "single year or JSON array of years or null", "confidence_score": 0.0-1.0, "needs_follow_up": boolean, "follow_up_question": "conversational question or helpful guidance or null" }}"""), HumanMessage(content=f"""Query: {query} Conversation Context: {conversation_context} CRITICAL: You MUST analyze the FULL conversation context above, not just the current query. - If ANY district was mentioned in previous messages, extract it - If ANY year was mentioned in previous messages, extract it - If ANY source was mentioned in previous messages, extract it - Combine information from ALL messages in the conversation Analyze this query using ONLY the exact values provided above:""") ]) try: response = self.llm.invoke(analysis_prompt.format_messages()) # Clean the response to extract JSON content = response.content.strip() if content.startswith("```json"): # Remove markdown formatting content = content.replace("```json", "").replace("```", "").strip() elif content.startswith("```"): # Remove generic markdown formatting content = content.replace("```", "").strip() # Clean and parse JSON with better error handling try: # Remove comments (// and /* */) from JSON # Remove single-line comments content = re.sub(r'//.*?$', '', content, flags=re.MULTILINE) # Remove multi-line comments content = re.sub(r'/\*.*?\*/', '', content, flags=re.DOTALL) analysis = json.loads(content) logger.info(f"🔍 QUERY ANALYSIS: ✅ Parsed successfully") except json.JSONDecodeError as e: logger.error(f"❌ JSON parsing failed: {e}") logger.error(f"❌ Raw content: {content[:200]}...") # Try to extract JSON from text if embedded json_match = re.search(r'\{.*\}', content, re.DOTALL) if json_match: try: # Clean the extracted JSON cleaned_json = json_match.group() cleaned_json = re.sub(r'//.*?$', '', cleaned_json, flags=re.MULTILINE) cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json, flags=re.DOTALL) analysis = json.loads(cleaned_json) logger.info(f"🔍 QUERY ANALYSIS: ✅ Extracted and cleaned JSON from text") except json.JSONDecodeError as e2: logger.error(f"❌ Failed to extract JSON from text: {e2}") # Return fallback context context = QueryContext( has_district=False, has_source=False, has_year=False, extracted_district=None, extracted_source=None, extracted_year=None, confidence_score=0.0, needs_follow_up=True, follow_up_question="I apologize, but I'm having trouble processing your request. Could you please rephrase it or ask for help?" ) return context else: # Return fallback context context = QueryContext( has_district=False, has_source=False, has_year=False, extracted_district=None, extracted_source=None, extracted_year=None, confidence_score=0.0, needs_follow_up=True, follow_up_question="I apologize, but I'm having trouble processing your request. Could you please rephrase it or ask for help?" ) return context # Validate extracted values against whitelists extracted_district = analysis.get("extracted_district") extracted_source = analysis.get("extracted_source") extracted_year = analysis.get("extracted_year") logger.info(f"🔍 QUERY ANALYSIS: Raw extracted values - district: {extracted_district}, source: {extracted_source}, year: {extracted_year}") # Validate district (handle both single values and arrays) if extracted_district: if isinstance(extracted_district, list): # Validate each district in the array valid_districts = [] for district in extracted_district: if district in self.district_whitelist: valid_districts.append(district) else: # Try removing "District" suffix district_name = district.replace(" District", "").replace(" district", "") if district_name in self.district_whitelist: valid_districts.append(district_name) if valid_districts: extracted_district = valid_districts[0] if len(valid_districts) == 1 else valid_districts logger.info(f"🔍 QUERY ANALYSIS: Extracted districts: {extracted_district}") else: logger.warning(f"âš ī¸ No valid districts found in: '{extracted_district}'") extracted_district = None else: # Single district validation if extracted_district not in self.district_whitelist: # Try removing "District" suffix district_name = extracted_district.replace(" District", "").replace(" district", "") if district_name in self.district_whitelist: logger.info(f"🔍 QUERY ANALYSIS: Normalized district '{extracted_district}' to '{district_name}'") extracted_district = district_name else: logger.warning(f"âš ī¸ Invalid district extracted: '{extracted_district}' not in whitelist") extracted_district = None # Validate source (handle both single values and arrays) if extracted_source: if isinstance(extracted_source, list): # Validate each source in the array valid_sources = [] for source in extracted_source: if source in self.source_whitelist: valid_sources.append(source) else: logger.warning(f"âš ī¸ Invalid source in array: '{source}' not in whitelist") if valid_sources: extracted_source = valid_sources[0] if len(valid_sources) == 1 else valid_sources logger.info(f"🔍 QUERY ANALYSIS: Extracted sources: {extracted_source}") else: logger.warning(f"âš ī¸ No valid sources found in: '{extracted_source}'") extracted_source = None else: # Single source validation if extracted_source not in self.source_whitelist: logger.warning(f"âš ī¸ Invalid source extracted: '{extracted_source}' not in whitelist") extracted_source = None # Validate year (handle both single values and arrays) if extracted_year: if isinstance(extracted_year, list): # Validate each year in the array valid_years = [] for year in extracted_year: year_str = str(year) if year_str in self.year_whitelist: valid_years.append(year_str) if valid_years: extracted_year = valid_years[0] if len(valid_years) == 1 else valid_years logger.info(f"🔍 QUERY ANALYSIS: Extracted years: {extracted_year}") else: logger.warning(f"âš ī¸ No valid years found in: '{extracted_year}'") extracted_year = None else: # Single year validation year_str = str(extracted_year) if year_str not in self.year_whitelist: logger.warning(f"âš ī¸ Invalid year extracted: '{extracted_year}' not in whitelist") extracted_year = None else: extracted_year = year_str logger.info(f"🔍 QUERY ANALYSIS: Validated values - district: {extracted_district}, source: {extracted_source}, year: {extracted_year}") # Create QueryContext object context = QueryContext( has_district=bool(extracted_district), has_source=bool(extracted_source), has_year=bool(extracted_year), extracted_district=extracted_district, extracted_source=extracted_source, extracted_year=extracted_year, ui_filters=ui_filters, confidence_score=analysis.get("confidence_score", 0.0), needs_follow_up=analysis.get("needs_follow_up", False), follow_up_question=analysis.get("follow_up_question") ) logger.info(f"🔍 QUERY ANALYSIS: Analysis complete - needs_follow_up: {context.needs_follow_up}, confidence: {context.confidence_score}") # If filenames are provided in UI, skip follow-ups and proceed to RAG if ui_filters and ui_filters.get("filenames"): logger.info(f"🔍 QUERY ANALYSIS: Filenames provided, skipping follow-ups, proceeding to RAG") context.needs_follow_up = False context.follow_up_question = None # Additional smart decision logic if context.needs_follow_up: # Check if we have enough information to proceed info_count = sum([ bool(context.extracted_district), bool(context.extracted_source), bool(context.extracted_year) ]) # Check if user is asking for more info vs providing it query_lower = query.lower() is_requesting_info = any(phrase in query_lower for phrase in [ "please provide", "could you provide", "can you provide", "what is", "what are", "how much", "which", "what year", "what district", "what source", "tell me about" ]) # If we have 2+ pieces of info AND user is not requesting more info, proceed to RAG if info_count >= 2 and not is_requesting_info: logger.info(f"🔍 QUERY ANALYSIS: Smart override - have {info_count} pieces of info and user not requesting more, proceeding to RAG") context.needs_follow_up = False context.follow_up_question = None elif info_count >= 2 and is_requesting_info: logger.info(f"🔍 QUERY ANALYSIS: User requesting more info despite having {info_count} pieces, proceeding to RAG with comprehensive answer") context.needs_follow_up = False context.follow_up_question = None return context except Exception as e: logger.error(f"❌ Query analysis failed: {e}") # Fallback: proceed with RAG return QueryContext( has_district=bool(ui_filters.get("districts")), has_source=bool(ui_filters.get("sources")), has_year=bool(ui_filters.get("years")), ui_filters=ui_filters, confidence_score=0.5, needs_follow_up=False ) def _rewrite_query_for_rag(self, messages: List[Any], context: QueryContext) -> str: """Rewrite query for optimal RAG retrieval""" logger.info("🔄 QUERY REWRITING: Starting query rewrite for RAG") logger.info(f"🔄 QUERY REWRITING: Processing {len(messages)} messages") # Build conversation context logger.info(f"🔄 QUERY REWRITING: Building conversation context from last 6 messages") conversation_lines = [] for i, msg in enumerate(messages[-6:]): if isinstance(msg, HumanMessage): conversation_lines.append(f"User: {msg.content}") logger.info(f"🔄 QUERY REWRITING: Message {i+1}: User - {msg.content[:50]}...") elif isinstance(msg, AIMessage): conversation_lines.append(f"Assistant: {msg.content}") logger.info(f"🔄 QUERY REWRITING: Message {i+1}: Assistant - {msg.content[:50]}...") convo_text = "\n".join(conversation_lines) logger.info(f"🔄 QUERY REWRITING: Conversation context built ({len(convo_text)} chars)") # Create rewrite prompt rewrite_prompt = ChatPromptTemplate.from_messages([ SystemMessage(content=f"""You are a query rewriter for RAG retrieval. GOAL: Create the best possible search query for document retrieval. CRITICAL RULES: 1. Focus on the core information need from the conversation 2. Remove meta-verbs like "summarize", "list", "compare", "how much", "what" - keep the content focus 3. DO NOT include filter details (years, districts, sources) - these are applied separately as filters 4. DO NOT include specific years, district names, or source types in the query 5. Output ONE clear sentence suitable for vector search 6. Keep it generic and focused on the topic/subject matter EXAMPLES: - "What are the top challenges in budget allocation?" → "budget allocation challenges" - "How were PDM administrative costs utilized in 2023?" → "PDM administrative costs utilization" - "Compare salary management across districts" → "salary management" - "How much was budget allocation for Local Government in 2023?" → "budget allocation" OUTPUT FORMAT: Provide your response in this exact format: EXPLANATION: [Your reasoning here] QUERY: [One clean sentence for retrieval] The QUERY line will be extracted and used directly for RAG retrieval."""), HumanMessage(content=f"""Conversation: {convo_text} Rewrite the best retrieval query:""") ]) try: logger.info(f"🔄 QUERY REWRITING: Calling LLM for query rewrite") response = self.llm.invoke(rewrite_prompt.format_messages()) logger.info(f"🔄 QUERY REWRITING: LLM response received: {response.content[:100]}...") rewritten = response.content.strip() # Extract only the QUERY line from the structured response lines = rewritten.split('\n') query_line = None for line in lines: if line.strip().startswith('QUERY:'): query_line = line.replace('QUERY:', '').strip() break if query_line and len(query_line) > 5: logger.info(f"🔄 QUERY REWRITING: Query rewritten successfully: '{query_line[:50]}...'") return query_line else: logger.info(f"🔄 QUERY REWRITING: No QUERY line found or too short, using fallback") # Fallback to last user message for msg in reversed(messages): if isinstance(msg, HumanMessage): logger.info(f"🔄 QUERY REWRITING: Using fallback message: '{msg.content[:50]}...'") return msg.content logger.info(f"🔄 QUERY REWRITING: Using default fallback") return "audit report information" except Exception as e: logger.error(f"❌ QUERY REWRITING: Error during rewrite: {e}") # Fallback for msg in reversed(messages): if isinstance(msg, HumanMessage): logger.info(f"🔄 QUERY REWRITING: Using error fallback message: '{msg.content[:50]}...'") return msg.content logger.info(f"🔄 QUERY REWRITING: Using default error fallback") return "audit report information" def _build_filters(self, context: QueryContext) -> Dict[str, Any]: """Build filters for RAG retrieval""" logger.info("🔧 FILTER BUILDING: Starting filter construction") filters = {} # Check for filename filtering first (mutually exclusive) if context.ui_filters and context.ui_filters.get("filenames"): logger.info(f"🔧 FILTER BUILDING: Filename filtering requested (mutually exclusive mode)") filters["filenames"] = context.ui_filters["filenames"] logger.info(f"🔧 FILTER BUILDING: Added filenames filter: {context.ui_filters['filenames']}") logger.info(f"🔧 FILTER BUILDING: Final filters: {filters}") return filters # Return early, skip all other filters # UI filters take priority, but merge with extracted context if UI filters are incomplete if context.ui_filters: logger.info(f"🔧 FILTER BUILDING: UI filters present: {context.ui_filters}") # Add UI filters first if context.ui_filters.get("sources"): filters["sources"] = context.ui_filters["sources"] logger.info(f"🔧 FILTER BUILDING: Added sources filter from UI: {context.ui_filters['sources']}") if context.ui_filters.get("years"): filters["year"] = context.ui_filters["years"] logger.info(f"🔧 FILTER BUILDING: Added years filter from UI: {context.ui_filters['years']}") if context.ui_filters.get("districts"): # Normalize district names to title case (match Qdrant metadata format) normalized_districts = [d.title() for d in context.ui_filters['districts']] filters["district"] = normalized_districts logger.info(f"🔧 FILTER BUILDING: Added districts filter from UI: {context.ui_filters['districts']} → normalized: {normalized_districts}") # Merge with extracted context for missing filters if not filters.get("year") and context.extracted_year: # Handle both single values and arrays if isinstance(context.extracted_year, list): filters["year"] = context.extracted_year else: filters["year"] = [context.extracted_year] logger.info(f"🔧 FILTER BUILDING: Added extracted year filter (UI missing): {context.extracted_year}") if not filters.get("district") and context.extracted_district: # Handle both single values and arrays if isinstance(context.extracted_district, list): # Normalize district names to title case (match Qdrant metadata format) normalized = [d.title() for d in context.extracted_district] filters["district"] = normalized else: filters["district"] = [context.extracted_district.title()] logger.info(f"🔧 FILTER BUILDING: Added extracted district filter (UI missing): {context.extracted_district}") if not filters.get("sources") and context.extracted_source: # Handle both single values and arrays if isinstance(context.extracted_source, list): filters["sources"] = context.extracted_source else: filters["sources"] = [context.extracted_source] logger.info(f"🔧 FILTER BUILDING: Added extracted source filter (UI missing): {context.extracted_source}") else: logger.info(f"🔧 FILTER BUILDING: No UI filters, using extracted context") # Use extracted context if context.extracted_source: # Handle both single values and arrays if isinstance(context.extracted_source, list): filters["sources"] = context.extracted_source else: filters["sources"] = [context.extracted_source] logger.info(f"🔧 FILTER BUILDING: Added extracted source filter: {context.extracted_source}") if context.extracted_year: # Handle both single values and arrays if isinstance(context.extracted_year, list): filters["year"] = context.extracted_year else: filters["year"] = [context.extracted_year] logger.info(f"🔧 FILTER BUILDING: Added extracted year filter: {context.extracted_year}") if context.extracted_district: # Handle both single values and arrays if isinstance(context.extracted_district, list): filters["district"] = context.extracted_district else: filters["district"] = [context.extracted_district] logger.info(f"🔧 FILTER BUILDING: Added extracted district filter: {context.extracted_district}") logger.info(f"🔧 FILTER BUILDING: Final filters: {filters}") return filters def _generate_conversational_response(self, query: str, documents: List[Any], rag_answer: str, messages: List[Any]) -> str: """Generate conversational response from RAG results""" logger.info("đŸ’Ŧ RESPONSE GENERATION: Starting conversational response generation") logger.info(f"đŸ’Ŧ RESPONSE GENERATION: Processing {len(documents)} documents") logger.info(f"đŸ’Ŧ RESPONSE GENERATION: Query: '{query[:50]}...'") # Create response prompt logger.info(f"đŸ’Ŧ RESPONSE GENERATION: Building response prompt") response_prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""You are a helpful audit report assistant. Generate a natural, conversational response. RULES: 1. Answer the user's question directly and clearly 2. Use the retrieved documents as evidence 3. Be conversational, not technical 4. Don't mention scores, retrieval details, or technical implementation 5. If relevant documents were found, reference them naturally 6. If no relevant documents, explain based on your knowledge (if you have it) or just say you do not have enough information. 7. If the passages have useful facts or numbers, use them in your answer. 8. When you use information from a passage, mention where it came from by using [Doc i] at the end of the sentence. i stands for the number of the document. 9. Do not use the sentence 'Doc i says ...' to say where information came from. 10. If the same thing is said in more than one document, you can mention all of them like this: [Doc i, Doc j, Doc k] 11. Do not just summarize each passage one by one. Group your summaries to highlight the key parts in the explanation. 12. If it makes sense, use bullet points and lists to make your answers easier to understand. 13. You do not need to use every passage. Only use the ones that help answer the question. 14. If the documents do not have the information needed to answer the question, just say you do not have enough information. TONE: Professional but friendly, like talking to a colleague."""), HumanMessage(content=f"""User Question: {query} Retrieved Documents: {len(documents)} documents found RAG Answer: {rag_answer} Generate a conversational response:""") ]) try: logger.info(f"đŸ’Ŧ RESPONSE GENERATION: Calling LLM for final response") response = self.llm.invoke(response_prompt.format_messages()) logger.info(f"đŸ’Ŧ RESPONSE GENERATION: LLM response received: {response.content[:100]}...") return response.content.strip() except Exception as e: logger.error(f"❌ RESPONSE GENERATION: Error during generation: {e}") logger.info(f"đŸ’Ŧ RESPONSE GENERATION: Using RAG answer as fallback") return rag_answer # Fallback to RAG answer def _generate_conversational_response_without_docs(self, query: str, messages: List[Any]) -> str: """Generate conversational response using only LLM knowledge and conversation history""" logger.info("đŸ’Ŧ RESPONSE GENERATION (NO DOCS): Starting response generation without documents") logger.info(f"đŸ’Ŧ RESPONSE GENERATION (NO DOCS): Query: '{query[:50]}...'") # Build conversation context conversation_context = "" for i, msg in enumerate(messages[-6:]): # Last 6 messages for context if isinstance(msg, HumanMessage): conversation_context += f"User: {msg.content}\n" elif isinstance(msg, AIMessage): conversation_context += f"Assistant: {msg.content}\n" # Create response prompt logger.info(f"đŸ’Ŧ RESPONSE GENERATION (NO DOCS): Building response prompt") response_prompt = ChatPromptTemplate.from_messages([ SystemMessage(content="""You are a helpful audit report assistant. Generate a natural, conversational response. RULES: 1. Answer the user's question directly and clearly based on your knowledge 2. Use conversation history for context 3. Be conversational, not technical 4. Acknowledge if the answer is based on general knowledge rather than specific documents 5. Stay professional but friendly TONE: Professional but friendly, like talking to a colleague."""), HumanMessage(content=f"""Current Question: {query} Conversation History: {conversation_context} Generate a conversational response based on your knowledge:""") ]) try: logger.info(f"đŸ’Ŧ RESPONSE GENERATION (NO DOCS): Calling LLM") response = self.llm.invoke(response_prompt.format_messages()) logger.info(f"đŸ’Ŧ RESPONSE GENERATION (NO DOCS): LLM response received: {response.content[:100]}...") return response.content.strip() except Exception as e: logger.error(f"❌ RESPONSE GENERATION (NO DOCS): Error during generation: {e}") return "I apologize, but I encountered an error. Please try asking your question differently." def chat(self, user_input: str, conversation_id: str = "default") -> Dict[str, Any]: """Main chat interface""" logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Processing '{user_input[:50]}...'") # Load conversation logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Loading conversation {conversation_id}") conversation_file = self.conversations_dir / f"{conversation_id}.json" conversation = self._load_conversation(conversation_file) logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Loaded {len(conversation['messages'])} previous messages") # Add user message conversation["messages"].append(HumanMessage(content=user_input)) logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Added user message to conversation") # Prepare state logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Preparing state for graph execution") state = MultiAgentState( conversation_id=conversation_id, messages=conversation["messages"], current_query=user_input, query_context=None, rag_query=None, rag_filters=None, retrieved_documents=None, final_response=None, agent_logs=[], conversation_context=conversation.get("context", {}), session_start_time=conversation["session_start_time"], last_ai_message_time=conversation["last_ai_message_time"] ) # Run multi-agent graph logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Executing multi-agent graph") final_state = self.graph.invoke(state) logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Graph execution completed") # Add AI response to conversation if final_state["final_response"]: conversation["messages"].append(AIMessage(content=final_state["final_response"])) logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Added AI response to conversation") # Update conversation conversation["last_ai_message_time"] = final_state["last_ai_message_time"] conversation["context"] = final_state["conversation_context"] # Save conversation logger.info(f"đŸ’Ŧ MULTI-AGENT CHAT: Saving conversation") self._save_conversation(conversation_file, conversation) logger.info("✅ MULTI-AGENT CHAT: Completed") # Return response and RAG results return { 'response': final_state["final_response"], 'rag_result': { 'sources': final_state["retrieved_documents"] or [], 'answer': final_state["final_response"] }, 'agent_logs': final_state["agent_logs"], 'actual_rag_query': final_state.get("rag_query", "") } def _load_conversation(self, conversation_file: Path) -> Dict[str, Any]: """Load conversation from file""" if conversation_file.exists(): try: with open(conversation_file) as f: data = json.load(f) # Convert message dicts back to LangChain messages messages = [] for msg_data in data.get("messages", []): if msg_data["type"] == "human": messages.append(HumanMessage(content=msg_data["content"])) elif msg_data["type"] == "ai": messages.append(AIMessage(content=msg_data["content"])) data["messages"] = messages return data except Exception as e: logger.warning(f"Could not load conversation: {e}") # Return default conversation return { "messages": [], "session_start_time": time.time(), "last_ai_message_time": time.time(), "context": {} } def _save_conversation(self, conversation_file: Path, conversation: Dict[str, Any]): """Save conversation to file""" try: # Ensure the conversations directory exists with proper permissions conversation_file.parent.mkdir(parents=True, mode=0o777, exist_ok=True) # Convert messages to serializable format messages_data = [] for msg in conversation["messages"]: if isinstance(msg, HumanMessage): messages_data.append({"type": "human", "content": msg.content}) elif isinstance(msg, AIMessage): messages_data.append({"type": "ai", "content": msg.content}) conversation_data = { "messages": messages_data, "session_start_time": conversation["session_start_time"], "last_ai_message_time": conversation["last_ai_message_time"], "context": conversation.get("context", {}) } with open(conversation_file, 'w') as f: json.dump(conversation_data, f, indent=2) except Exception as e: logger.error(f"Could not save conversation: {e}") logger.error(f"Traceback: {traceback.format_exc()}") def get_multi_agent_chatbot(): """Get multi-agent chatbot instance""" return MultiAgentRAGChatbot() if __name__ == "__main__": # Test the multi-agent system chatbot = MultiAgentRAGChatbot() # Test conversation result = chatbot.chat("List me top 10 challenges in budget allocation for the last 3 years") print("Response:", result['response']) print("Agent Logs:", result['agent_logs'])