| """ | |
| Intelligent RAG Chatbot with Smart Query Analysis and Conversation Management | |
| This chatbot provides intelligent conversation flow with: | |
| - Smart query analysis and expansion | |
| - Single LangSmith conversation traces | |
| - Local conversation logging | |
| - Context-aware RAG retrieval | |
| - Natural conversation without technical jargon | |
| """ | |
import os
import json
import re
import time
import logging
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, TypedDict

from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

from src.pipeline import PipelineManager
from src.config.loader import load_config
from src.config.paths import PROJECT_DIR

@dataclass
class QueryAnalysis:
    """Analysis result of a user query."""
    has_district: bool
    has_source: bool
    has_year: bool
    extracted_district: Optional[str]
    extracted_source: Optional[str]
    extracted_year: Optional[str]
    confidence_score: float
    can_answer_directly: bool
    missing_filters: List[str]
    suggested_follow_up: Optional[str]
    expanded_query: Optional[str] = None  # Query expansion for better RAG

class ConversationState(TypedDict):
    """State for the conversation flow."""
    conversation_id: str
    messages: List[Any]
    current_query: str
    query_analysis: Optional[QueryAnalysis]
    rag_query: Optional[str]
    rag_result: Optional[Any]
    final_response: Optional[str]
    conversation_context: Dict[str, Any]  # Accumulated filters (district/source/year)
    session_start_time: float
    last_ai_message_time: float

class IntelligentRAGChatbot:
    """Intelligent chatbot with smart query analysis and conversation management."""

    def __init__(self, suppress_logs=False):
        """Initialize the intelligent chatbot."""
        # Set up a logger so log output can be silenced for UI use
        self.logger = logging.getLogger(__name__)
        if suppress_logs:
            self.logger.setLevel(logging.CRITICAL)  # Suppress all logs
        else:
            self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.info("🤖 INITIALIZING: Intelligent RAG Chatbot")

        # Load configuration first
        self.config = load_config()

        # Use the same LLM configuration as the existing system
        from auditqa.llm.adapters import get_llm_client

        reader_config = self.config.get("reader", {})
        default_type = reader_config.get("default_type", "INF_PROVIDERS")
        # Providers are registered under lowercase names
        provider_name = default_type.lower()
        self.llm_adapter = get_llm_client(provider_name, self.config)

        # Thin wrapper so the adapter can be invoked like a LangChain chat model
        class LLMWrapper:
            def __init__(self, adapter):
                self.adapter = adapter

            def invoke(self, messages):
                # Convert LangChain messages to the dict format the adapter expects
                if isinstance(messages, list):
                    message_dicts = []
                    for msg in messages:
                        if hasattr(msg, 'content'):
                            if isinstance(msg, HumanMessage):
                                role = "user"
                            elif isinstance(msg, SystemMessage):
                                role = "system"
                            else:
                                role = "assistant"
                            message_dicts.append({"role": role, "content": msg.content})
                        else:
                            message_dicts.append({"role": "user", "content": str(msg)})
                else:
                    # Single message
                    message_dicts = [{"role": "user", "content": str(messages)}]

                # Use the adapter to generate the response
                llm_response = self.adapter.generate(message_dicts)

                # Mimic a LangChain response object (anything with a .content attribute)
                class MockResponse:
                    def __init__(self, content):
                        self.content = content

                return MockResponse(llm_response.content)

        self.llm = LLMWrapper(self.llm_adapter)

        # Initialize pipeline manager for RAG
        self.logger.info("🔧 PIPELINE: Initializing PipelineManager...")
        self.pipeline_manager = PipelineManager(self.config)

        # Ensure the vectorstore is connected
        self.logger.info("🔌 VECTORSTORE: Connecting to Qdrant...")
        try:
            vectorstore = self.pipeline_manager.vectorstore_manager.connect_to_existing()
            self.logger.info("✅ VECTORSTORE: Connected successfully")
        except Exception as e:
            self.logger.error(f"❌ VECTORSTORE: Connection failed: {e}")

        # Point the pipeline at the same LLM client the chatbot uses
        self.logger.info("🔧 LLM: Fixing PipelineManager LLM client...")
        self.pipeline_manager.llm_client = self.llm_adapter
        self.logger.info("✅ LLM: PipelineManager now uses same LLM as chatbot")
        self.logger.info("✅ PIPELINE: PipelineManager initialized")

        # Available metadata for filtering
        self.available_metadata = {
            'sources': [
                'KCCA', 'MAAIF', 'MWTS', 'Gulu DLG', 'Kalangala DLG', 'Namutumba DLG',
                'Lwengo DLG', 'Kiboga DLG', 'Annual Consolidated OAG', 'Consolidated',
                'Hospital', 'Local Government', 'Ministry, Department and Agency',
                'Project', 'Thematic', 'Value for Money'
            ],
            'years': ['2018', '2019', '2020', '2021', '2022', '2023', '2024', '2025'],
            'districts': [
                'Gulu', 'Kalangala', 'Kampala', 'Namutumba', 'Lwengo', 'Kiboga',
                'Fort Portal', 'Arua', 'Kasese', 'Kabale', 'Masindi', 'Mbale',
                'Jinja', 'Masaka', 'Mbarara', 'KCCA'
            ]
        }

        # Try to load the district whitelist from filter_options.json
        try:
            fo = PROJECT_DIR / "src" / "config" / "filter_options.json"
            if fo.exists():
                with open(fo) as f:
                    data = json.load(f)
                if isinstance(data, dict) and data.get("districts"):
                    self.district_whitelist = [d.strip() for d in data["districts"] if d]
                else:
                    self.district_whitelist = self.available_metadata['districts']
            else:
                self.district_whitelist = self.available_metadata['districts']
        except Exception:
            self.district_whitelist = self.available_metadata['districts']

        # Enrich the whitelist from add_district_metadata.py if available (optional module)
        try:
            from add_district_metadata import DistrictMetadataProcessor
            proc = DistrictMetadataProcessor()
            names = set()
            for key, mapping in proc.district_mappings.items():
                if getattr(mapping, 'is_district', True):
                    names.add(mapping.name)
            if names:
                # Merge while preserving order: existing entries first, then new ones
                merged = list(self.district_whitelist)
                for n in sorted(names):
                    if n not in merged:
                        merged.append(n)
                self.district_whitelist = merged
            self.logger.info(f"🔧 District whitelist enriched: {len(self.district_whitelist)} entries")
        except Exception as e:
            self.logger.info(f"ℹ️ Could not enrich districts from add_district_metadata: {e}")

        # Get the dynamic year list from filter_options.json
        try:
            fo = PROJECT_DIR / "src" / "config" / "filter_options.json"
            if fo.exists():
                with open(fo) as f:
                    data = json.load(f)
                if isinstance(data, dict) and data.get("years"):
                    self.year_whitelist = [str(y).strip() for y in data["years"] if y]
                else:
                    self.year_whitelist = self.available_metadata['years']
            else:
                self.year_whitelist = self.available_metadata['years']
        except Exception:
            self.year_whitelist = self.available_metadata['years']
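
        # For reference, filter_options.json is expected to look roughly like the
        # following (a sketch inferred from the loading code above; the real file
        # may carry additional keys):
        #
        #     {
        #         "districts": ["Gulu", "Kalangala", "Kampala", "..."],
        #         "years": ["2018", "2019", "2020", "..."]
        #     }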

        # Calculate the current year dynamically (datetime is imported at module level)
        now_year = datetime.now().year
        self.current_year = str(now_year)
        self.previous_year = str(now_year - 1)

        # Data context for the system prompt
        self.data_context = self._load_data_context()

        # Build the LangGraph
        self.graph = self._build_graph()

        # Conversation logging
        self.conversations_dir = Path("conversations")
        self.conversations_dir.mkdir(exist_ok=True)

    def _load_data_context(self) -> str:
        """Load and analyze data context for the system prompt."""
        try:
            # Try to load from the generated context file
            context_file = Path("data_context.md")
            if context_file.exists():
                with open(context_file) as f:
                    return f.read()

            # Fall back to a basic analysis
            reports_dir = Path("reports")
            testset_dir = Path("outputs/datasets/testset")
            context_parts = []

            # Report analysis
            if reports_dir.exists():
                report_folders = [d for d in reports_dir.iterdir() if d.is_dir()]
                context_parts.append(f"📊 Available Reports: {len(report_folders)} audit report folders")

                # Determine the year range (first matching year per folder)
                years = []
                for folder in report_folders:
                    for y in ("2018", "2019", "2020", "2021", "2022", "2023"):
                        if y in folder.name:
                            years.append(y)
                            break
                if years:
                    context_parts.append(f"📅 Years covered: {', '.join(sorted(set(years)))}")

            # Test dataset analysis
            if testset_dir.exists():
                test_files = list(testset_dir.glob("*.json"))
                context_parts.append(f"🧪 Test dataset: {len(test_files)} files with sample questions")

            return "\n".join(context_parts) if context_parts else "📚 Audit report database with comprehensive coverage"
        except Exception as e:
            self.logger.warning(f"⚠️ Could not load data context: {e}")
            return "📚 Comprehensive audit report database"

    def _build_graph(self) -> StateGraph:
        """Build the LangGraph for the intelligent conversation flow."""
        workflow = StateGraph(ConversationState)

        # Add nodes
        workflow.add_node("analyze_query", self._analyze_query)
        workflow.add_node("decide_action", self._decide_action)
        workflow.add_node("perform_rag", self._perform_rag)
        workflow.add_node("ask_follow_up", self._ask_follow_up)
        workflow.add_node("generate_response", self._generate_response)

        # Add edges
        workflow.add_edge("analyze_query", "decide_action")

        # Conditional edges from decide_action
        workflow.add_conditional_edges(
            "decide_action",
            self._should_perform_rag,
            {
                "rag": "perform_rag",
                "follow_up": "ask_follow_up"
            }
        )

        # perform_rag feeds the response generator; both branches then terminate
        workflow.add_edge("perform_rag", "generate_response")
        workflow.add_edge("ask_follow_up", END)
        workflow.add_edge("generate_response", END)

        # Set entry point
        workflow.set_entry_point("analyze_query")

        return workflow.compile()
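
    # The compiled graph has this shape (derived from the edges above):
    #
    #     analyze_query -> decide_action -+-> perform_rag -> generate_response -> END
    #                                     `-> ask_follow_up -> END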

    def _extract_districts_list(self, text: str) -> List[str]:
        """Extract one or more districts from free text using whitelist matching.

        - Case-insensitive substring match for each known district name
        - Handles multi-district inputs like "Lwengo Kiboga District & Namutumba"
        """
        if not text:
            return []
        q = text.lower()
        found: List[str] = []
        for name in self.district_whitelist:
            if name.lower() in q:
                # Map Kampala to the canonical KCCA
                canonical = 'KCCA' if name.lower() == 'kampala' else name
                if canonical not in found:
                    found.append(canonical)
        return found
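
    # For example (result order follows the whitelist, not the input), with the
    # default whitelist the matcher behaves like:
    #     _extract_districts_list("Lwengo Kiboga District & Namutumba")
    #         -> ['Namutumba', 'Lwengo', 'Kiboga']
    #     _extract_districts_list("Kampala budget") -> ['KCCA']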

    def _extract_years_list(self, text: str) -> List[str]:
        """Extract a year list from text, supporting forms like '2022 / 23', '2022-2023', '2022–23'."""
        if not text:
            return []
        years: List[str] = []
        q = text

        # Full 4-digit years
        for y in re.findall(r"\b(20\d{2})\b", q):
            if y not in years:
                years.append(y)

        # Shorthand like 2022/23 or 2022-23
        for m in re.finditer(r"\b(20\d{2})\s*[\-/–]\s*(\d{2})\b", q):
            y1 = m.group(1)
            y2_short = int(m.group(2))
            y2 = f"20{y2_short:02d}"
            for y in [y1, y2]:
                if y not in years:
                    years.append(y)
        return years
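
    # For example:
    #     _extract_years_list("PDM spending in 2022/23") -> ['2022', '2023']
    #     _extract_years_list("compare 2021 and 2023")   -> ['2021', '2023']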

    def _analyze_query(self, state: ConversationState) -> ConversationState:
        """Analyze the user query with conversation context."""
        query = state["current_query"]
        conversation_context = state.get("conversation_context", {})
        self.logger.info(f"🧠 QUERY ANALYSIS: Starting analysis for: '{query[:50]}...'")

        # Build conversation context for the analysis
        context_info = ""
        if conversation_context:
            context_info = "\n\nConversation context:\n"
            for key, value in conversation_context.items():
                if value:
                    context_info += f"- {key}: {value}\n"

        # Also include recent conversation messages for better context
        recent_messages = state.get("messages", [])
        if recent_messages and len(recent_messages) > 1:
            context_info += "\nRecent conversation:\n"
            # Use the last 3 messages for context
            for msg in recent_messages[-3:]:
                if hasattr(msg, 'content'):
                    role = "User" if isinstance(msg, HumanMessage) else "Assistant"
                    context_info += f"- {role}: {msg.content[:100]}...\n"

        # Create the analysis prompt with data context
        analysis_prompt = ChatPromptTemplate.from_messages([
            SystemMessage(content=f"""You are an expert at analyzing audit report queries. Your job is to extract specific information and determine if a query can be answered directly.

{self.data_context}

DISTRICT RECOGNITION RULES:
- Kampala = KCCA (Kampala Capital City Authority)
- Available districts: {', '.join(self.district_whitelist[:15])}... (and {len(self.district_whitelist)-15} more)
- DLG = District Local Government
- Uganda has {len(self.district_whitelist)} districts - recognize common ones

SOURCE RECOGNITION RULES:
- KCCA = Kampala Capital City Authority
- MAAIF = Ministry of Agriculture, Animal Industry and Fisheries
- MWTS = Ministry of Works and Transport
- OAG = Office of the Auditor General
- Consolidated = Annual Consolidated reports

YEAR RECOGNITION RULES:
- Available years: {', '.join(self.year_whitelist)}
- Current year is {self.current_year} - use this to reason about relative years
- If the user mentions "last year" or "previous year", infer {self.previous_year}
- If the user mentions "this year" or "current year", infer {self.current_year}

Analysis rules:
1. Be SMART - if you have enough context to search, do it
2. Use conversation context to fill in missing information
3. For budget/expenditure queries, try to infer missing details from context
4. Current year is {self.current_year} - use this to reason about relative years
5. If the user mentions "last year" or "previous year", infer {self.previous_year}
6. If the user mentions "this year" or "current year", infer {self.current_year}
7. If the user mentions a department/ministry, infer the source
8. If the user is getting frustrated or asking for results, proceed with RAG even if not perfect
9. Recognize Kampala as a district (KCCA)

IMPORTANT: You must respond with ONLY valid JSON. No additional text.

Return your analysis as JSON with these exact fields:
{{
    "has_district": boolean,
    "has_source": boolean,
    "has_year": boolean,
    "extracted_district": "string or null",
    "extracted_source": "string or null",
    "extracted_year": "string or null",
    "confidence_score": 0.0-1.0,
    "can_answer_directly": boolean,
    "missing_filters": ["list", "of", "missing", "filters"],
    "suggested_follow_up": "string or null",
    "expanded_query": "string or null"
}}

The expanded_query should be a natural language query that combines the original question with any inferred context for better RAG retrieval."""),
            HumanMessage(content=f"Analyze this query: '{query}'{context_info}")
        ])

        # Get the analysis from the LLM
        response = self.llm.invoke(analysis_prompt.format_messages())

        try:
            # Clean the response content to extract JSON
            content = response.content.strip()

            # Try to find JSON in the response
            if content.startswith('{') and content.endswith('}'):
                json_content = content
            else:
                # Try to extract a JSON object from the response (re is imported at module level)
                json_match = re.search(r'\{.*\}', content, re.DOTALL)
                if json_match:
                    json_content = json_match.group()
                else:
                    raise json.JSONDecodeError("No JSON found in response", content, 0)

            # Parse the JSON response
            analysis_data = json.loads(json_content)
            query_analysis = QueryAnalysis(
                has_district=analysis_data.get("has_district", False),
                has_source=analysis_data.get("has_source", False),
                has_year=analysis_data.get("has_year", False),
                extracted_district=analysis_data.get("extracted_district"),
                extracted_source=analysis_data.get("extracted_source"),
                extracted_year=analysis_data.get("extracted_year"),
                confidence_score=analysis_data.get("confidence_score", 0.0),
                can_answer_directly=analysis_data.get("can_answer_directly", False),
                missing_filters=analysis_data.get("missing_filters", []),
                suggested_follow_up=analysis_data.get("suggested_follow_up"),
                expanded_query=analysis_data.get("expanded_query")
            )
        except (json.JSONDecodeError, KeyError, AttributeError) as e:
            self.logger.info(f"⚠️ JSON parsing failed: {e}")
            # Fallback analysis - be more permissive
            query_lower = query.lower()

            # Simple keyword matching ('kampala' and 'kcca' both count as district mentions)
            has_district = any(district in query_lower for district in [
                'gulu', 'kalangala', 'kampala', 'namutumba', 'lwengo', 'kiboga', 'kcca', 'maaif', 'mwts'
            ])
            has_source = any(source in query_lower for source in [
                'kcca', 'maaif', 'mwts', 'gulu', 'kalangala', 'consolidated', 'oag', 'government'
            ])

            # Check for year mentions using the dynamic year list
            has_year = any(year in query_lower for year in self.year_whitelist)
            # Also check for explicit relative year terms
            has_year = has_year or any(term in query_lower for term in [
                'this year', 'last year', 'previous year', 'current year'
            ])

            # Extract specific values
            extracted_district = None
            extracted_source = None
            extracted_year = None

            # Extract districts using the comprehensive whitelist
            for district_name in self.district_whitelist:
                if district_name.lower() in query_lower:
                    extracted_district = district_name
                    break

            # Also check common aliases
            district_aliases = {
                'kampala': 'Kampala',
                'kcca': 'Kampala',
                'gulu': 'Gulu',
                'kalangala': 'Kalangala'
            }
            for alias, full_name in district_aliases.items():
                if alias in query_lower and not extracted_district:
                    extracted_district = full_name
                    break

            for source in ['kcca', 'maaif', 'mwts', 'consolidated', 'oag']:
                if source in query_lower:
                    extracted_source = source.upper()
                    break

            # Extract the year using the dynamic year list
            for year in self.year_whitelist:
                if year in query_lower:
                    extracted_year = year
                    has_year = True
                    break

            # Only handle relative year terms if explicitly mentioned
            if not extracted_year:
                if 'last year' in query_lower or 'previous year' in query_lower:
                    extracted_year = self.previous_year
                    has_year = True
                elif 'this year' in query_lower or 'current year' in query_lower:
                    extracted_year = self.current_year
                    has_year = True
                elif 'recent' in query_lower and 'year' in query_lower:
                    # Use the most recent year from the available data
                    extracted_year = max(self.year_whitelist) if self.year_whitelist else self.previous_year
                    has_year = True

            # Be more permissive - if we have some context, try to answer
            missing_filters = []
            if not has_district:
                missing_filters.append("district")
            if not has_source:
                missing_filters.append("source")
            if not has_year:
                missing_filters.append("year")

            # If the user seems frustrated or is asking for results, be more permissive
            frustration_indicators = ['already', 'just said', 'specified', 'provided', 'crazy', 'answer']
            is_frustrated = any(indicator in query_lower for indicator in frustration_indicators)

            can_answer_directly = len(missing_filters) <= 1 or is_frustrated  # More permissive
            confidence_score = 0.8 if can_answer_directly else 0.3

            # Generate a follow-up suggestion
            if missing_filters and not is_frustrated:
                if "district" in missing_filters and "source" in missing_filters:
                    suggested_follow_up = "I'd be happy to help you with that information! Could you please specify which district and department/ministry you're asking about?"
                elif "district" in missing_filters:
                    suggested_follow_up = "Thanks for your question! Could you please specify which district you're asking about?"
                elif "source" in missing_filters:
                    suggested_follow_up = "I can help you with that! Could you please specify which department or ministry you're asking about?"
                elif "year" in missing_filters:
                    suggested_follow_up = "Great question! Could you please specify which year you're interested in?"
                else:
                    suggested_follow_up = "Could you please provide more specific details to help me give you a precise answer?"
            else:
                suggested_follow_up = None

            # Create the expanded query
            expanded_query = query
            if extracted_district:
                expanded_query += f" for {extracted_district} district"
            if extracted_source:
                expanded_query += f" from {extracted_source}"
            if extracted_year:
                expanded_query += f" in {extracted_year}"

            query_analysis = QueryAnalysis(
                has_district=has_district,
                has_source=has_source,
                has_year=has_year,
                extracted_district=extracted_district,
                extracted_source=extracted_source,
                extracted_year=extracted_year,
                confidence_score=confidence_score,
                can_answer_directly=can_answer_directly,
                missing_filters=missing_filters,
                suggested_follow_up=suggested_follow_up,
                expanded_query=expanded_query
            )

        # Update the conversation context with anything we extracted
        if query_analysis.extracted_district:
            conversation_context["district"] = query_analysis.extracted_district
        if query_analysis.extracted_source:
            conversation_context["source"] = query_analysis.extracted_source
        if query_analysis.extracted_year:
            conversation_context["year"] = query_analysis.extracted_year

        state["query_analysis"] = query_analysis
        state["conversation_context"] = conversation_context
        self.logger.info(f"✅ ANALYSIS COMPLETE: district={query_analysis.has_district}, source={query_analysis.has_source}, year={query_analysis.has_year}")
        self.logger.info(f"📊 Confidence: {query_analysis.confidence_score:.2f}, Can answer directly: {query_analysis.can_answer_directly}")
        if query_analysis.expanded_query:
            self.logger.info(f"📝 Expanded query: {query_analysis.expanded_query}")
        return state

    def _decide_action(self, state: ConversationState) -> ConversationState:
        """Decide what action to take based on the query analysis."""
        analysis = state["query_analysis"]

        # Log the decision reasoning
        if analysis.can_answer_directly and analysis.confidence_score > 0.7:
            self.logger.info("🚀 DECISION: Query is complete, proceeding with RAG")
            self.logger.info(f"📋 REASONING: Confidence={analysis.confidence_score:.2f}, Missing filters={len(analysis.missing_filters or [])}")
            if analysis.missing_filters:
                self.logger.info(f"📋 Missing: {', '.join(analysis.missing_filters)}")
            else:
                self.logger.info("✅ All required information available")
        else:
            self.logger.info("❌ DECISION: Query incomplete, asking follow-up")
            self.logger.info(f"📋 REASONING: Confidence={analysis.confidence_score:.2f}, Missing filters={len(analysis.missing_filters or [])}")
            if analysis.missing_filters:
                self.logger.info(f"📋 Missing: {', '.join(analysis.missing_filters)}")
            self.logger.info(f"💡 Follow-up needed: {analysis.suggested_follow_up}")
        return state

    def _should_perform_rag(self, state: ConversationState) -> str:
        """Determine whether to perform RAG or ask a follow-up."""
        analysis = state["query_analysis"]
        conversation_context = state.get("conversation_context", {})
        recent_messages = state.get("messages", [])

        # Check whether we have enough context from the conversation history
        has_district_context = analysis.has_district or conversation_context.get("district")
        has_source_context = analysis.has_source or conversation_context.get("source")
        has_year_context = analysis.has_year or conversation_context.get("year")

        # Count how many context pieces we have
        context_count = sum([bool(has_district_context), bool(has_source_context), bool(has_year_context)])

        # PDM queries need more specific information
        current_query = state["current_query"].lower()

        # Check whether this is a PDM query via the current query OR the recent conversation
        is_pdm_query = "pdm" in current_query or "parish development" in current_query
        if not is_pdm_query and recent_messages:
            for msg in recent_messages[-3:]:  # Check the last 3 messages
                if isinstance(msg, HumanMessage) and ("pdm" in msg.content.lower() or "parish development" in msg.content.lower()):
                    is_pdm_query = True
                    break

        if is_pdm_query:
            # PDM queries need a district AND a year to be specific enough, and both
            # must be explicitly provided in the conversation, not just inferred
            if has_district_context and has_year_context:
                # Check whether both are explicitly mentioned in recent messages
                explicit_district = False
                explicit_year = False
                for msg in recent_messages[-3:]:  # Check the last 3 messages
                    if isinstance(msg, HumanMessage):
                        content = msg.content.lower()
                        if any(district in content for district in ["gulu", "kalangala", "kampala", "namutumba"]):
                            explicit_district = True
                        if any(year in content for year in ["2022", "2023", "2022/23", "2023/24"]):
                            explicit_year = True
                if explicit_district and explicit_year:
                    self.logger.info("🚀 DECISION: PDM query with explicit district and year, proceeding with RAG")
                    self.logger.info(f"📋 REASONING: PDM query - explicit_district={explicit_district}, explicit_year={explicit_year}")
                    return "rag"
                else:
                    self.logger.info("❌ DECISION: PDM query needs explicit district and year, asking follow-up")
                    self.logger.info(f"📋 REASONING: PDM query - explicit_district={explicit_district}, explicit_year={explicit_year}")
                    return "follow_up"
            else:
                self.logger.info("❌ DECISION: PDM query needs more specific info, asking follow-up")
                self.logger.info(f"📋 REASONING: PDM query - district={has_district_context}, year={has_year_context}")
                return "follow_up"

        # For general queries, be more conservative: require at least 2 context pieces AND high confidence
        if context_count >= 2 and analysis.confidence_score > 0.8:
            self.logger.info("🚀 DECISION: Sufficient context with high confidence, proceeding with RAG")
            self.logger.info(f"📋 REASONING: Context pieces: district={has_district_context}, source={has_source_context}, year={has_year_context}, confidence={analysis.confidence_score}")
            return "rag"

        # If the user seems frustrated (short responses like "no"), proceed with RAG
        if recent_messages and len(recent_messages) >= 3:  # Need several messages to detect frustration
            last_user_message = None
            for msg in reversed(recent_messages):
                if isinstance(msg, HumanMessage):
                    last_user_message = msg.content.lower().strip()
                    break
            if last_user_message and len(last_user_message) < 10 and any(word in last_user_message for word in ["no", "yes", "ok", "sure"]):
                self.logger.info("🚀 DECISION: User seems frustrated with short response, proceeding with RAG")
                return "rag"

        # Original logic for direct answers
        if analysis.can_answer_directly and analysis.confidence_score > 0.7:
            return "rag"
        return "follow_up"

    def _ask_follow_up(self, state: ConversationState) -> ConversationState:
        """Generate a follow-up question to clarify missing information."""
        analysis = state["query_analysis"]
        current_query = state["current_query"].lower()
        conversation_context = state.get("conversation_context", {})

        # Check whether this is a PDM query
        is_pdm_query = "pdm" in current_query or "parish development" in current_query

        if is_pdm_query:
            # Generate PDM-specific follow-up questions
            missing_info = []
            if not analysis.has_district and not conversation_context.get("district"):
                missing_info.append("district (e.g., Gulu, Kalangala)")
            if not analysis.has_year and not conversation_context.get("year"):
                missing_info.append("year (e.g., 2022, 2023)")
            if missing_info:
                follow_up_message = f"For PDM administrative costs information, I need to know the {', '.join(missing_info)}. Could you please specify these details?"
            else:
                follow_up_message = "Could you please provide more specific details about the PDM administrative costs you're looking for?"
        else:
            # Use the original follow-up logic
            if analysis.suggested_follow_up:
                follow_up_message = analysis.suggested_follow_up
            else:
                follow_up_message = "Could you please provide more specific details to help me give you a precise answer?"

        state["final_response"] = follow_up_message
        state["last_ai_message_time"] = time.time()
        return state

    def _build_comprehensive_query(self, current_query: str, analysis, conversation_context: dict, recent_messages: list) -> str:
        """Build a better RAG query from the conversation.

        - If the latest message is a short modifier (e.g., "financial"), merge it into the last substantive question.
        - If the latest message looks like filters (district/year), keep the last question unchanged.
        - Otherwise, use the current message.
        """
        def is_interrogative(text: str) -> bool:
            t = text.lower().strip()
            return any(t.startswith(w) for w in ["what", "how", "why", "when", "where", "which", "who"]) or t.endswith("?")

        def is_filter_like(text: str) -> bool:
            t = text.lower()
            if "district" in t:
                return True
            if re.search(r"\b20\d{2}\b", t) or re.search(r"20\d{2}\s*[\-/–]\s*\d{2}\b", t):
                return True
            if self._extract_districts_list(text):
                return True
            return False

        # Find the last substantive user question
        last_question = None
        for msg in reversed(recent_messages[:-1] if recent_messages else []):
            if isinstance(msg, HumanMessage):
                if is_interrogative(msg.content) and len(msg.content.strip()) > 15:
                    last_question = msg.content.strip()
                    break

        cq = current_query.strip()
        words = cq.split()
        is_short_modifier = (not is_interrogative(cq)) and (len(words) <= 3)

        if is_filter_like(cq) and last_question:
            comprehensive_query = last_question
        elif is_short_modifier and last_question:
            modifier = cq
            if modifier.lower() in last_question.lower():
                comprehensive_query = last_question
            else:
                if last_question.endswith('?'):
                    comprehensive_query = last_question[:-1] + f" for {modifier}?"
                else:
                    comprehensive_query = last_question + f" for {modifier}"
        else:
            comprehensive_query = current_query

        self.logger.info(f"🔍 COMPREHENSIVE QUERY: '{comprehensive_query}'")
        return comprehensive_query
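
    # For example, given the prior turn "What are the top risks in PDM audits?":
    #     current message "financial"         -> "What are the top risks in PDM audits for financial?"
    #     current message "Gulu district"     -> the prior question, unchanged (filter-like)
    #     current message a full new question -> used as-is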

    def _rewrite_query_with_llm(self, recent_messages: list, draft_query: str) -> str:
        """Use the LLM to rewrite a clean, focused RAG query from the conversation.

        Rules enforced in the prompt:
        - Keep the user's main information need from the last substantive question
        - Integrate short modifiers (e.g., "financial") into that question when appropriate
        - Do NOT include filter text (years/districts/sources) in the query; those are handled separately
        - Return a single plain sentence only (no quotes, no markdown)
        """
        try:
            # Build a compact conversation transcript (last 6 messages max)
            convo_lines = []
            for msg in recent_messages[-6:]:
                if isinstance(msg, HumanMessage):
                    convo_lines.append(f"User: {msg.content}")
                elif isinstance(msg, AIMessage):
                    convo_lines.append(f"Assistant: {msg.content}")
            convo_text = "\n".join(convo_lines)

            # Retained guidance, currently not wired into the prompt:
            #     "DECISION GUIDANCE:\n"
            #     "- If the latest user message looks like a modifier (e.g., 'financial'), merge it into the best prior question.\n"
            #     "- If the latest message provides filters (e.g., districts, years), DO NOT embed them; keep the base question.\n"
            #     "- If the latest message itself is a full, clear question, use it.\n"
            #     "- If the draft query is already good, you may refine its clarity but keep the same intent.\n\n"

            prompt = ChatPromptTemplate.from_messages([
                SystemMessage(content=(
                    "ROLE: Query Rewriter for a RAG system.\n\n"
                    "PRIMARY OBJECTIVE:\n- Produce ONE retrieval-focused sentence that best represents the user's information need.\n"
                    "- Maximize recall of relevant evidence; be specific but not overconstrained.\n\n"
                    "INPUTS:\n- Conversation with User and Assistant turns (latest last).\n- A draft query (heuristic).\n\n"
                    "OPERATING PRINCIPLES:\n"
                    "1) Use the last substantive USER question as the backbone of intent.\n"
                    "2) Merge helpful domain modifiers from any USER turns (financial, procurement, risk) when they sharpen focus; ignore if not helpful.\n"
                    "3) Treat Assistant messages as guidance only; if the user later provided filters (years, districts, sources), DO NOT embed them in the query (filters are applied separately).\n"
                    "4) Remove meta-verbs like 'summarize', 'list', 'explain', 'compare' from the query.\n"
                    "5) Prefer content-bearing terms (topics, programs, outcomes) over task phrasing.\n"
                    "6) If the latest user message is filters-only, keep the prior substantive question unchanged.\n"
                    "7) If the draft query is already strong, refine wording for clarity but keep the same intent.\n\n"
                    "EXAMPLES (multi-turn):\n"
                    "A)\nUser: What are the top 5 priorities for improving audit procedures?\nAssistant: Could you specify the scope (e.g., financial, procurement)?\nUser: Financial\n→ Output: Top priorities for improving financial audit procedures.\n\n"
                    "B)\nUser: How were PDM administrative costs utilized and what was the impact of shortfalls?\nAssistant: Please specify district/year for precision.\nUser: Namutumba and Lwengo Districts (2022/23)\n→ Output: How were PDM administrative costs utilized and what was the impact of shortfalls.\n(Exclude districts/years; they are filters.)\n\n"
                    "C)\nUser: Summarize risk management issues in audit reports.\n→ Output: Key risk management issues in audit reports.\n\n"
                    "CONSTRAINTS:\n- Do NOT include filters (years, districts, sources, filenames).\n- Do NOT include quotes/markdown/bullets or multiple sentences.\n- Return exactly one plain sentence."
                )),
                HumanMessage(content=(
                    f"Conversation (most recent last):\n{convo_text}\n\n"
                    f"Draft query: {draft_query}\n\n"
                    "Rewrite the single best retrieval query sentence now:"
                )),
            ])

            # Add a timeout for the LLM call
            import signal
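
            # NOTE: signal.SIGALRM works only on POSIX systems and only in the main
            # thread; on Windows or in worker threads a thread- or async-based
            # timeout would be needed instead.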
            def timeout_handler(signum, frame):
                raise TimeoutError("LLM rewrite timeout")

            # Set a 10 second timeout
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(10)

            try:
                resp = self.llm.invoke(prompt.format_messages())
                signal.alarm(0)  # Cancel the timeout
                rewritten = getattr(resp, 'content', '').strip()
                # Basic sanitization: keep it to one line
                rewritten = rewritten.replace('\n', ' ').strip()
                if rewritten and len(rewritten) > 5:  # Basic quality check
                    self.logger.info(f"🛠️ LLM REWRITER: '{rewritten}'")
                    return rewritten
                else:
                    self.logger.info("⚠️ LLM rewrite too short/empty, using draft query")
                    return draft_query
            except TimeoutError:
                signal.alarm(0)
                self.logger.info("⚠️ LLM rewrite timeout after 10s, using draft query")
                return draft_query
            except Exception as e:
                signal.alarm(0)
                self.logger.info(f"⚠️ LLM rewrite failed, using draft query. Error: {e}")
                return draft_query
        except Exception as e:
            self.logger.info(f"⚠️ LLM rewrite setup failed, using draft query. Error: {e}")
            return draft_query

    def _perform_rag(self, state: ConversationState) -> ConversationState:
        """Perform RAG retrieval with smart query expansion."""
        query = state["current_query"]
        analysis = state["query_analysis"]
        conversation_context = state.get("conversation_context", {})
        recent_messages = state.get("messages", [])

        # Build a comprehensive query from the conversation history
        draft_query = self._build_comprehensive_query(query, analysis, conversation_context, recent_messages)
        # Let the LLM rewrite a clean, focused search query
        search_query = self._rewrite_query_with_llm(recent_messages, draft_query)

        self.logger.info(f"🔍 RAG RETRIEVAL: Starting for query: '{search_query[:50]}...'")
        self.logger.info(f"📊 Analysis: district={analysis.has_district}, source={analysis.has_source}, year={analysis.has_year}")

        try:
            # Build filters from the analysis and conversation context
            filters = {}

            # Use the conversation context to fill in missing filters
            source = analysis.extracted_source or conversation_context.get("source")
            district = analysis.extracted_district or conversation_context.get("district")
            year = analysis.extracted_year or conversation_context.get("year")

            if source:
                filters["source"] = [source]  # Qdrant expects lists
                self.logger.info(f"🎯 Filter: source={source}")
            if year:
                filters["year"] = [year]
                self.logger.info(f"🎯 Filter: year={year}")
            if district:
                # Map the district to a source if needed
                if district.upper() == "KAMPALA":
                    filters["source"] = ["KCCA"]
                    self.logger.info(f"🎯 Filter: district={district} -> source=KCCA")
                elif district.upper() in ["GULU", "KALANGALA"]:
                    filters["source"] = [f"{district.upper()} DLG"]
                    self.logger.info(f"🎯 Filter: district={district} -> source={district.upper()} DLG")

            # Run the RAG pipeline with the rewritten query
            result = self.pipeline_manager.run(
                query=search_query,
                sources=filters.get("source") if filters.get("source") else None,
                auto_infer_filters=False,  # Our agent already handled filter inference
                filters=filters if filters else None
            )

            self.logger.info(f"✅ RAG completed: Found {len(result.sources)} sources")
            self.logger.info(f"⏱️ Execution time: {result.execution_time:.2f}s")

            # Store the RAG result in state
            state["rag_result"] = result
            state["rag_query"] = search_query
        except Exception as e:
            self.logger.error(f"❌ RAG retrieval failed: {e}")
            state["rag_result"] = None
        return state

    def _generate_response(self, state: ConversationState) -> ConversationState:
        """Generate the final response using RAG results."""
        rag_result = state["rag_result"]
        # Guard against a failed retrieval (rag_result is None when _perform_rag raised)
        if rag_result is None:
            state["final_response"] = "I'm sorry, I couldn't retrieve that information right now. Please try again."
            state["last_ai_message_time"] = time.time()
            return state

        self.logger.info(f"📝 RESPONSE: Using RAG result ({len(rag_result.answer)} chars)")
        # Use the RAG answer directly as the final response
        state["final_response"] = rag_result.answer
        state["last_ai_message_time"] = time.time()
        return state

    def chat(self, user_input: str, conversation_id: str = "default") -> Dict[str, Any]:
        """Main chat interface with conversation management."""
        self.logger.info(f"💬 CHAT: Processing user input: '{user_input[:50]}...'")
        self.logger.info(f"📋 Session: {conversation_id}")

        # Load the conversation history
        conversation_file = self.conversations_dir / f"{conversation_id}.json"
        conversation = self._load_conversation(conversation_file)

        # Add the user message to the conversation
        conversation["messages"].append(HumanMessage(content=user_input))

        self.logger.info("🚀 LANGGRAPH: Starting graph execution")

        # Prepare the state for LangGraph with conversation context
        state = ConversationState(
            conversation_id=conversation_id,
            messages=conversation["messages"],
            current_query=user_input,
            query_analysis=None,
            conversation_context=conversation.get("context", {}),
            rag_query=None,
            rag_result=None,
            final_response=None,
            session_start_time=conversation["session_start_time"],
            last_ai_message_time=conversation["last_ai_message_time"]
        )

        # Run the graph
        final_state = self.graph.invoke(state)

        # Add the AI response to the conversation
        if final_state["final_response"]:
            conversation["messages"].append(AIMessage(content=final_state["final_response"]))

        # Update the conversation state
        conversation["last_ai_message_time"] = final_state["last_ai_message_time"]
        conversation["context"] = final_state["conversation_context"]

        # Save the conversation
        self._save_conversation(conversation_file, conversation)

        self.logger.info("✅ LANGGRAPH: Graph execution completed")
        self.logger.info("🎯 CHAT COMPLETE: Response ready")

        # Return both the response and the RAG result for the UI
        return {
            'response': final_state["final_response"] or "I apologize, but I couldn't process your request.",
            'rag_result': final_state["rag_result"],
            'actual_rag_query': final_state.get("rag_query", "")
        }
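
    # A typical caller consumes the dict like this:
    #     out = bot.chat("...", conversation_id="s1")
    #     out['response']          # text to display
    #     out['rag_result']        # full pipeline result (sources, timing) or None
    #     out['actual_rag_query']  # the rewritten query actually sent to retrieval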

    def _load_conversation(self, conversation_file: Path) -> Dict[str, Any]:
        """Load a conversation from file."""
        if conversation_file.exists():
            try:
                with open(conversation_file) as f:
                    data = json.load(f)
                # Convert message dicts back to LangChain messages
                messages = []
                for msg_data in data.get("messages", []):
                    if msg_data["type"] == "human":
                        messages.append(HumanMessage(content=msg_data["content"]))
                    elif msg_data["type"] == "ai":
                        messages.append(AIMessage(content=msg_data["content"]))
                data["messages"] = messages
                return data
            except Exception as e:
                self.logger.info(f"⚠️ Could not load conversation: {e}")

        # Return a default conversation
        return {
            "messages": [],
            "session_start_time": time.time(),
            "last_ai_message_time": time.time(),
            "context": {}
        }

    def _save_conversation(self, conversation_file: Path, conversation: Dict[str, Any]):
        """Save a conversation to file."""
        try:
            # Convert LangChain messages to a serializable format
            messages_data = []
            for msg in conversation["messages"]:
                if isinstance(msg, HumanMessage):
                    messages_data.append({"type": "human", "content": msg.content})
                elif isinstance(msg, AIMessage):
                    messages_data.append({"type": "ai", "content": msg.content})

            data = {
                "messages": messages_data,
                "session_start_time": conversation["session_start_time"],
                "last_ai_message_time": conversation["last_ai_message_time"],
                "context": conversation.get("context", {}),
                "last_updated": datetime.now().isoformat()
            }
            with open(conversation_file, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            self.logger.info(f"⚠️ Could not save conversation: {e}")
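
    # The persisted conversations/<id>.json file therefore looks like this
    # (illustrative values):
    #     {
    #         "messages": [{"type": "human", "content": "..."},
    #                      {"type": "ai", "content": "..."}],
    #         "session_start_time": 1700000000.0,
    #         "last_ai_message_time": 1700000001.0,
    #         "context": {"district": "Gulu", "year": "2023"},
    #         "last_updated": "2024-01-01T00:00:00"
    #     }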


def get_chatbot():
    """Get a chatbot instance."""
    return IntelligentRAGChatbot()


if __name__ == "__main__":
    # Smoke-test the chatbot with a short multi-turn conversation
    chatbot = IntelligentRAGChatbot()

    test_queries = [
        "How much was the budget allocation for government salary payroll management?",
        "Namutumba district in 2023",
        "KCCA"
    ]

    for query in test_queries:
        print(f"\n{'=' * 50}")
        print(f"User: {query}")
        result = chatbot.chat(query)
        print(f"Bot: {result['response']}")