# audit_assistant/smart_chatbot.py
"""
Intelligent RAG Chatbot with Smart Query Analysis and Conversation Management
This chatbot provides intelligent conversation flow with:
- Smart query analysis and expansion
- Single LangSmith conversation traces
- Local conversation logging
- Context-aware RAG retrieval
- Natural conversation without technical jargon
"""
import os
import json
import time
import logging
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Any, Optional, TypedDict
import re
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from src.pipeline import PipelineManager
from src.config.loader import load_config
from src.config.paths import PROJECT_DIR
@dataclass
class QueryAnalysis:
"""Analysis result of a user query"""
has_district: bool
has_source: bool
has_year: bool
extracted_district: Optional[str]
extracted_source: Optional[str]
extracted_year: Optional[str]
confidence_score: float
can_answer_directly: bool
missing_filters: List[str]
suggested_follow_up: Optional[str]
expanded_query: Optional[str] = None # Query expansion for better RAG
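# Illustrative, fully-populated analysis (field values hypothetical):
#
#   QueryAnalysis(
#       has_district=True, has_source=False, has_year=True,
#       extracted_district="Gulu", extracted_source=None, extracted_year="2023",
#       confidence_score=0.85, can_answer_directly=True,
#       missing_filters=["source"], suggested_follow_up=None,
#       expanded_query="PDM administrative costs for Gulu district in 2023",
#   )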
class ConversationState(TypedDict):
"""State for the conversation flow"""
conversation_id: str
messages: List[Any]
current_query: str
query_analysis: Optional[QueryAnalysis]
rag_query: Optional[str]
rag_result: Optional[Any]
final_response: Optional[str]
conversation_context: Dict[str, Any] # Store conversation context
session_start_time: float
last_ai_message_time: float
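# Minimal starting state, as chat() below builds it (sketch; values hypothetical):
#
#   state: ConversationState = {
#       "conversation_id": "default", "messages": [], "current_query": "...",
#       "query_analysis": None, "rag_query": None, "rag_result": None,
#       "final_response": None, "conversation_context": {},
#       "session_start_time": time.time(), "last_ai_message_time": time.time(),
#   }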
class IntelligentRAGChatbot:
"""Intelligent chatbot with smart query analysis and conversation management"""
def __init__(self, suppress_logs=False):
"""Initialize the intelligent chatbot"""
# Setup logger to avoid cluttering UI
self.logger = logging.getLogger(__name__)
if suppress_logs:
self.logger.setLevel(logging.CRITICAL) # Suppress all logs
else:
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.info("πŸ€– INITIALIZING: Intelligent RAG Chatbot")
# Load configuration first
self.config = load_config()
# Use the same LLM configuration as the existing system
from auditqa.llm.adapters import get_llm_client
# Get LLM client using the same configuration
reader_config = self.config.get("reader", {})
default_type = reader_config.get("default_type", "INF_PROVIDERS")
# Convert to lowercase as that's how it's registered
provider_name = default_type.lower()
self.llm_adapter = get_llm_client(provider_name, self.config)
# Create a simple wrapper for LangChain compatibility
class LLMWrapper:
def __init__(self, adapter):
self.adapter = adapter
def invoke(self, messages):
# Convert LangChain messages to the format expected by the adapter
if isinstance(messages, list):
# Convert LangChain messages to dict format
message_dicts = []
for msg in messages:
                        if hasattr(msg, 'content'):
                            # SystemMessage must map to "system" (not "assistant"),
                            # or the analysis prompt's instructions lose their role
                            if isinstance(msg, HumanMessage):
                                role = "user"
                            elif isinstance(msg, SystemMessage):
                                role = "system"
                            else:
                                role = "assistant"
                            message_dicts.append({"role": role, "content": msg.content})
else:
message_dicts.append({"role": "user", "content": str(msg)})
else:
# Single message
message_dicts = [{"role": "user", "content": str(messages)}]
# Use the adapter to generate response
llm_response = self.adapter.generate(message_dicts)
# Return in LangChain format
class MockResponse:
def __init__(self, content):
self.content = content
return MockResponse(llm_response.content)
self.llm = LLMWrapper(self.llm_adapter)
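        # Usage sketch for the wrapper (illustrative, not executed here): a
        # LangChain-style message list goes in, a .content-bearing object comes out.
        #
        #   reply = self.llm.invoke([SystemMessage(content="You are terse."),
        #                            HumanMessage(content="Hi")])
        #   print(reply.content)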
# Initialize pipeline manager for RAG
self.logger.info("πŸ”§ PIPELINE: Initializing PipelineManager...")
self.pipeline_manager = PipelineManager(self.config)
# Ensure vectorstore is connected
self.logger.info("πŸ”— VECTORSTORE: Connecting to Qdrant...")
try:
vectorstore = self.pipeline_manager.vectorstore_manager.connect_to_existing()
self.logger.info("βœ… VECTORSTORE: Connected successfully")
except Exception as e:
self.logger.error(f"❌ VECTORSTORE: Connection failed: {e}")
# Fix LLM client to use the same provider as chatbot
self.logger.info("πŸ”§ LLM: Fixing PipelineManager LLM client...")
self.pipeline_manager.llm_client = self.llm_adapter
self.logger.info("βœ… LLM: PipelineManager now uses same LLM as chatbot")
self.logger.info("βœ… PIPELINE: PipelineManager initialized")
# Available metadata for filtering
self.available_metadata = {
'sources': [
'KCCA', 'MAAIF', 'MWTS', 'Gulu DLG', 'Kalangala DLG', 'Namutumba DLG',
'Lwengo DLG', 'Kiboga DLG', 'Annual Consolidated OAG', 'Consolidated',
'Hospital', 'Local Government', 'Ministry, Department and Agency',
'Project', 'Thematic', 'Value for Money'
],
'years': ['2018', '2019', '2020', '2021', '2022', '2023', '2024', '2025'],
'districts': [
'Gulu', 'Kalangala', 'Kampala', 'Namutumba', 'Lwengo', 'Kiboga',
'Fort Portal', 'Arua', 'Kasese', 'Kabale', 'Masindi', 'Mbale', 'Jinja', 'Masaka', 'Mbarara',
'KCCA'
]
}
# Try to load district whitelist from filter_options.json
try:
fo = PROJECT_DIR / "src" / "config" / "filter_options.json"
if fo.exists():
with open(fo) as f:
data = json.load(f)
if isinstance(data, dict) and data.get("districts"):
self.district_whitelist = [d.strip() for d in data["districts"] if d]
else:
self.district_whitelist = self.available_metadata['districts']
else:
self.district_whitelist = self.available_metadata['districts']
except Exception:
self.district_whitelist = self.available_metadata['districts']
# Enrich whitelist from add_district_metadata.py if available (optional module)
try:
from add_district_metadata import DistrictMetadataProcessor
proc = DistrictMetadataProcessor()
names = set()
for key, mapping in proc.district_mappings.items():
if getattr(mapping, 'is_district', True):
names.add(mapping.name)
if names:
# Merge while preserving order: existing first, then new ones not present
merged = list(self.district_whitelist)
for n in sorted(names):
if n not in merged:
merged.append(n)
self.district_whitelist = merged
self.logger.info(f"🧭 District whitelist enriched: {len(self.district_whitelist)} entries")
except Exception as e:
self.logger.info(f"ℹ️ Could not enrich districts from add_district_metadata: {e}")
# Get dynamic year list from filter_options.json
try:
fo = PROJECT_DIR / "src" / "config" / "filter_options.json"
if fo.exists():
with open(fo) as f:
data = json.load(f)
if isinstance(data, dict) and data.get("years"):
self.year_whitelist = [str(y).strip() for y in data["years"] if y]
else:
self.year_whitelist = self.available_metadata['years']
else:
self.year_whitelist = self.available_metadata['years']
except Exception:
self.year_whitelist = self.available_metadata['years']
# Calculate current year dynamically
self.current_year = str(datetime.now().year)
self.previous_year = str(datetime.now().year - 1)
# Data context for system prompt
self.data_context = self._load_data_context()
# Build the LangGraph
self.graph = self._build_graph()
# Conversation logging
self.conversations_dir = Path("conversations")
self.conversations_dir.mkdir(exist_ok=True)
def _load_data_context(self) -> str:
"""Load and analyze data context for system prompt"""
try:
# Try to load from generated context file
context_file = Path("data_context.md")
if context_file.exists():
with open(context_file) as f:
return f.read()
# Fallback to basic analysis
reports_dir = Path("reports")
testset_dir = Path("outputs/datasets/testset")
context_parts = []
            # Report analysis
            if reports_dir.exists():
                report_folders = [d for d in reports_dir.iterdir() if d.is_dir()]
                context_parts.append(f"📊 Available Reports: {len(report_folders)} audit report folders")
                # Get year range from folder names
                years = []
                for folder in report_folders:
                    for y in ("2018", "2019", "2020", "2021", "2022", "2023"):
                        if y in folder.name:
                            years.append(y)
                            break
                if years:
                    context_parts.append(f"📅 Years covered: {', '.join(sorted(set(years)))}")
            # Test dataset analysis
            if testset_dir.exists():
                test_files = list(testset_dir.glob("*.json"))
                context_parts.append(f"🧪 Test dataset: {len(test_files)} files with sample questions")
            return "\n".join(context_parts) if context_parts else "📊 Audit report database with comprehensive coverage"
except Exception as e:
self.logger.warning(f"⚠️ Could not load data context: {e}")
return "πŸ“Š Comprehensive audit report database"
def _build_graph(self) -> StateGraph:
"""Build the LangGraph for intelligent conversation flow"""
# Define the graph
workflow = StateGraph(ConversationState)
# Add nodes
workflow.add_node("analyze_query", self._analyze_query)
workflow.add_node("decide_action", self._decide_action)
workflow.add_node("perform_rag", self._perform_rag)
workflow.add_node("ask_follow_up", self._ask_follow_up)
workflow.add_node("generate_response", self._generate_response)
# Add edges
workflow.add_edge("analyze_query", "decide_action")
# Conditional edges from decide_action
workflow.add_conditional_edges(
"decide_action",
self._should_perform_rag,
{
"rag": "perform_rag",
"follow_up": "ask_follow_up"
}
)
# From perform_rag, go to generate_response
workflow.add_edge("perform_rag", "generate_response")
# From ask_follow_up, end
workflow.add_edge("ask_follow_up", END)
# From generate_response, end
workflow.add_edge("generate_response", END)
# Set entry point
workflow.set_entry_point("analyze_query")
return workflow.compile()
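    # Compiled flow, as wired above:
    #
    #   analyze_query -> decide_action --(rag)-------> perform_rag -> generate_response -> END
    #                                  --(follow_up)-> ask_follow_up -> END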
def _extract_districts_list(self, text: str) -> List[str]:
"""Extract one or more districts from free text using whitelist matching.
- Case-insensitive substring match for each known district name
- Handles multi-district inputs like "Lwengo Kiboga District & Namutumba"
"""
if not text:
return []
q = text.lower()
found: List[str] = []
for name in self.district_whitelist:
n = name.lower()
if n in q:
# Map Kampala -> KCCA canonical
canonical = 'KCCA' if name.lower() == 'kampala' else name
if canonical not in found:
found.append(canonical)
return found
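    # Example with the default whitelist above: "Lwengo Kiboga District & Namutumba"
    # yields ["Namutumba", "Lwengo", "Kiboga"] (whitelist order, not text order),
    # and any mention of "Kampala" is canonicalized to "KCCA".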
def _extract_years_list(self, text: str) -> List[str]:
"""Extract year list from text, supporting forms like '2022 / 23', '2022-2023', '2022–23'."""
if not text:
return []
years: List[str] = []
q = text
# Full 4-digit years
for y in re.findall(r"\b(20\d{2})\b", q):
if y not in years:
years.append(y)
# Shorthand like 2022/23 or 2022-23
for m in re.finditer(r"\b(20\d{2})\s*[\-/–]\s*(\d{2})\b", q):
y1 = m.group(1)
y2_short = int(m.group(2))
y2 = f"20{y2_short:02d}"
for y in [y1, y2]:
if y not in years:
years.append(y)
return years
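    # Example: "spending in 2022/23 and 2020" -> ["2022", "2020", "2023"]
    # (full 4-digit years are collected first, then "2022/23" expands to 2023).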
def _analyze_query(self, state: ConversationState) -> ConversationState:
"""Analyze the user query with conversation context"""
query = state["current_query"]
conversation_context = state.get("conversation_context", {})
self.logger.info(f"🧠 QUERY ANALYSIS: Starting analysis for: '{query[:50]}...'")
# Build conversation context for analysis
context_info = ""
if conversation_context:
context_info = f"\n\nConversation context:\n"
for key, value in conversation_context.items():
if value:
context_info += f"- {key}: {value}\n"
# Also include recent conversation messages for better context
recent_messages = state.get("messages", [])
if recent_messages and len(recent_messages) > 1:
context_info += f"\nRecent conversation:\n"
# Get last 3 messages for context
for msg in recent_messages[-3:]:
if hasattr(msg, 'content'):
role = "User" if isinstance(msg, HumanMessage) else "Assistant"
context_info += f"- {role}: {msg.content[:100]}...\n"
# Create analysis prompt with data context
analysis_prompt = ChatPromptTemplate.from_messages([
SystemMessage(content=f"""You are an expert at analyzing audit report queries. Your job is to extract specific information and determine if a query can be answered directly.
{self.data_context}
DISTRICT RECOGNITION RULES:
- Kampala = KCCA (Kampala Capital City Authority)
- Available districts: {', '.join(self.district_whitelist[:15])}... (and {len(self.district_whitelist)-15} more)
- DLG = District Local Government
- Uganda has {len(self.district_whitelist)} districts - recognize common ones
SOURCE RECOGNITION RULES:
- KCCA = Kampala Capital City Authority
- MAAIF = Ministry of Agriculture, Animal Industry and Fisheries
- MWTS = Ministry of Works and Transport
- OAG = Office of the Auditor General
- Consolidated = Annual Consolidated reports
YEAR RECOGNITION RULES:
- Available years: {', '.join(self.year_whitelist)}
- Current year is {self.current_year} - use this to reason about relative years
- If user mentions "last year", "previous year" - infer {self.previous_year}
- If user mentions "this year", "current year" - infer {self.current_year}
Analysis rules:
1. Be SMART - if you have enough context to search, do it
2. Use conversation context to fill in missing information
3. For budget/expenditure queries, try to infer missing details from context
4. Current year is {self.current_year} - use this to reason about relative years
5. If user mentions "last year", "previous year" - infer {self.previous_year}
6. If user mentions "this year", "current year" - infer {self.current_year}
7. If user mentions a department/ministry, infer the source
8. If user is getting frustrated or asking for results, proceed with RAG even if not perfect
9. Recognize Kampala as a district (KCCA)
IMPORTANT: You must respond with ONLY valid JSON. No additional text.
Return your analysis as JSON with these exact fields:
{{
"has_district": boolean,
"has_source": boolean,
"has_year": boolean,
"extracted_district": "string or null",
"extracted_source": "string or null",
"extracted_year": "string or null",
"confidence_score": 0.0-1.0,
"can_answer_directly": boolean,
"missing_filters": ["list", "of", "missing", "filters"],
"suggested_follow_up": "string or null",
"expanded_query": "string or null"
}}
The expanded_query should be a natural language query that combines the original question with any inferred context for better RAG retrieval."""),
HumanMessage(content=f"Analyze this query: '{query}'{context_info}")
])
# Get analysis from LLM
response = self.llm.invoke(analysis_prompt.format_messages())
try:
# Clean the response content to extract JSON
content = response.content.strip()
# Try to find JSON in the response
if content.startswith('{') and content.endswith('}'):
json_content = content
else:
                # Try to extract a JSON object from the response
                json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
json_content = json_match.group()
else:
raise json.JSONDecodeError("No JSON found in response", content, 0)
# Parse JSON response
analysis_data = json.loads(json_content)
query_analysis = QueryAnalysis(
has_district=analysis_data.get("has_district", False),
has_source=analysis_data.get("has_source", False),
has_year=analysis_data.get("has_year", False),
extracted_district=analysis_data.get("extracted_district"),
extracted_source=analysis_data.get("extracted_source"),
extracted_year=analysis_data.get("extracted_year"),
confidence_score=analysis_data.get("confidence_score", 0.0),
can_answer_directly=analysis_data.get("can_answer_directly", False),
missing_filters=analysis_data.get("missing_filters", []),
suggested_follow_up=analysis_data.get("suggested_follow_up"),
expanded_query=analysis_data.get("expanded_query")
)
except (json.JSONDecodeError, KeyError, AttributeError) as e:
self.logger.info(f"⚠️ JSON parsing failed: {e}")
# Fallback analysis - be more permissive
query_lower = query.lower()
# Simple keyword matching - improved district recognition
has_district = any(district.lower() in query_lower for district in [
'gulu', 'kalangala', 'kampala', 'namutumba', 'lwengo', 'kiboga', 'kcca', 'maaif', 'mwts'
])
# Special case: Kampala = KCCA
if 'kampala' in query_lower and not has_district:
has_district = True
has_source = any(source.lower() in query_lower for source in [
'kcca', 'maaif', 'mwts', 'gulu', 'kalangala', 'consolidated', 'oag', 'government'
])
# Check for year mentions using dynamic year list
has_year = any(year in query_lower for year in self.year_whitelist)
# Also check for explicit relative year terms
has_year = has_year or any(term in query_lower for term in [
'this year', 'last year', 'previous year', 'current year'
])
# Extract specific values
extracted_district = None
extracted_source = None
extracted_year = None
# Extract districts using comprehensive whitelist
for district_name in self.district_whitelist:
if district_name.lower() in query_lower:
extracted_district = district_name
break
# Also check common aliases
district_aliases = {
'kampala': 'Kampala',
'kcca': 'Kampala',
'gulu': 'Gulu',
'kalangala': 'Kalangala'
}
for alias, full_name in district_aliases.items():
if alias in query_lower and not extracted_district:
extracted_district = full_name
break
for source in ['kcca', 'maaif', 'mwts', 'consolidated', 'oag']:
if source in query_lower:
extracted_source = source.upper()
break
# Extract year using dynamic year list
for year in self.year_whitelist:
if year in query_lower:
extracted_year = year
has_year = True
break
# Only handle relative year terms if explicitly mentioned
if not extracted_year:
if 'last year' in query_lower or 'previous year' in query_lower:
extracted_year = self.previous_year
has_year = True
elif 'this year' in query_lower or 'current year' in query_lower:
extracted_year = self.current_year
has_year = True
elif 'recent' in query_lower and 'year' in query_lower:
# Use the most recent year from available data
extracted_year = max(self.year_whitelist) if self.year_whitelist else self.previous_year
has_year = True
# Be more permissive - if we have some context, try to answer
missing_filters = []
if not has_district:
missing_filters.append("district")
if not has_source:
missing_filters.append("source")
if not has_year:
missing_filters.append("year")
# If user seems frustrated or asking for results, be more permissive
frustration_indicators = ['already', 'just said', 'specified', 'provided', 'crazy', 'answer']
is_frustrated = any(indicator in query_lower for indicator in frustration_indicators)
can_answer_directly = len(missing_filters) <= 1 or is_frustrated # More permissive
confidence_score = 0.8 if can_answer_directly else 0.3
# Generate follow-up suggestion
if missing_filters and not is_frustrated:
if "district" in missing_filters and "source" in missing_filters:
suggested_follow_up = "I'd be happy to help you with that information! Could you please specify which district and department/ministry you're asking about?"
elif "district" in missing_filters:
suggested_follow_up = "Thanks for your question! Could you please specify which district you're asking about?"
elif "source" in missing_filters:
suggested_follow_up = "I can help you with that! Could you please specify which department or ministry you're asking about?"
elif "year" in missing_filters:
suggested_follow_up = "Great question! Could you please specify which year you're interested in?"
else:
suggested_follow_up = "Could you please provide more specific details to help me give you a precise answer?"
else:
suggested_follow_up = None
# Create expanded query
expanded_query = query
if extracted_district:
expanded_query += f" for {extracted_district} district"
if extracted_source:
expanded_query += f" from {extracted_source}"
if extracted_year:
expanded_query += f" in {extracted_year}"
query_analysis = QueryAnalysis(
has_district=has_district,
has_source=has_source,
has_year=has_year,
extracted_district=extracted_district,
extracted_source=extracted_source,
extracted_year=extracted_year,
confidence_score=confidence_score,
can_answer_directly=can_answer_directly,
missing_filters=missing_filters,
suggested_follow_up=suggested_follow_up,
expanded_query=expanded_query
)
# Update conversation context
if query_analysis.extracted_district:
conversation_context["district"] = query_analysis.extracted_district
if query_analysis.extracted_source:
conversation_context["source"] = query_analysis.extracted_source
if query_analysis.extracted_year:
conversation_context["year"] = query_analysis.extracted_year
state["query_analysis"] = query_analysis
state["conversation_context"] = conversation_context
self.logger.info(f"βœ… ANALYSIS COMPLETE: district={query_analysis.has_district}, source={query_analysis.has_source}, year={query_analysis.has_year}")
self.logger.info(f"πŸ“ˆ Confidence: {query_analysis.confidence_score:.2f}, Can answer directly: {query_analysis.can_answer_directly}")
if query_analysis.expanded_query:
self.logger.info(f"πŸ”„ Expanded query: {query_analysis.expanded_query}")
return state
def _decide_action(self, state: ConversationState) -> ConversationState:
"""Decide what action to take based on query analysis"""
analysis = state["query_analysis"]
# Add decision reasoning
        if analysis.can_answer_directly and analysis.confidence_score > 0.7:
            self.logger.info("🚀 DECISION: Query is complete, proceeding with RAG")
            self.logger.info(f"📊 REASONING: Confidence={analysis.confidence_score:.2f}, Missing filters={len(analysis.missing_filters or [])}")
            if analysis.missing_filters:
                self.logger.info(f"📋 Missing: {', '.join(analysis.missing_filters)}")
            else:
                self.logger.info("✅ All required information available")
        else:
            self.logger.info("❓ DECISION: Query incomplete, asking follow-up")
            self.logger.info(f"📊 REASONING: Confidence={analysis.confidence_score:.2f}, Missing filters={len(analysis.missing_filters or [])}")
            if analysis.missing_filters:
                self.logger.info(f"📋 Missing: {', '.join(analysis.missing_filters)}")
            self.logger.info(f"💡 Follow-up needed: {analysis.suggested_follow_up}")
return state
def _should_perform_rag(self, state: ConversationState) -> str:
"""Determine whether to perform RAG or ask follow-up"""
analysis = state["query_analysis"]
conversation_context = state.get("conversation_context", {})
recent_messages = state.get("messages", [])
# Check if we have enough context from conversation history
has_district_context = analysis.has_district or conversation_context.get("district")
has_source_context = analysis.has_source or conversation_context.get("source")
has_year_context = analysis.has_year or conversation_context.get("year")
# Count how many context pieces we have
context_count = sum([bool(has_district_context), bool(has_source_context), bool(has_year_context)])
# For PDM queries, we need more specific information
current_query = state["current_query"].lower()
# Check if this is a PDM query by looking at current query OR recent conversation
is_pdm_query = "pdm" in current_query or "parish development" in current_query
# Also check recent messages for PDM context
if not is_pdm_query and recent_messages:
for msg in recent_messages[-3:]: # Check last 3 messages
if isinstance(msg, HumanMessage) and ("pdm" in msg.content.lower() or "parish development" in msg.content.lower()):
is_pdm_query = True
break
if is_pdm_query:
# For PDM queries, we need district AND year to be specific enough
# But we need them to be explicitly provided in the current conversation, not just inferred
if has_district_context and has_year_context:
# Check if both district and year are explicitly mentioned in recent messages
explicit_district = False
explicit_year = False
for msg in recent_messages[-3:]: # Check last 3 messages
if isinstance(msg, HumanMessage):
content = msg.content.lower()
if any(district in content for district in ["gulu", "kalangala", "kampala", "namutumba"]):
explicit_district = True
if any(year in content for year in ["2022", "2023", "2022/23", "2023/24"]):
explicit_year = True
                if explicit_district and explicit_year:
                    self.logger.info("🚀 DECISION: PDM query with explicit district and year, proceeding with RAG")
                    self.logger.info(f"📊 REASONING: PDM query - explicit_district={explicit_district}, explicit_year={explicit_year}")
                    return "rag"
                else:
                    self.logger.info("❓ DECISION: PDM query needs explicit district and year, asking follow-up")
                    self.logger.info(f"📊 REASONING: PDM query - explicit_district={explicit_district}, explicit_year={explicit_year}")
                    return "follow_up"
            else:
                self.logger.info("❓ DECISION: PDM query needs more specific info, asking follow-up")
                self.logger.info(f"📊 REASONING: PDM query - district={has_district_context}, year={has_year_context}")
                return "follow_up"
        # For general queries, be more conservative - need at least 2 pieces AND high confidence
        if context_count >= 2 and analysis.confidence_score > 0.8:
            self.logger.info("🚀 DECISION: Sufficient context with high confidence, proceeding with RAG")
            self.logger.info(f"📊 REASONING: Context pieces: district={has_district_context}, source={has_source_context}, year={has_year_context}, confidence={analysis.confidence_score}")
            return "rag"
# If user seems frustrated (short responses like "no"), proceed with RAG
if recent_messages and len(recent_messages) >= 3: # Need more messages to detect frustration
last_user_message = None
for msg in reversed(recent_messages):
if isinstance(msg, HumanMessage):
last_user_message = msg.content.lower().strip()
break
if last_user_message and len(last_user_message) < 10 and any(word in last_user_message for word in ["no", "yes", "ok", "sure"]):
self.logger.info(f"πŸš€ DECISION: User seems frustrated with short response, proceeding with RAG")
return "rag"
# Original logic for direct answers
if analysis.can_answer_directly and analysis.confidence_score > 0.7:
return "rag"
else:
return "follow_up"
def _ask_follow_up(self, state: ConversationState) -> ConversationState:
"""Generate a follow-up question to clarify missing information"""
analysis = state["query_analysis"]
current_query = state["current_query"].lower()
conversation_context = state.get("conversation_context", {})
# Check if this is a PDM query
is_pdm_query = "pdm" in current_query or "parish development" in current_query
if is_pdm_query:
# Generate PDM-specific follow-up questions
missing_info = []
if not analysis.has_district and not conversation_context.get("district"):
missing_info.append("district (e.g., Gulu, Kalangala)")
if not analysis.has_year and not conversation_context.get("year"):
missing_info.append("year (e.g., 2022, 2023)")
if missing_info:
follow_up_message = f"For PDM administrative costs information, I need to know the {', '.join(missing_info)}. Could you please specify these details?"
else:
follow_up_message = "Could you please provide more specific details about the PDM administrative costs you're looking for?"
else:
# Use the original follow-up logic
if analysis.suggested_follow_up:
follow_up_message = analysis.suggested_follow_up
else:
follow_up_message = "Could you please provide more specific details to help me give you a precise answer?"
state["final_response"] = follow_up_message
state["last_ai_message_time"] = time.time()
return state
def _build_comprehensive_query(self, current_query: str, analysis, conversation_context: dict, recent_messages: list) -> str:
"""Build a better RAG query from conversation.
- If latest message is a short modifier (e.g., "financial"), merge it into the last substantive question.
- If latest message looks like filters (district/year), keep the last question unchanged.
- Otherwise, use the current message.
"""
def is_interrogative(text: str) -> bool:
t = text.lower().strip()
return any(t.startswith(w) for w in ["what", "how", "why", "when", "where", "which", "who"]) or t.endswith("?")
def is_filter_like(text: str) -> bool:
t = text.lower()
if "district" in t:
return True
if re.search(r"\b20\d{2}\b", t) or re.search(r"20\d{2}\s*[\-/–]\s*\d{2}\b", t):
return True
if self._extract_districts_list(text):
return True
return False
# Find last substantive user question
last_question = None
for msg in reversed(recent_messages[:-1] if recent_messages else []):
if isinstance(msg, HumanMessage):
if is_interrogative(msg.content) and len(msg.content.strip()) > 15:
last_question = msg.content.strip()
break
cq = current_query.strip()
words = cq.split()
is_short_modifier = (not is_interrogative(cq)) and (len(words) <= 3)
if is_filter_like(cq) and last_question:
comprehensive_query = last_question
elif is_short_modifier and last_question:
modifier = cq
if modifier.lower() in last_question.lower():
comprehensive_query = last_question
else:
if last_question.endswith('?'):
comprehensive_query = last_question[:-1] + f" for {modifier}?"
else:
comprehensive_query = last_question + f" for {modifier}"
else:
comprehensive_query = current_query
self.logger.info(f"πŸ”„ COMPREHENSIVE QUERY: '{comprehensive_query}'")
return comprehensive_query
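    # Worked example of the heuristics above (hypothetical turns):
    #   prior question: "What are the top risks in procurement audits?"
    #   latest: "financial" (short modifier) -> "What are the top risks in procurement audits for financial?"
    #   latest: "Gulu district 2022/23" (filter-like) -> prior question kept; filters applied separately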
def _rewrite_query_with_llm(self, recent_messages: list, draft_query: str) -> str:
"""Use the LLM to rewrite a clean, focused RAG query from the conversation.
Rules enforced in prompt:
- Keep the user's main information need from the last substantive question
- Integrate short modifiers (e.g., "financial") into that question when appropriate
- Do NOT include filter text (years/districts/sources) in the query; those are handled separately
- Return a single plain sentence only (no quotes, no markdown)
"""
try:
# Build a compact conversation transcript (last 6 messages max)
convo_lines = []
for msg in recent_messages[-6:]:
if isinstance(msg, HumanMessage):
convo_lines.append(f"User: {msg.content}")
elif isinstance(msg, AIMessage):
convo_lines.append(f"Assistant: {msg.content}")
convo_text = "\n".join(convo_lines)
"""
"DECISION GUIDANCE:\n"
"- If the latest user message looks like a modifier (e.g., 'financial'), merge it into the best prior question.\n"
"- If the latest message provides filters (e.g., districts, years), DO NOT embed them; keep the base question.\n"
"- If the latest message itself is a full, clear question, use it.\n"
"- If the draft query is already good, you may refine its clarity but keep the same intent.\n\n"
"""
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content=(
"ROLE: Query Rewriter for a RAG system.\n\n"
"PRIMARY OBJECTIVE:\n- Produce ONE retrieval-focused sentence that best represents the user's information need.\n"
"- Maximize recall of relevant evidence; be specific but not overconstrained.\n\n"
"INPUTS:\n- Conversation with User and Assistant turns (latest last).\n- A draft query (heuristic).\n\n"
"OPERATING PRINCIPLES:\n"
"1) Use the last substantive USER question as the backbone of intent.\n"
"2) Merge helpful domain modifiers from any USER turns (financial, procurement, risk) when they sharpen focus; ignore if not helpful.\n"
"3) Treat Assistant messages as guidance only; if the user later provided filters (years, districts, sources), DO NOT embed them in the query (filters are applied separately).\n"
"4) Remove meta-verbs like 'summarize', 'list', 'explain', 'compare' from the query.\n"
"5) Prefer content-bearing terms (topics, programs, outcomes) over task phrasing.\n"
"6) If the latest user message is filters-only, keep the prior substantive question unchanged.\n"
"7) If the draft query is already strong, refine wording for clarity but keep the same intent.\n\n"
"EXAMPLES (multi-turn):\n"
"A)\nUser: What are the top 5 priorities for improving audit procedures?\nAssistant: Could you specify the scope (e.g., financial, procurement)?\nUser: Financial\n→ Output: Top priorities for improving financial audit procedures.\n\n"
"B)\nUser: How were PDM administrative costs utilized and what was the impact of shortfalls?\nAssistant: Please specify district/year for precision.\nUser: Namutumba and Lwengo Districts (2022/23)\n→ Output: How were PDM administrative costs utilized and what was the impact of shortfalls.\n(Exclude districts/years; they are filters.)\n\n"
"C)\nUser: Summarize risk management issues in audit reports.\n→ Output: Key risk management issues in audit reports.\n\n"
"CONSTRAINTS:\n- Do NOT include filters (years, districts, sources, filenames).\n- Do NOT include quotes/markdown/bullets or multiple sentences.\n- Return exactly one plain sentence."
)),
HumanMessage(content=(
f"Conversation (most recent last):\n{convo_text}\n\n"
f"Draft query: {draft_query}\n\n"
"Rewrite the single best retrieval query sentence now:"
)),
])
            # Add a timeout for the LLM call (signal.SIGALRM is POSIX-only and
            # works only in the main thread; use a thread-based timeout elsewhere)
            import signal
def timeout_handler(signum, frame):
raise TimeoutError("LLM rewrite timeout")
# Set 10 second timeout
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(10)
try:
resp = self.llm.invoke(prompt.format_messages())
signal.alarm(0) # Cancel timeout
rewritten = getattr(resp, 'content', '').strip()
# Basic sanitization: keep it one line
rewritten = rewritten.replace('\n', ' ').strip()
if rewritten and len(rewritten) > 5: # Basic quality check
self.logger.info(f"πŸ› οΈ LLM REWRITER: '{rewritten}'")
return rewritten
else:
self.logger.info(f"⚠️ LLM rewrite too short/empty, using draft query")
return draft_query
except TimeoutError:
signal.alarm(0)
self.logger.info(f"⚠️ LLM rewrite timeout after 10s, using draft query")
return draft_query
except Exception as e:
signal.alarm(0)
self.logger.info(f"⚠️ LLM rewrite failed, using draft query. Error: {e}")
return draft_query
except Exception as e:
self.logger.info(f"⚠️ LLM rewrite setup failed, using draft query. Error: {e}")
return draft_query
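    # Example rewrite (hypothetical, mirroring prompt example C): draft
    # "Summarize risk management issues in audit reports for Gulu 2022"
    # -> "Key risk management issues in audit reports" (meta-verbs and filters dropped).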
def _perform_rag(self, state: ConversationState) -> ConversationState:
"""Perform RAG retrieval with smart query expansion"""
query = state["current_query"]
analysis = state["query_analysis"]
conversation_context = state.get("conversation_context", {})
recent_messages = state.get("messages", [])
# Build comprehensive query from conversation history
draft_query = self._build_comprehensive_query(query, analysis, conversation_context, recent_messages)
# Let LLM rewrite a clean, focused search query
search_query = self._rewrite_query_with_llm(recent_messages, draft_query)
self.logger.info(f"πŸ” RAG RETRIEVAL: Starting for query: '{search_query[:50]}...'")
self.logger.info(f"πŸ“Š Analysis: district={analysis.has_district}, source={analysis.has_source}, year={analysis.has_year}")
try:
# Build filters from analysis and conversation context
filters = {}
# Use conversation context to fill in missing filters
source = analysis.extracted_source or conversation_context.get("source")
district = analysis.extracted_district or conversation_context.get("district")
year = analysis.extracted_year or conversation_context.get("year")
if source:
filters["source"] = [source] # Qdrant expects lists
self.logger.info(f"🎯 Filter: source={source}")
if year:
filters["year"] = [year]
self.logger.info(f"🎯 Filter: year={year}")
if district:
                # Map district to a source collection if needed (note: this overrides any source filter set above)
if district.upper() == "KAMPALA":
filters["source"] = ["KCCA"]
self.logger.info(f"🎯 Filter: district={district} -> source=KCCA")
elif district.upper() in ["GULU", "KALANGALA"]:
filters["source"] = [f"{district.upper()} DLG"]
self.logger.info(f"🎯 Filter: district={district} -> source={district.upper()} DLG")
# Run RAG pipeline with correct parameters
result = self.pipeline_manager.run(
                query=search_query,  # use the rewritten search query
sources=filters.get("source") if filters.get("source") else None,
auto_infer_filters=False, # Our agent already handled filter inference
filters=filters if filters else None
)
self.logger.info(f"βœ… RAG completed: Found {len(result.sources)} sources")
self.logger.info(f"⏱️ Execution time: {result.execution_time:.2f}s")
# Store RAG result in state
state["rag_result"] = result
state["rag_query"] = search_query
except Exception as e:
self.logger.info(f"❌ RAG retrieval failed: {e}")
state["rag_result"] = None
return state
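    # Filter shape handed to the pipeline above (hypothetical values):
    #
    #   {"source": ["KCCA"], "year": ["2023"]}   # Qdrant-style filters take list values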
    def _generate_response(self, state: ConversationState) -> ConversationState:
        """Generate final response using RAG results"""
        rag_result = state["rag_result"]
        if rag_result is None:
            # RAG failed upstream; fall back gracefully instead of raising on rag_result.answer
            state["final_response"] = "I'm sorry, I couldn't retrieve the relevant audit information right now. Please try rephrasing your question."
            state["last_ai_message_time"] = time.time()
            return state
        self.logger.info(f"📝 RESPONSE: Using RAG result ({len(rag_result.answer)} chars)")
        # Store the final response directly from RAG
        state["final_response"] = rag_result.answer
        state["last_ai_message_time"] = time.time()
        return state
    def chat(self, user_input: str, conversation_id: str = "default") -> Dict[str, Any]:
        """Main chat interface with conversation management.

        Returns a dict with 'response', 'rag_result', and 'actual_rag_query'.
        """
self.logger.info(f"πŸ’¬ CHAT: Processing user input: '{user_input[:50]}...'")
self.logger.info(f"πŸ“Š Session: {conversation_id}")
# Load conversation history
conversation_file = self.conversations_dir / f"{conversation_id}.json"
conversation = self._load_conversation(conversation_file)
# Add user message to conversation
conversation["messages"].append(HumanMessage(content=user_input))
self.logger.info(f"πŸ”„ LANGGRAPH: Starting graph execution")
# Prepare state for LangGraph with conversation context
state = ConversationState(
conversation_id=conversation_id,
messages=conversation["messages"],
current_query=user_input,
            query_analysis=None,
            rag_query=None,
conversation_context=conversation.get("context", {}),
rag_result=None,
final_response=None,
session_start_time=conversation["session_start_time"],
last_ai_message_time=conversation["last_ai_message_time"]
)
# Run the graph
final_state = self.graph.invoke(state)
# Add the AI response to conversation
if final_state["final_response"]:
conversation["messages"].append(AIMessage(content=final_state["final_response"]))
# Update conversation state
conversation["last_ai_message_time"] = final_state["last_ai_message_time"]
conversation["context"] = final_state["conversation_context"]
# Save conversation
self._save_conversation(conversation_file, conversation)
self.logger.info(f"βœ… LANGGRAPH: Graph execution completed")
self.logger.info(f"🎯 CHAT COMPLETE: Response ready")
# Return both response and RAG result for UI
return {
'response': final_state["final_response"] or "I apologize, but I couldn't process your request.",
'rag_result': final_state["rag_result"],
'actual_rag_query': final_state.get("rag_query", "")
}
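    # Caller sketch (names hypothetical, e.g. inside a Gradio handler):
    #
    #   out = bot.chat(user_text, conversation_id=session_id)
    #   answer, rag = out["response"], out["rag_result"]
    #   citations = rag.sources if rag else []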
def _load_conversation(self, conversation_file: Path) -> Dict[str, Any]:
"""Load conversation from file"""
if conversation_file.exists():
try:
with open(conversation_file) as f:
data = json.load(f)
# Convert message dicts back to LangChain messages
messages = []
for msg_data in data.get("messages", []):
if msg_data["type"] == "human":
messages.append(HumanMessage(content=msg_data["content"]))
elif msg_data["type"] == "ai":
messages.append(AIMessage(content=msg_data["content"]))
data["messages"] = messages
return data
except Exception as e:
self.logger.info(f"⚠️ Could not load conversation: {e}")
# Return default conversation
return {
"messages": [],
"session_start_time": time.time(),
"last_ai_message_time": time.time(),
"context": {}
}
def _save_conversation(self, conversation_file: Path, conversation: Dict[str, Any]):
"""Save conversation to file"""
try:
# Convert LangChain messages to serializable format
messages_data = []
for msg in conversation["messages"]:
if isinstance(msg, HumanMessage):
messages_data.append({"type": "human", "content": msg.content})
elif isinstance(msg, AIMessage):
messages_data.append({"type": "ai", "content": msg.content})
data = {
"messages": messages_data,
"session_start_time": conversation["session_start_time"],
"last_ai_message_time": conversation["last_ai_message_time"],
"context": conversation.get("context", {}),
"last_updated": datetime.now().isoformat()
}
with open(conversation_file, "w") as f:
json.dump(data, f, indent=2)
except Exception as e:
self.logger.info(f"⚠️ Could not save conversation: {e}")
def get_chatbot():
"""Get chatbot instance"""
return IntelligentRAGChatbot()
if __name__ == "__main__":
# Test the chatbot
chatbot = IntelligentRAGChatbot()
# Test conversation
test_queries = [
"How much was the budget allocation for government salary payroll management?",
"Namutumba district in 2023",
"KCCA"
]
for query in test_queries:
self.logger.info(f"\n{'='*50}")
self.logger.info(f"User: {query}")
response = chatbot.chat(query)
self.logger.info(f"Bot: {response}")