""" Intelligent RAG Chatbot with Smart Query Analysis and Conversation Management This chatbot provides intelligent conversation flow with: - Smart query analysis and expansion - Single LangSmith conversation traces - Local conversation logging - Context-aware RAG retrieval - Natural conversation without technical jargon """ import os import json import time import logging from pathlib import Path from dataclasses import dataclass from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, TypedDict import re from langgraph.graph import StateGraph, END from langchain_core.prompts import ChatPromptTemplate from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from src.pipeline import PipelineManager from src.config.loader import load_config from src.config.paths import PROJECT_DIR @dataclass class QueryAnalysis: """Analysis result of a user query""" has_district: bool has_source: bool has_year: bool extracted_district: Optional[str] extracted_source: Optional[str] extracted_year: Optional[str] confidence_score: float can_answer_directly: bool missing_filters: List[str] suggested_follow_up: Optional[str] expanded_query: Optional[str] = None # Query expansion for better RAG class ConversationState(TypedDict): """State for the conversation flow""" conversation_id: str messages: List[Any] current_query: str query_analysis: Optional[QueryAnalysis] rag_query: Optional[str] rag_result: Optional[Any] final_response: Optional[str] conversation_context: Dict[str, Any] # Store conversation context session_start_time: float last_ai_message_time: float class IntelligentRAGChatbot: """Intelligent chatbot with smart query analysis and conversation management""" def __init__(self, suppress_logs=False): """Initialize the intelligent chatbot""" # Setup logger to avoid cluttering UI self.logger = logging.getLogger(__name__) if suppress_logs: self.logger.setLevel(logging.CRITICAL) # Suppress all logs else: self.logger.setLevel(logging.INFO) if not self.logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.info("๐Ÿค– INITIALIZING: Intelligent RAG Chatbot") # Load configuration first self.config = load_config() # Use the same LLM configuration as the existing system from auditqa.llm.adapters import get_llm_client # Get LLM client using the same configuration reader_config = self.config.get("reader", {}) default_type = reader_config.get("default_type", "INF_PROVIDERS") # Convert to lowercase as that's how it's registered provider_name = default_type.lower() self.llm_adapter = get_llm_client(provider_name, self.config) # Create a simple wrapper for LangChain compatibility class LLMWrapper: def __init__(self, adapter): self.adapter = adapter def invoke(self, messages): # Convert LangChain messages to the format expected by the adapter if isinstance(messages, list): # Convert LangChain messages to dict format message_dicts = [] for msg in messages: if hasattr(msg, 'content'): role = "user" if isinstance(msg, HumanMessage) else "assistant" message_dicts.append({"role": role, "content": msg.content}) else: message_dicts.append({"role": "user", "content": str(msg)}) else: # Single message message_dicts = [{"role": "user", "content": str(messages)}] # Use the adapter to generate response llm_response = self.adapter.generate(message_dicts) # Return in LangChain format class MockResponse: def __init__(self, content): self.content = content return MockResponse(llm_response.content) self.llm = LLMWrapper(self.llm_adapter) # Initialize pipeline manager for RAG self.logger.info("๐Ÿ”ง PIPELINE: Initializing PipelineManager...") self.pipeline_manager = PipelineManager(self.config) # Ensure vectorstore is connected self.logger.info("๐Ÿ”— VECTORSTORE: Connecting to Qdrant...") try: vectorstore = self.pipeline_manager.vectorstore_manager.connect_to_existing() self.logger.info("โœ… VECTORSTORE: Connected successfully") except Exception as e: self.logger.error(f"โŒ VECTORSTORE: Connection failed: {e}") # Fix LLM client to use the same provider as chatbot self.logger.info("๐Ÿ”ง LLM: Fixing PipelineManager LLM client...") self.pipeline_manager.llm_client = self.llm_adapter self.logger.info("โœ… LLM: PipelineManager now uses same LLM as chatbot") self.logger.info("โœ… PIPELINE: PipelineManager initialized") # Available metadata for filtering self.available_metadata = { 'sources': [ 'KCCA', 'MAAIF', 'MWTS', 'Gulu DLG', 'Kalangala DLG', 'Namutumba DLG', 'Lwengo DLG', 'Kiboga DLG', 'Annual Consolidated OAG', 'Consolidated', 'Hospital', 'Local Government', 'Ministry, Department and Agency', 'Project', 'Thematic', 'Value for Money' ], 'years': ['2018', '2019', '2020', '2021', '2022', '2023', '2024', '2025'], 'districts': [ 'Gulu', 'Kalangala', 'Kampala', 'Namutumba', 'Lwengo', 'Kiboga', 'Fort Portal', 'Arua', 'Kasese', 'Kabale', 'Masindi', 'Mbale', 'Jinja', 'Masaka', 'Mbarara', 'KCCA' ] } # Try to load district whitelist from filter_options.json try: fo = PROJECT_DIR / "src" / "config" / "filter_options.json" if fo.exists(): with open(fo) as f: data = json.load(f) if isinstance(data, dict) and data.get("districts"): self.district_whitelist = [d.strip() for d in data["districts"] if d] else: self.district_whitelist = self.available_metadata['districts'] else: self.district_whitelist = self.available_metadata['districts'] except Exception: self.district_whitelist = self.available_metadata['districts'] # Enrich whitelist from add_district_metadata.py if available (optional module) try: from add_district_metadata import DistrictMetadataProcessor proc = DistrictMetadataProcessor() names = set() for key, mapping in proc.district_mappings.items(): if getattr(mapping, 'is_district', True): names.add(mapping.name) if names: # Merge while preserving order: existing first, then new ones not present merged = list(self.district_whitelist) for n in sorted(names): if n not in merged: merged.append(n) self.district_whitelist = merged self.logger.info(f"๐Ÿงญ District whitelist enriched: {len(self.district_whitelist)} entries") except Exception as e: self.logger.info(f"โ„น๏ธ Could not enrich districts from add_district_metadata: {e}") # Get dynamic year list from filter_options.json try: fo = PROJECT_DIR / "src" / "config" / "filter_options.json" if fo.exists(): with open(fo) as f: data = json.load(f) if isinstance(data, dict) and data.get("years"): self.year_whitelist = [str(y).strip() for y in data["years"] if y] else: self.year_whitelist = self.available_metadata['years'] else: self.year_whitelist = self.available_metadata['years'] except Exception: self.year_whitelist = self.available_metadata['years'] # Calculate current year dynamically from datetime import datetime self.current_year = str(datetime.now().year) self.previous_year = str(datetime.now().year - 1) # Data context for system prompt self.data_context = self._load_data_context() # Build the LangGraph self.graph = self._build_graph() # Conversation logging self.conversations_dir = Path("conversations") self.conversations_dir.mkdir(exist_ok=True) def _load_data_context(self) -> str: """Load and analyze data context for system prompt""" try: # Try to load from generated context file context_file = Path("data_context.md") if context_file.exists(): with open(context_file) as f: return f.read() # Fallback to basic analysis reports_dir = Path("reports") testset_dir = Path("outputs/datasets/testset") context_parts = [] # Report analysis if reports_dir.exists(): report_folders = [d for d in reports_dir.iterdir() if d.is_dir()] context_parts.append(f"๐Ÿ“Š Available Reports: {len(report_folders)} audit report folders") # Get year range years = [] for folder in report_folders: if "2018" in folder.name: years.append("2018") elif "2019" in folder.name: years.append("2019") elif "2020" in folder.name: years.append("2020") elif "2021" in folder.name: years.append("2021") elif "2022" in folder.name: years.append("2022") elif "2023" in folder.name: years.append("2023") if years: context_parts.append(f"๐Ÿ“… Years covered: {', '.join(sorted(set(years)))}") # Test dataset analysis if testset_dir.exists(): test_files = list(testset_dir.glob("*.json")) context_parts.append(f"๐Ÿงช Test dataset: {len(test_files)} files with sample questions") return "\n".join(context_parts) if context_parts else "๐Ÿ“Š Audit report database with comprehensive coverage" except Exception as e: self.logger.warning(f"โš ๏ธ Could not load data context: {e}") return "๐Ÿ“Š Comprehensive audit report database" def _build_graph(self) -> StateGraph: """Build the LangGraph for intelligent conversation flow""" # Define the graph workflow = StateGraph(ConversationState) # Add nodes workflow.add_node("analyze_query", self._analyze_query) workflow.add_node("decide_action", self._decide_action) workflow.add_node("perform_rag", self._perform_rag) workflow.add_node("ask_follow_up", self._ask_follow_up) workflow.add_node("generate_response", self._generate_response) # Add edges workflow.add_edge("analyze_query", "decide_action") # Conditional edges from decide_action workflow.add_conditional_edges( "decide_action", self._should_perform_rag, { "rag": "perform_rag", "follow_up": "ask_follow_up" } ) # From perform_rag, go to generate_response workflow.add_edge("perform_rag", "generate_response") # From ask_follow_up, end workflow.add_edge("ask_follow_up", END) # From generate_response, end workflow.add_edge("generate_response", END) # Set entry point workflow.set_entry_point("analyze_query") return workflow.compile() def _extract_districts_list(self, text: str) -> List[str]: """Extract one or more districts from free text using whitelist matching. - Case-insensitive substring match for each known district name - Handles multi-district inputs like "Lwengo Kiboga District & Namutumba" """ if not text: return [] q = text.lower() found: List[str] = [] for name in self.district_whitelist: n = name.lower() if n in q: # Map Kampala -> KCCA canonical canonical = 'KCCA' if name.lower() == 'kampala' else name if canonical not in found: found.append(canonical) return found def _extract_years_list(self, text: str) -> List[str]: """Extract year list from text, supporting forms like '2022 / 23', '2022-2023', '2022โ€“23'.""" if not text: return [] years: List[str] = [] q = text # Full 4-digit years for y in re.findall(r"\b(20\d{2})\b", q): if y not in years: years.append(y) # Shorthand like 2022/23 or 2022-23 for m in re.finditer(r"\b(20\d{2})\s*[\-/โ€“]\s*(\d{2})\b", q): y1 = m.group(1) y2_short = int(m.group(2)) y2 = f"20{y2_short:02d}" for y in [y1, y2]: if y not in years: years.append(y) return years def _analyze_query(self, state: ConversationState) -> ConversationState: """Analyze the user query with conversation context""" query = state["current_query"] conversation_context = state.get("conversation_context", {}) self.logger.info(f"๐Ÿง  QUERY ANALYSIS: Starting analysis for: '{query[:50]}...'") # Build conversation context for analysis context_info = "" if conversation_context: context_info = f"\n\nConversation context:\n" for key, value in conversation_context.items(): if value: context_info += f"- {key}: {value}\n" # Also include recent conversation messages for better context recent_messages = state.get("messages", []) if recent_messages and len(recent_messages) > 1: context_info += f"\nRecent conversation:\n" # Get last 3 messages for context for msg in recent_messages[-3:]: if hasattr(msg, 'content'): role = "User" if isinstance(msg, HumanMessage) else "Assistant" context_info += f"- {role}: {msg.content[:100]}...\n" # Create analysis prompt with data context analysis_prompt = ChatPromptTemplate.from_messages([ SystemMessage(content=f"""You are an expert at analyzing audit report queries. Your job is to extract specific information and determine if a query can be answered directly. {self.data_context} DISTRICT RECOGNITION RULES: - Kampala = KCCA (Kampala Capital City Authority) - Available districts: {', '.join(self.district_whitelist[:15])}... (and {len(self.district_whitelist)-15} more) - DLG = District Local Government - Uganda has {len(self.district_whitelist)} districts - recognize common ones SOURCE RECOGNITION RULES: - KCCA = Kampala Capital City Authority - MAAIF = Ministry of Agriculture, Animal Industry and Fisheries - MWTS = Ministry of Works and Transport - OAG = Office of the Auditor General - Consolidated = Annual Consolidated reports YEAR RECOGNITION RULES: - Available years: {', '.join(self.year_whitelist)} - Current year is {self.current_year} - use this to reason about relative years - If user mentions "last year", "previous year" - infer {self.previous_year} - If user mentions "this year", "current year" - infer {self.current_year} Analysis rules: 1. Be SMART - if you have enough context to search, do it 2. Use conversation context to fill in missing information 3. For budget/expenditure queries, try to infer missing details from context 4. Current year is {self.current_year} - use this to reason about relative years 5. If user mentions "last year", "previous year" - infer {self.previous_year} 6. If user mentions "this year", "current year" - infer {self.current_year} 7. If user mentions a department/ministry, infer the source 8. If user is getting frustrated or asking for results, proceed with RAG even if not perfect 9. Recognize Kampala as a district (KCCA) IMPORTANT: You must respond with ONLY valid JSON. No additional text. Return your analysis as JSON with these exact fields: {{ "has_district": boolean, "has_source": boolean, "has_year": boolean, "extracted_district": "string or null", "extracted_source": "string or null", "extracted_year": "string or null", "confidence_score": 0.0-1.0, "can_answer_directly": boolean, "missing_filters": ["list", "of", "missing", "filters"], "suggested_follow_up": "string or null", "expanded_query": "string or null" }} The expanded_query should be a natural language query that combines the original question with any inferred context for better RAG retrieval."""), HumanMessage(content=f"Analyze this query: '{query}'{context_info}") ]) # Get analysis from LLM response = self.llm.invoke(analysis_prompt.format_messages()) try: # Clean the response content to extract JSON content = response.content.strip() # Try to find JSON in the response if content.startswith('{') and content.endswith('}'): json_content = content else: # Try to extract JSON from the response import re json_match = re.search(r'\{.*\}', content, re.DOTALL) if json_match: json_content = json_match.group() else: raise json.JSONDecodeError("No JSON found in response", content, 0) # Parse JSON response analysis_data = json.loads(json_content) query_analysis = QueryAnalysis( has_district=analysis_data.get("has_district", False), has_source=analysis_data.get("has_source", False), has_year=analysis_data.get("has_year", False), extracted_district=analysis_data.get("extracted_district"), extracted_source=analysis_data.get("extracted_source"), extracted_year=analysis_data.get("extracted_year"), confidence_score=analysis_data.get("confidence_score", 0.0), can_answer_directly=analysis_data.get("can_answer_directly", False), missing_filters=analysis_data.get("missing_filters", []), suggested_follow_up=analysis_data.get("suggested_follow_up"), expanded_query=analysis_data.get("expanded_query") ) except (json.JSONDecodeError, KeyError, AttributeError) as e: self.logger.info(f"โš ๏ธ JSON parsing failed: {e}") # Fallback analysis - be more permissive query_lower = query.lower() # Simple keyword matching - improved district recognition has_district = any(district.lower() in query_lower for district in [ 'gulu', 'kalangala', 'kampala', 'namutumba', 'lwengo', 'kiboga', 'kcca', 'maaif', 'mwts' ]) # Special case: Kampala = KCCA if 'kampala' in query_lower and not has_district: has_district = True has_source = any(source.lower() in query_lower for source in [ 'kcca', 'maaif', 'mwts', 'gulu', 'kalangala', 'consolidated', 'oag', 'government' ]) # Check for year mentions using dynamic year list has_year = any(year in query_lower for year in self.year_whitelist) # Also check for explicit relative year terms has_year = has_year or any(term in query_lower for term in [ 'this year', 'last year', 'previous year', 'current year' ]) # Extract specific values extracted_district = None extracted_source = None extracted_year = None # Extract districts using comprehensive whitelist for district_name in self.district_whitelist: if district_name.lower() in query_lower: extracted_district = district_name break # Also check common aliases district_aliases = { 'kampala': 'Kampala', 'kcca': 'Kampala', 'gulu': 'Gulu', 'kalangala': 'Kalangala' } for alias, full_name in district_aliases.items(): if alias in query_lower and not extracted_district: extracted_district = full_name break for source in ['kcca', 'maaif', 'mwts', 'consolidated', 'oag']: if source in query_lower: extracted_source = source.upper() break # Extract year using dynamic year list for year in self.year_whitelist: if year in query_lower: extracted_year = year has_year = True break # Only handle relative year terms if explicitly mentioned if not extracted_year: if 'last year' in query_lower or 'previous year' in query_lower: extracted_year = self.previous_year has_year = True elif 'this year' in query_lower or 'current year' in query_lower: extracted_year = self.current_year has_year = True elif 'recent' in query_lower and 'year' in query_lower: # Use the most recent year from available data extracted_year = max(self.year_whitelist) if self.year_whitelist else self.previous_year has_year = True # Be more permissive - if we have some context, try to answer missing_filters = [] if not has_district: missing_filters.append("district") if not has_source: missing_filters.append("source") if not has_year: missing_filters.append("year") # If user seems frustrated or asking for results, be more permissive frustration_indicators = ['already', 'just said', 'specified', 'provided', 'crazy', 'answer'] is_frustrated = any(indicator in query_lower for indicator in frustration_indicators) can_answer_directly = len(missing_filters) <= 1 or is_frustrated # More permissive confidence_score = 0.8 if can_answer_directly else 0.3 # Generate follow-up suggestion if missing_filters and not is_frustrated: if "district" in missing_filters and "source" in missing_filters: suggested_follow_up = "I'd be happy to help you with that information! Could you please specify which district and department/ministry you're asking about?" elif "district" in missing_filters: suggested_follow_up = "Thanks for your question! Could you please specify which district you're asking about?" elif "source" in missing_filters: suggested_follow_up = "I can help you with that! Could you please specify which department or ministry you're asking about?" elif "year" in missing_filters: suggested_follow_up = "Great question! Could you please specify which year you're interested in?" else: suggested_follow_up = "Could you please provide more specific details to help me give you a precise answer?" else: suggested_follow_up = None # Create expanded query expanded_query = query if extracted_district: expanded_query += f" for {extracted_district} district" if extracted_source: expanded_query += f" from {extracted_source}" if extracted_year: expanded_query += f" in {extracted_year}" query_analysis = QueryAnalysis( has_district=has_district, has_source=has_source, has_year=has_year, extracted_district=extracted_district, extracted_source=extracted_source, extracted_year=extracted_year, confidence_score=confidence_score, can_answer_directly=can_answer_directly, missing_filters=missing_filters, suggested_follow_up=suggested_follow_up, expanded_query=expanded_query ) # Update conversation context if query_analysis.extracted_district: conversation_context["district"] = query_analysis.extracted_district if query_analysis.extracted_source: conversation_context["source"] = query_analysis.extracted_source if query_analysis.extracted_year: conversation_context["year"] = query_analysis.extracted_year state["query_analysis"] = query_analysis state["conversation_context"] = conversation_context self.logger.info(f"โœ… ANALYSIS COMPLETE: district={query_analysis.has_district}, source={query_analysis.has_source}, year={query_analysis.has_year}") self.logger.info(f"๐Ÿ“ˆ Confidence: {query_analysis.confidence_score:.2f}, Can answer directly: {query_analysis.can_answer_directly}") if query_analysis.expanded_query: self.logger.info(f"๐Ÿ”„ Expanded query: {query_analysis.expanded_query}") return state def _decide_action(self, state: ConversationState) -> ConversationState: """Decide what action to take based on query analysis""" analysis = state["query_analysis"] # Add decision reasoning if analysis.can_answer_directly and analysis.confidence_score > 0.7: self.logger.info(f"๐Ÿš€ DECISION: Query is complete, proceeding with RAG") self.logger.info(f"๐Ÿ“Š REASONING: Confidence={analysis.confidence_score:.2f}, Missing filters={len(analysis.missing_filters or [])}") if analysis.missing_filters: self.logger.info(f"๐Ÿ“‹ Missing: {', '.join(analysis.missing_filters)}") else: self.logger.info(f"โœ… All required information available") else: self.logger.info(f"โ“ DECISION: Query incomplete, asking follow-up") self.logger.info(f"๐Ÿ“Š REASONING: Confidence={analysis.confidence_score:.2f}, Missing filters={len(analysis.missing_filters or [])}") if analysis.missing_filters: self.logger.info(f"๐Ÿ“‹ Missing: {', '.join(analysis.missing_filters)}") self.logger.info(f"๐Ÿ’ก Follow-up needed: {analysis.suggested_follow_up}") return state def _should_perform_rag(self, state: ConversationState) -> str: """Determine whether to perform RAG or ask follow-up""" analysis = state["query_analysis"] conversation_context = state.get("conversation_context", {}) recent_messages = state.get("messages", []) # Check if we have enough context from conversation history has_district_context = analysis.has_district or conversation_context.get("district") has_source_context = analysis.has_source or conversation_context.get("source") has_year_context = analysis.has_year or conversation_context.get("year") # Count how many context pieces we have context_count = sum([bool(has_district_context), bool(has_source_context), bool(has_year_context)]) # For PDM queries, we need more specific information current_query = state["current_query"].lower() recent_messages = state.get("messages", []) # Check if this is a PDM query by looking at current query OR recent conversation is_pdm_query = "pdm" in current_query or "parish development" in current_query # Also check recent messages for PDM context if not is_pdm_query and recent_messages: for msg in recent_messages[-3:]: # Check last 3 messages if isinstance(msg, HumanMessage) and ("pdm" in msg.content.lower() or "parish development" in msg.content.lower()): is_pdm_query = True break if is_pdm_query: # For PDM queries, we need district AND year to be specific enough # But we need them to be explicitly provided in the current conversation, not just inferred if has_district_context and has_year_context: # Check if both district and year are explicitly mentioned in recent messages explicit_district = False explicit_year = False for msg in recent_messages[-3:]: # Check last 3 messages if isinstance(msg, HumanMessage): content = msg.content.lower() if any(district in content for district in ["gulu", "kalangala", "kampala", "namutumba"]): explicit_district = True if any(year in content for year in ["2022", "2023", "2022/23", "2023/24"]): explicit_year = True if explicit_district and explicit_year: self.logger.info(f"๐Ÿš€ DECISION: PDM query with explicit district and year, proceeding with RAG") self.logger.info(f"๐Ÿ“Š REASONING: PDM query - explicit_district={explicit_district}, explicit_year={explicit_year}") return "rag" else: self.logger.info(f"โ“ DECISION: PDM query needs explicit district and year, asking follow-up") self.logger.info(f"๐Ÿ“Š REASONING: PDM query - explicit_district={explicit_district}, explicit_year={explicit_year}") return "follow_up" else: self.logger.info(f"โ“ DECISION: PDM query needs more specific info, asking follow-up") self.logger.info(f"๐Ÿ“Š REASONING: PDM query - district={has_district_context}, year={has_year_context}") return "follow_up" # For general queries, be more conservative - need at least 2 pieces AND high confidence if context_count >= 2 and analysis.confidence_score > 0.8: self.logger.info(f"๐Ÿš€ DECISION: Sufficient context with high confidence, proceeding with RAG") self.logger.info(f"๐Ÿ“Š REASONING: Context pieces: district={has_district_context}, source={has_source_context}, year={has_year_context}, confidence={analysis.confidence_score}") return "rag" # If user seems frustrated (short responses like "no"), proceed with RAG if recent_messages and len(recent_messages) >= 3: # Need more messages to detect frustration last_user_message = None for msg in reversed(recent_messages): if isinstance(msg, HumanMessage): last_user_message = msg.content.lower().strip() break if last_user_message and len(last_user_message) < 10 and any(word in last_user_message for word in ["no", "yes", "ok", "sure"]): self.logger.info(f"๐Ÿš€ DECISION: User seems frustrated with short response, proceeding with RAG") return "rag" # Original logic for direct answers if analysis.can_answer_directly and analysis.confidence_score > 0.7: return "rag" else: return "follow_up" def _ask_follow_up(self, state: ConversationState) -> ConversationState: """Generate a follow-up question to clarify missing information""" analysis = state["query_analysis"] current_query = state["current_query"].lower() conversation_context = state.get("conversation_context", {}) # Check if this is a PDM query is_pdm_query = "pdm" in current_query or "parish development" in current_query if is_pdm_query: # Generate PDM-specific follow-up questions missing_info = [] if not analysis.has_district and not conversation_context.get("district"): missing_info.append("district (e.g., Gulu, Kalangala)") if not analysis.has_year and not conversation_context.get("year"): missing_info.append("year (e.g., 2022, 2023)") if missing_info: follow_up_message = f"For PDM administrative costs information, I need to know the {', '.join(missing_info)}. Could you please specify these details?" else: follow_up_message = "Could you please provide more specific details about the PDM administrative costs you're looking for?" else: # Use the original follow-up logic if analysis.suggested_follow_up: follow_up_message = analysis.suggested_follow_up else: follow_up_message = "Could you please provide more specific details to help me give you a precise answer?" state["final_response"] = follow_up_message state["last_ai_message_time"] = time.time() return state def _build_comprehensive_query(self, current_query: str, analysis, conversation_context: dict, recent_messages: list) -> str: """Build a better RAG query from conversation. - If latest message is a short modifier (e.g., "financial"), merge it into the last substantive question. - If latest message looks like filters (district/year), keep the last question unchanged. - Otherwise, use the current message. """ def is_interrogative(text: str) -> bool: t = text.lower().strip() return any(t.startswith(w) for w in ["what", "how", "why", "when", "where", "which", "who"]) or t.endswith("?") def is_filter_like(text: str) -> bool: t = text.lower() if "district" in t: return True if re.search(r"\b20\d{2}\b", t) or re.search(r"20\d{2}\s*[\-/โ€“]\s*\d{2}\b", t): return True if self._extract_districts_list(text): return True return False # Find last substantive user question last_question = None for msg in reversed(recent_messages[:-1] if recent_messages else []): if isinstance(msg, HumanMessage): if is_interrogative(msg.content) and len(msg.content.strip()) > 15: last_question = msg.content.strip() break cq = current_query.strip() words = cq.split() is_short_modifier = (not is_interrogative(cq)) and (len(words) <= 3) if is_filter_like(cq) and last_question: comprehensive_query = last_question elif is_short_modifier and last_question: modifier = cq if modifier.lower() in last_question.lower(): comprehensive_query = last_question else: if last_question.endswith('?'): comprehensive_query = last_question[:-1] + f" for {modifier}?" else: comprehensive_query = last_question + f" for {modifier}" else: comprehensive_query = current_query self.logger.info(f"๐Ÿ”„ COMPREHENSIVE QUERY: '{comprehensive_query}'") return comprehensive_query def _rewrite_query_with_llm(self, recent_messages: list, draft_query: str) -> str: """Use the LLM to rewrite a clean, focused RAG query from the conversation. Rules enforced in prompt: - Keep the user's main information need from the last substantive question - Integrate short modifiers (e.g., "financial") into that question when appropriate - Do NOT include filter text (years/districts/sources) in the query; those are handled separately - Return a single plain sentence only (no quotes, no markdown) """ try: # Build a compact conversation transcript (last 6 messages max) convo_lines = [] for msg in recent_messages[-6:]: if isinstance(msg, HumanMessage): convo_lines.append(f"User: {msg.content}") elif isinstance(msg, AIMessage): convo_lines.append(f"Assistant: {msg.content}") convo_text = "\n".join(convo_lines) """ "DECISION GUIDANCE:\n" "- If the latest user message looks like a modifier (e.g., 'financial'), merge it into the best prior question.\n" "- If the latest message provides filters (e.g., districts, years), DO NOT embed them; keep the base question.\n" "- If the latest message itself is a full, clear question, use it.\n" "- If the draft query is already good, you may refine its clarity but keep the same intent.\n\n" """ prompt = ChatPromptTemplate.from_messages([ SystemMessage(content=( "ROLE: Query Rewriter for a RAG system.\n\n" "PRIMARY OBJECTIVE:\n- Produce ONE retrieval-focused sentence that best represents the user's information need.\n" "- Maximize recall of relevant evidence; be specific but not overconstrained.\n\n" "INPUTS:\n- Conversation with User and Assistant turns (latest last).\n- A draft query (heuristic).\n\n" "OPERATING PRINCIPLES:\n" "1) Use the last substantive USER question as the backbone of intent.\n" "2) Merge helpful domain modifiers from any USER turns (financial, procurement, risk) when they sharpen focus; ignore if not helpful.\n" "3) Treat Assistant messages as guidance only; if the user later provided filters (years, districts, sources), DO NOT embed them in the query (filters are applied separately).\n" "4) Remove meta-verbs like 'summarize', 'list', 'explain', 'compare' from the query.\n" "5) Prefer content-bearing terms (topics, programs, outcomes) over task phrasing.\n" "6) If the latest user message is filters-only, keep the prior substantive question unchanged.\n" "7) If the draft query is already strong, refine wording for clarity but keep the same intent.\n\n" "EXAMPLES (multi-turn):\n" "A)\nUser: What are the top 5 priorities for improving audit procedures?\nAssistant: Could you specify the scope (e.g., financial, procurement)?\nUser: Financial\nโ†’ Output: Top priorities for improving financial audit procedures.\n\n" "B)\nUser: How were PDM administrative costs utilized and what was the impact of shortfalls?\nAssistant: Please specify district/year for precision.\nUser: Namutumba and Lwengo Districts (2022/23)\nโ†’ Output: How were PDM administrative costs utilized and what was the impact of shortfalls.\n(Exclude districts/years; they are filters.)\n\n" "C)\nUser: Summarize risk management issues in audit reports.\nโ†’ Output: Key risk management issues in audit reports.\n\n" "CONSTRAINTS:\n- Do NOT include filters (years, districts, sources, filenames).\n- Do NOT include quotes/markdown/bullets or multiple sentences.\n- Return exactly one plain sentence." )), HumanMessage(content=( f"Conversation (most recent last):\n{convo_text}\n\n" f"Draft query: {draft_query}\n\n" "Rewrite the single best retrieval query sentence now:" )), ]) # Add timeout for LLM call import signal def timeout_handler(signum, frame): raise TimeoutError("LLM rewrite timeout") # Set 10 second timeout signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(10) try: resp = self.llm.invoke(prompt.format_messages()) signal.alarm(0) # Cancel timeout rewritten = getattr(resp, 'content', '').strip() # Basic sanitization: keep it one line rewritten = rewritten.replace('\n', ' ').strip() if rewritten and len(rewritten) > 5: # Basic quality check self.logger.info(f"๐Ÿ› ๏ธ LLM REWRITER: '{rewritten}'") return rewritten else: self.logger.info(f"โš ๏ธ LLM rewrite too short/empty, using draft query") return draft_query except TimeoutError: signal.alarm(0) self.logger.info(f"โš ๏ธ LLM rewrite timeout after 10s, using draft query") return draft_query except Exception as e: signal.alarm(0) self.logger.info(f"โš ๏ธ LLM rewrite failed, using draft query. Error: {e}") return draft_query except Exception as e: self.logger.info(f"โš ๏ธ LLM rewrite setup failed, using draft query. Error: {e}") return draft_query def _perform_rag(self, state: ConversationState) -> ConversationState: """Perform RAG retrieval with smart query expansion""" query = state["current_query"] analysis = state["query_analysis"] conversation_context = state.get("conversation_context", {}) recent_messages = state.get("messages", []) # Build comprehensive query from conversation history draft_query = self._build_comprehensive_query(query, analysis, conversation_context, recent_messages) # Let LLM rewrite a clean, focused search query search_query = self._rewrite_query_with_llm(recent_messages, draft_query) self.logger.info(f"๐Ÿ” RAG RETRIEVAL: Starting for query: '{search_query[:50]}...'") self.logger.info(f"๐Ÿ“Š Analysis: district={analysis.has_district}, source={analysis.has_source}, year={analysis.has_year}") try: # Build filters from analysis and conversation context filters = {} # Use conversation context to fill in missing filters source = analysis.extracted_source or conversation_context.get("source") district = analysis.extracted_district or conversation_context.get("district") year = analysis.extracted_year or conversation_context.get("year") if source: filters["source"] = [source] # Qdrant expects lists self.logger.info(f"๐ŸŽฏ Filter: source={source}") if year: filters["year"] = [year] self.logger.info(f"๐ŸŽฏ Filter: year={year}") if district: # Map district to source if needed if district.upper() == "KAMPALA": filters["source"] = ["KCCA"] self.logger.info(f"๐ŸŽฏ Filter: district={district} -> source=KCCA") elif district.upper() in ["GULU", "KALANGALA"]: filters["source"] = [f"{district.upper()} DLG"] self.logger.info(f"๐ŸŽฏ Filter: district={district} -> source={district.upper()} DLG") # Run RAG pipeline with correct parameters result = self.pipeline_manager.run( query=search_query, # Use expanded query sources=filters.get("source") if filters.get("source") else None, auto_infer_filters=False, # Our agent already handled filter inference filters=filters if filters else None ) self.logger.info(f"โœ… RAG completed: Found {len(result.sources)} sources") self.logger.info(f"โฑ๏ธ Execution time: {result.execution_time:.2f}s") # Store RAG result in state state["rag_result"] = result state["rag_query"] = search_query except Exception as e: self.logger.info(f"โŒ RAG retrieval failed: {e}") state["rag_result"] = None return state def _generate_response(self, state: ConversationState) -> ConversationState: """Generate final response using RAG results""" rag_result = state["rag_result"] self.logger.info(f"๐Ÿ“ RESPONSE: Using RAG result ({len(rag_result.answer)} chars)") # Store the final response directly from RAG state["final_response"] = rag_result.answer state["last_ai_message_time"] = time.time() return state def chat(self, user_input: str, conversation_id: str = "default") -> str: """Main chat interface with conversation management""" self.logger.info(f"๐Ÿ’ฌ CHAT: Processing user input: '{user_input[:50]}...'") self.logger.info(f"๐Ÿ“Š Session: {conversation_id}") # Load conversation history conversation_file = self.conversations_dir / f"{conversation_id}.json" conversation = self._load_conversation(conversation_file) # Add user message to conversation conversation["messages"].append(HumanMessage(content=user_input)) self.logger.info(f"๐Ÿ”„ LANGGRAPH: Starting graph execution") # Prepare state for LangGraph with conversation context state = ConversationState( conversation_id=conversation_id, messages=conversation["messages"], current_query=user_input, query_analysis=None, conversation_context=conversation.get("context", {}), rag_result=None, final_response=None, session_start_time=conversation["session_start_time"], last_ai_message_time=conversation["last_ai_message_time"] ) # Run the graph final_state = self.graph.invoke(state) # Add the AI response to conversation if final_state["final_response"]: conversation["messages"].append(AIMessage(content=final_state["final_response"])) # Update conversation state conversation["last_ai_message_time"] = final_state["last_ai_message_time"] conversation["context"] = final_state["conversation_context"] # Save conversation self._save_conversation(conversation_file, conversation) self.logger.info(f"โœ… LANGGRAPH: Graph execution completed") self.logger.info(f"๐ŸŽฏ CHAT COMPLETE: Response ready") # Return both response and RAG result for UI return { 'response': final_state["final_response"] or "I apologize, but I couldn't process your request.", 'rag_result': final_state["rag_result"], 'actual_rag_query': final_state.get("rag_query", "") } def _load_conversation(self, conversation_file: Path) -> Dict[str, Any]: """Load conversation from file""" if conversation_file.exists(): try: with open(conversation_file) as f: data = json.load(f) # Convert message dicts back to LangChain messages messages = [] for msg_data in data.get("messages", []): if msg_data["type"] == "human": messages.append(HumanMessage(content=msg_data["content"])) elif msg_data["type"] == "ai": messages.append(AIMessage(content=msg_data["content"])) data["messages"] = messages return data except Exception as e: self.logger.info(f"โš ๏ธ Could not load conversation: {e}") # Return default conversation return { "messages": [], "session_start_time": time.time(), "last_ai_message_time": time.time(), "context": {} } def _save_conversation(self, conversation_file: Path, conversation: Dict[str, Any]): """Save conversation to file""" try: # Convert LangChain messages to serializable format messages_data = [] for msg in conversation["messages"]: if isinstance(msg, HumanMessage): messages_data.append({"type": "human", "content": msg.content}) elif isinstance(msg, AIMessage): messages_data.append({"type": "ai", "content": msg.content}) data = { "messages": messages_data, "session_start_time": conversation["session_start_time"], "last_ai_message_time": conversation["last_ai_message_time"], "context": conversation.get("context", {}), "last_updated": datetime.now().isoformat() } with open(conversation_file, "w") as f: json.dump(data, f, indent=2) except Exception as e: self.logger.info(f"โš ๏ธ Could not save conversation: {e}") def get_chatbot(): """Get chatbot instance""" return IntelligentRAGChatbot() if __name__ == "__main__": # Test the chatbot chatbot = IntelligentRAGChatbot() # Test conversation test_queries = [ "How much was the budget allocation for government salary payroll management?", "Namutumba district in 2023", "KCCA" ] for query in test_queries: self.logger.info(f"\n{'='*50}") self.logger.info(f"User: {query}") response = chatbot.chat(query) self.logger.info(f"Bot: {response}")