# audit_assistant / multi_agent_chatbot.py
# akryldigital's picture
# demo version (#5)
# 40b7ffe verified
"""
Multi-Agent RAG Chatbot using LangGraph
This system implements a 3-agent architecture:
1. Main Agent: Handles conversation flow, follow-ups, and determines when to call RAG
2. RAG Agent: Rewrites queries and applies filters for document retrieval
3. Response Agent: Generates final answers from retrieved documents
Each agent has specialized prompts and responsibilities.
"""
import re
import json
import time
import logging
import traceback
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List, Any, Optional, TypedDict, Union

from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

from src.pipeline import PipelineManager
from src.llm.adapters import get_llm_client
from src.config.paths import PROJECT_DIR, CONVERSATIONS_DIR
from src.config.loader import load_config, get_embedding_model_for_collection
# Logging is configured at import time (INFO level, timestamped format); every
# agent in this module logs through the module-level ``logger`` below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class QueryContext:
    """Filter context extracted from a user query and its conversation.

    Instances are produced by ``MultiAgentRAGChatbot._analyze_query_context``
    and carried through the graph state under ``query_context``.
    """
    # Flags telling whether a validated value exists for each filter dimension.
    has_district: bool = False
    has_source: bool = False
    has_year: bool = False
    # Validated values: a single string when one value was found, a list of
    # strings when several were found, or None when nothing valid was found.
    extracted_district: Optional[Union[str, List[str]]] = None
    extracted_source: Optional[Union[str, List[str]]] = None
    extracted_year: Optional[Union[str, List[str]]] = None
    # Filters parsed from the "FILTER CONTEXT:" section of the query; None when
    # the context was built without them (e.g. the JSON parse-failure fallback).
    ui_filters: Optional[Dict[str, List[str]]] = None
    # Confidence reported by the analysis LLM.
    confidence_score: float = 0.0
    # When True the main agent replies with ``follow_up_question`` instead of
    # proceeding to retrieval.
    needs_follow_up: bool = False
    follow_up_question: Optional[str] = None
class MultiAgentState(TypedDict):
    """State for the multi-agent conversation flow"""
    # Identifier of the conversation (not read by the agents visible in this file section).
    conversation_id: str
    # Conversation history; the agents read HumanMessage/AIMessage entries from the last 6 items.
    messages: List[Any]
    # Raw query for this turn; may embed a "FILTER CONTEXT:" / "USER QUERY:" section.
    current_query: str
    # Set by the main agent after analysing the query.
    query_context: Optional[QueryContext]
    # Rewritten search query and filters, set by the RAG agent for the response agent.
    rag_query: Optional[str]
    rag_filters: Optional[Dict[str, Any]]
    # Documents returned by the pipeline manager, set by the response agent.
    retrieved_documents: Optional[List[Any]]
    # Follow-up question (main agent) or generated answer (response agent);
    # once truthy, the graph routes to END.
    final_response: Optional[str]
    # Human-readable trace entries appended by each agent.
    agent_logs: List[str]
    # NOTE(review): not read or written in the visible part of the file - confirm usage.
    conversation_context: Dict[str, Any]
    # NOTE(review): presumably a time.time() timestamp set by the caller - confirm.
    session_start_time: float
    # time.time() captured whenever an agent writes final_response.
    last_ai_message_time: float
class MultiAgentRAGChatbot:
"""Multi-agent RAG chatbot with specialized agents"""
def __init__(self, config_path: str = "src/config/settings.yaml"):
    """Initialize the multi-agent chatbot.

    Loads the configuration, builds the LLM client and its LangChain-style
    wrapper, initializes the pipeline manager, connects to the vector store,
    loads the filter whitelists, compiles the agent graph and makes sure the
    conversations directory exists.

    Args:
        config_path: Path of the settings file passed to ``load_config``.

    Raises:
        RuntimeError: If the pipeline manager cannot be initialized, the
            vector store connection fails, or no conversations directory
            can be created.
    """
    self.config = load_config(config_path)
    # Get LLM provider from config
    reader_config = self.config.get("reader", {})
    default_type = reader_config.get("default_type", "INF_PROVIDERS")
    provider_name = default_type.lower()
    self.llm_adapter = get_llm_client(provider_name, self.config)

    # Create a simple wrapper for LangChain compatibility
    class LLMWrapper:
        """Expose ``invoke(messages)`` on top of the adapter's ``generate``."""

        # LangChain message class name -> chat role. Anything else that has a
        # ``content`` attribute is treated as an assistant turn. System
        # messages previously fell through to "assistant", which presented
        # the agents' instructions as if the model had already said them.
        _ROLES = {"HumanMessage": "user", "SystemMessage": "system"}

        class MockResponse:
            """Minimal response object exposing ``content`` like a LangChain message."""

            def __init__(self, content):
                self.content = content

        def __init__(self, adapter):
            self.adapter = adapter

        def invoke(self, messages):
            # Convert LangChain messages to the format expected by the adapter
            if isinstance(messages, list):
                formatted_messages = []
                for msg in messages:
                    if hasattr(msg, 'content'):
                        role = self._ROLES.get(msg.__class__.__name__, "assistant")
                        formatted_messages.append({"role": role, "content": msg.content})
                    else:
                        formatted_messages.append({"role": "user", "content": str(msg)})
            else:
                formatted_messages = [{"role": "user", "content": str(messages)}]
            # Use the adapter to get response
            response = self.adapter.generate(formatted_messages)
            return self.MockResponse(response.content)

    self.llm = LLMWrapper(self.llm_adapter)

    # Initialize pipeline manager early to load models
    logger.info("πŸ”„ Initializing pipeline manager and loading models...")
    try:
        self.pipeline_manager = PipelineManager(self.config)
        logger.info("βœ… Pipeline manager initialized and models loaded")
    except Exception as e:
        logger.error(f"❌ Failed to initialize pipeline manager: {e}")
        traceback.print_exc()
        # Chain the cause so the original traceback is preserved for callers.
        raise RuntimeError(f"Pipeline manager initialization failed: {e}") from e

    # Connect to vector store
    logger.info("πŸ”„ Connecting to vector store...")
    try:
        if not self.pipeline_manager.connect_vectorstore():
            logger.error("❌ Failed to connect to vector store")
            logger.error("πŸ’‘ Check that QDRANT_API_KEY environment variable is set")
            logger.error("πŸ’‘ Check that Qdrant URL and collection name are correct in config")
            raise RuntimeError("Vector store connection failed")
        logger.info("βœ… Vector store connected successfully")
    except RuntimeError:
        raise  # Re-raise RuntimeError as-is
    except Exception as e:
        logger.error(f"❌ Error during vector store connection: {e}")
        traceback.print_exc()
        raise RuntimeError(f"Vector store connection failed: {e}") from e

    # Load dynamic data
    self._load_dynamic_data()
    # Build the multi-agent graph
    self.graph = self._build_graph()

    # Conversations directory - use PROJECT_DIR for local vs deployed compatibility
    self.conversations_dir = CONVERSATIONS_DIR
    try:
        # Use 777 permissions for maximum compatibility (HF Spaces runs as different user)
        self.conversations_dir.mkdir(parents=True, mode=0o777, exist_ok=True)
    except (PermissionError, OSError) as e:
        logger.warning(f"Could not create conversations directory at {self.conversations_dir}: {e}")
        # Fallback to a relative path (current directory)
        self.conversations_dir = Path("conversations")
        try:
            self.conversations_dir.mkdir(parents=True, mode=0o777, exist_ok=True)
        except (PermissionError, OSError) as e2:
            logger.error(f"Could not create conversations directory at {self.conversations_dir}: {e2}")
            raise RuntimeError(f"Failed to create conversations directory: {e2}") from e2
    logger.info("πŸ€– Multi-Agent RAG Chatbot initialized")
def _load_dynamic_data(self):
    """Load dynamic data from filter_options.json and add_district_metadata.py

    Populates ``year_whitelist``, ``source_whitelist`` and
    ``district_whitelist`` from ``src/config/filter_options.json`` under
    ``PROJECT_DIR``, falling back to built-in defaults when the file is
    missing or cannot be read. The district list is then extended, on a
    best-effort basis, with names from ``DistrictMetadataProcessor``.
    Finally ``current_year`` and ``previous_year`` are derived from the
    system clock.
    """

    def use_default_whitelists():
        # Fresh lists on every call so the attributes never share state.
        self.year_whitelist = ['2018', '2019', '2020', '2021', '2022', '2023', '2024']
        self.source_whitelist = ['Consolidated', 'Local Government', 'Ministry, Department and Agency']
        self.district_whitelist = ['Kampala', 'Gulu', 'Kalangala']

    # Load filter options - use PROJECT_DIR relative path
    try:
        fo = PROJECT_DIR / "src" / "config" / "filter_options.json"
        if fo.exists():
            # JSON is UTF-8 by specification; do not depend on the locale default.
            with open(fo, encoding="utf-8") as f:
                data = json.load(f)
            self.year_whitelist = [str(y).strip() for y in data.get("years", [])]
            self.source_whitelist = [str(s).strip() for s in data.get("sources", [])]
            self.district_whitelist = [str(d).strip() for d in data.get("districts", [])]
        else:
            # Fallback to default values
            use_default_whitelists()
    except Exception as e:
        logger.warning(f"Could not load filter options: {e}")
        use_default_whitelists()

    # Enrich district list from add_district_metadata.py (if available)
    try:
        from add_district_metadata import DistrictMetadataProcessor
        proc = DistrictMetadataProcessor()
        names = {
            mapping.name
            for mapping in proc.district_mappings.values()
            if getattr(mapping, 'is_district', True)
        }
        if names:
            merged = list(self.district_whitelist)
            known = set(merged)
            # Append new names in sorted order, keeping the existing order first.
            merged.extend(n for n in sorted(names) if n not in known)
            self.district_whitelist = merged
            logger.info(f"🧭 District whitelist enriched: {len(self.district_whitelist)} entries")
    except Exception as e:
        # Enrichment is optional; the module may be absent in some deployments.
        logger.info(f"ℹ️ Could not enrich districts: {e}")

    # Calculate current year dynamically (read the clock once so both values agree)
    this_year = datetime.now().year
    self.current_year = str(this_year)
    self.previous_year = str(this_year - 1)

    # Log the actual filter values for debugging
    logger.info("πŸ“Š ACTUAL FILTER VALUES:")
    logger.info(f" Years: {self.year_whitelist}")
    logger.info(f" Sources: {self.source_whitelist}")
    logger.info(f" Districts: {len(self.district_whitelist)} districts (first 10: {self.district_whitelist[:10]})")
def _build_graph(self) -> StateGraph:
    """Assemble and compile the three-agent LangGraph workflow.

    Flow: ``main_agent`` is the entry point and either ends the run
    ("follow_up") or hands over to ``rag_agent`` ("call_rag");
    ``rag_agent`` always feeds ``response_agent``, which loops back to
    ``main_agent``.
    """
    workflow = StateGraph(MultiAgentState)

    # Register one node per agent.
    agent_nodes = (
        ("main_agent", self._main_agent),
        ("rag_agent", self._rag_agent),
        ("response_agent", self._response_agent),
    )
    for node_name, handler in agent_nodes:
        workflow.add_node(node_name, handler)

    workflow.set_entry_point("main_agent")

    # The main agent's routing decision selects the next hop.
    routes = {"follow_up": END, "call_rag": "rag_agent"}
    workflow.add_conditional_edges("main_agent", self._should_call_rag, routes)

    # Retrieval preparation -> answer generation -> back to the main agent.
    workflow.add_edge("rag_agent", "response_agent")
    workflow.add_edge("response_agent", "main_agent")

    return workflow.compile()
def _should_call_rag(self, state: MultiAgentState) -> str:
    """Route after the main agent: "follow_up" ends the run, "call_rag" retrieves.

    Returns "follow_up" when a final response already exists or the query
    context asks for a follow-up question; otherwise "call_rag".
    """
    if not state.get("final_response"):
        ctx = state["query_context"]
        wants_follow_up = bool(ctx) and ctx.needs_follow_up
        if not wants_follow_up:
            return "call_rag"
    return "follow_up"
def _main_agent(self, state: MultiAgentState) -> MultiAgentState:
    """Main Agent: Handles conversation flow and follow-ups

    Extracts UI filters from the current query, analyses the query context
    and stores it under ``state["query_context"]``. When the analysis asks
    for a follow-up, the follow-up question becomes ``final_response``;
    otherwise the state is left for the RAG agent. If a final response
    already exists (second visit after the response agent) the state is
    returned untouched.
    """
    logger.info("🎯 MAIN AGENT: Starting analysis")
    # If we already have a final response from response agent, end gracefully
    if state.get("final_response"):
        logger.info("🎯 MAIN AGENT: Final response already exists, ending conversation flow")
        return state

    query = state["current_query"]
    messages = state["messages"]
    logger.info(f"🎯 MAIN AGENT: Extracting UI filters from query")
    ui_filters = self._extract_ui_filters(query)
    logger.info(f"🎯 MAIN AGENT: UI filters extracted: {ui_filters}")

    # Analyze query context
    logger.info(f"🎯 MAIN AGENT: Analyzing query context")
    context = self._analyze_query_context(query, messages, ui_filters)

    # Log agent decision
    state["agent_logs"].append(f"MAIN AGENT: Context analyzed - district={context.has_district}, source={context.has_source}, year={context.has_year}")
    logger.info(f"🎯 MAIN AGENT: Context analysis complete - district={context.has_district}, source={context.has_source}, year={context.has_year}")

    # Store context
    state["query_context"] = context

    # If follow-up needed, generate response
    if context.needs_follow_up:
        logger.info(f"🎯 MAIN AGENT: Follow-up needed, generating question")
        # The LLM may flag a follow-up yet return a null question; slicing
        # None below would raise and an empty final_response would end the
        # graph without any reply, so fall back to a generic question.
        response = context.follow_up_question or "Could you be more specific about what you'd like to know?"
        state["final_response"] = response
        state["last_ai_message_time"] = time.time()
        logger.info(f"🎯 MAIN AGENT: Follow-up question generated: {response[:100]}...")
    else:
        logger.info("🎯 MAIN AGENT: No follow-up needed, proceeding to RAG")
    return state
def _rag_agent(self, state: MultiAgentState) -> MultiAgentState:
    """RAG Agent: turn the analysed context into a search query plus filters.

    Writes ``rag_query`` and ``rag_filters`` into the state for the response
    agent and records the decision in ``agent_logs``.
    """
    logger.info("πŸ” RAG AGENT: Starting query rewriting and filter preparation")
    ctx = state["query_context"]
    history = state["messages"]
    logger.info(f"πŸ” RAG AGENT: Context received - district={ctx.has_district}, source={ctx.has_source}, year={ctx.has_year}")

    # Step 1: retrieval-friendly query
    logger.info(f"πŸ” RAG AGENT: Rewriting query for optimal retrieval")
    rewritten = self._rewrite_query_for_rag(history, ctx)
    logger.info(f"πŸ” RAG AGENT: Query rewritten: '{rewritten}'")

    # Step 2: metadata filters
    logger.info(f"πŸ” RAG AGENT: Building filters from context")
    built_filters = self._build_filters(ctx)
    logger.info(f"πŸ” RAG AGENT: Filters built: {built_filters}")

    # Trace entry, then hand both results to the response agent
    state["agent_logs"].append(f"RAG AGENT: Query='{rewritten}', Filters={built_filters}")
    state.update(rag_query=rewritten, rag_filters=built_filters)
    logger.info(f"πŸ” RAG AGENT: Preparation complete, ready for retrieval")
    return state
def _response_agent(self, state: MultiAgentState) -> MultiAgentState:
    """Response Agent: Generates final answer from retrieved documents

    Runs the pipeline manager with ``state["rag_query"]`` and
    ``state["rag_filters"]``, stores the retrieved documents, and generates
    the answer. When the best similarity score is at or below 0.15 the
    documents are ignored and the answer is generated without them. Any
    failure is logged and replaced by an apology message, so this node
    never raises.
    """
    logger.info("πŸ“ RESPONSE AGENT: Starting document retrieval and answer generation")
    rag_query = state["rag_query"]
    filters = state["rag_filters"]
    logger.info(f"πŸ“ RESPONSE AGENT: Starting RAG retrieval with query: '{rag_query}'")
    logger.info(f"πŸ“ RESPONSE AGENT: Using filters: {filters}")
    # Perform RAG retrieval
    logger.info(f"πŸ“ RESPONSE AGENT: Calling pipeline manager for retrieval")
    logger.info(f"πŸ” ACTUAL RAG QUERY: '{rag_query}'")
    logger.info(f"πŸ” ACTUAL FILTERS: {filters}")
    try:
        result = self.pipeline_manager.run(
            query=rag_query,
            sources=filters.get("sources") if filters else None,
            auto_infer_filters=False,
            filters=filters if filters else None
        )
        logger.info(f"πŸ“ RESPONSE AGENT: RAG retrieval completed - {len(result.sources)} documents retrieved")
        logger.info(f"πŸ” RETRIEVAL DEBUG: Result type: {type(result)}")
        logger.info(f"πŸ” RETRIEVAL DEBUG: Result sources type: {type(result.sources)}")
        if len(result.sources) == 0:
            logger.warning(f"⚠️ NO DOCUMENTS RETRIEVED: Query='{rag_query}', Filters={filters}")
            logger.warning(f"⚠️ RETRIEVAL DEBUG: This could be due to:")
            logger.warning(f" - Query too specific for available documents")
            logger.warning(f" - Filters too restrictive")
            logger.warning(f" - Vector store connection issues")
            logger.warning(f" - Embedding model issues")
        else:
            logger.info(f"βœ… DOCUMENTS RETRIEVED: {len(result.sources)} documents found")
            for i, doc in enumerate(result.sources[:3]):  # Log first 3 docs
                logger.info(f" Doc {i+1}: {getattr(doc, 'metadata', {}).get('filename', 'Unknown')[:50]}...")
        state["retrieved_documents"] = result.sources
        state["agent_logs"].append(f"RESPONSE AGENT: Retrieved {len(result.sources)} documents")

        # Check highest similarity score
        highest_score = 0.0
        for doc in result.sources:
            metadata = getattr(doc, 'metadata', None) or {}
            # Prefer reranked_score (more accurate), fall back to original_score.
            # The trailing "or 0.0" keeps a None/missing score from breaking
            # the comparison below.
            score = metadata.get('reranked_score') or metadata.get('original_score') or 0.0
            if score > highest_score:
                highest_score = score
        logger.info(f"πŸ“ RESPONSE AGENT: Highest similarity score: {highest_score:.4f}")

        # If highest score is too low, don't use retrieved documents
        if highest_score <= 0.15:
            logger.warning(f"⚠️ RESPONSE AGENT: Low similarity score ({highest_score:.4f} <= 0.15), using LLM knowledge only")
            response = self._generate_conversational_response_without_docs(
                state["current_query"],
                state["messages"]
            )
        else:
            # Generate conversational response with documents
            logger.info(f"πŸ“ RESPONSE AGENT: Generating conversational response from {len(result.sources)} documents")
            response = self._generate_conversational_response(
                state["current_query"],
                result.sources,
                result.answer,
                state["messages"]
            )
        logger.info(f"πŸ“ RESPONSE AGENT: Response generated: {response[:100]}...")
        state["final_response"] = response
        state["last_ai_message_time"] = time.time()
        logger.info(f"πŸ“ RESPONSE AGENT: Answer generation complete")
    except Exception as e:
        # Boundary handler: keep the conversation alive, but record the
        # traceback so the failure can be diagnosed from the logs.
        logger.error(f"❌ RESPONSE AGENT ERROR: {e}", exc_info=True)
        state["final_response"] = "I apologize, but I encountered an error while retrieving information. Please try again."
        state["last_ai_message_time"] = time.time()
    return state
def _extract_ui_filters(self, query: str) -> Dict[str, List[str]]:
    """Extract UI filters from query

    The UI may prepend a section of the form::

        FILTER CONTEXT:
        Sources: A, B
        Years: 2022, 2023
        Districts: None
        Filenames: report.pdf
        USER QUERY: ...

    Args:
        query: Raw query text, optionally containing a FILTER CONTEXT section.

    Returns:
        A dict with any of the keys ``sources``, ``years``, ``districts``,
        ``filenames`` mapped to the non-empty, stripped values listed for
        them. Labels that are absent, empty or set to ``None`` are omitted;
        an empty dict is returned when there is no FILTER CONTEXT section.
    """
    filters: Dict[str, List[str]] = {}
    # Look for FILTER CONTEXT in query
    if "FILTER CONTEXT:" not in query:
        return filters

    # Extract the entire filter section (until USER QUERY: or end of query)
    filter_section = query.split("FILTER CONTEXT:")[1]
    filter_section = filter_section.split("USER QUERY:")[0].strip()
    lines = [line.strip() for line in filter_section.split('\n')]

    labels = (
        ("Sources:", "sources"),
        ("Years:", "years"),
        ("Districts:", "districts"),
        ("Filenames:", "filenames"),
    )
    for label, key in labels:
        # Only a line that *starts* with the label counts; a label that merely
        # occurs inside another line (e.g. "Data Sources:") is ignored rather
        # than raising IndexError.
        line = next((ln for ln in lines if ln.startswith(label)), None)
        if line is None:
            continue
        raw = line[len(label):].strip()
        if not raw or raw == "None":
            continue
        # Drop empty items produced by stray or trailing commas.
        values = [item.strip() for item in raw.split(",") if item.strip()]
        if values:
            filters[key] = values
    return filters
def _analyze_query_context(self, query: str, messages: List[Any], ui_filters: Dict[str, List[str]]) -> QueryContext:
    """Analyze query context using LLM

    Sends the query, the last 6 conversation messages and the available
    filter values to the LLM, parses its JSON reply and validates every
    extracted district/source/year against the whitelists.

    Args:
        query: Current user query (may include the UI filter section).
        messages: Conversation history; only HumanMessage/AIMessage entries
            from the last 6 items are included in the prompt.
        ui_filters: Filters parsed from the UI section of the query.

    Returns:
        A QueryContext. If the reply cannot be parsed as JSON, the context
        asks the user to rephrase. If anything else fails, a context built
        from the UI filters is returned with ``needs_follow_up=False`` so
        the flow proceeds to RAG. This method does not raise.
    """

    def unparseable_reply_context() -> QueryContext:
        # Single definition of the reply used when the LLM output is not JSON.
        return QueryContext(
            has_district=False,
            has_source=False,
            has_year=False,
            extracted_district=None,
            extracted_source=None,
            extracted_year=None,
            confidence_score=0.0,
            needs_follow_up=True,
            follow_up_question="I apologize, but I'm having trouble processing your request. Could you please rephrase it or ask for help?"
        )

    logger.info(f"πŸ” QUERY ANALYSIS: '{query[:50]}...' | UI filters: {ui_filters} | Messages: {len(messages)}")
    # Build conversation context
    conversation_context = ""
    for msg in messages[-6:]:  # Last 6 messages
        if isinstance(msg, HumanMessage):
            conversation_context += f"User: {msg.content}\n"
        elif isinstance(msg, AIMessage):
            conversation_context += f"Assistant: {msg.content}\n"

    # Show at most 50 districts; mention the remainder only when there is one,
    # so short whitelists no longer produce "(and -47 more)" in the prompt.
    shown_districts = self.district_whitelist[:50]
    hidden_district_count = len(self.district_whitelist) - len(shown_districts)
    districts_line = ', '.join(shown_districts)
    if hidden_district_count > 0:
        districts_line += f"... (and {hidden_district_count} more)"

    # Create analysis prompt
    analysis_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=f"""You are the Main Agent in an advanced multi-agent RAG system for audit report analysis.
🎯 PRIMARY GOAL: Intelligently analyze user queries and determine the optimal conversation flow, whether that's answering directly, asking follow-ups, or proceeding to RAG retrieval.
🧠 INTELLIGENCE LEVEL: You are a sophisticated conversational AI that can handle any type of user interaction - from greetings to complex audit queries.
πŸ“Š YOUR EXPERTISE: You specialize in analyzing audit reports from various sources (Local Government, Ministry, Hospital, etc.) across different years and districts in Uganda.
πŸ” AVAILABLE FILTERS:
- Years: {', '.join(self.year_whitelist)}
- Current year: {self.current_year}, Previous year: {self.previous_year}
- Sources: {', '.join(self.source_whitelist)}
- Districts: {districts_line}
πŸŽ›οΈ UI FILTERS PROVIDED: {ui_filters}
πŸ“‹ UI FILTER HANDLING:
- If UI filters contain multiple values (e.g., districts: ['Lwengo', 'Kiboga']), extract ALL values
- For multiple districts: extract each district separately and validate each one
- For multiple years: extract each year separately and validate each one
- For multiple sources: extract each source separately and validate each one
- UI filters take PRIORITY over conversation context - use them first
🧭 CONVERSATION FLOW INTELLIGENCE:
1. **GREETINGS & GENERAL CHAT**:
- If user greets you ("Hi", "Hello", "How are you"), respond warmly and guide them to audit-related questions
- Example: "Hello! I'm here to help you analyze audit reports. What would you like to know about budget allocations, expenditures, or audit findings?"
2. **EDGE CASES**:
- Handle "What can you do?", "Help", "I don't know what to ask" with helpful guidance
- Example: "I can help you analyze audit reports! Try asking about budget allocations, salary management, PDM implementation, or any specific audit findings."
3. **AUDIT QUERIES**:
- Extract ONLY values that EXACTLY match the available lists above
- DO NOT hallucinate or infer values not in the lists
- If user mentions "salary payroll management" - this is NOT a valid source filter
**YEAR EXTRACTION**:
- If user mentions "2023" and it's in the years list - extract "2023"
- If user mentions "2022 / 23" - extract ["2022", "2023"] (as a JSON array)
- If user mentions "2022-2023" - extract ["2022", "2023"] (as a JSON array)
- If user mentions "latest couple of years" - extract the 2 most recent years from available data as JSON array
- Always return years as JSON arrays when multiple years are mentioned
**DISTRICT EXTRACTION**:
- If user mentions "Kampala" and it's in the districts list - extract "Kampala"
- If user mentions "Pader District" - extract "Pader" (remove "District" suffix)
- If user mentions "Lwengo, Kiboga and Namutumba" - extract ["Lwengo", "Kiboga", "Namutumba"] (as JSON array)
- If user mentions "Lwengo District and Kiboga District" - extract ["Lwengo", "Kiboga"] (as JSON array, remove "District" suffix)
- Always return districts as JSON arrays when multiple districts are mentioned
- If no exact matches found, set extracted values to null
4. **FILENAME FILTERING (MUTUALLY EXCLUSIVE)**:
- If UI provides filenames filter - ONLY use that, ignore all other filters (year, district, source)
- With filenames filter, no follow-ups needed - proceed directly to RAG
- When filenames are specified, skip filter inference entirely
5. **HALLUCINATION PREVENTION**:
- If user asks about a specific report but NO filename is selected in UI and NONE is extracted from conversation - DO NOT hallucinate
- Clearly state: "I don't have any specific report selected. Could you please select a report from the list or tell me which report you'd like to analyze?"
- DO NOT pretend to know which report they mean
- DO NOT infer reports from context alone - only use explicitly mentioned reports
6. **CONVERSATION CONTEXT AWARENESS**:
- ALWAYS consider the full conversation context when extracting filters
- If district was mentioned in previous messages, include it in current analysis
- If year was mentioned in previous messages, include it in current analysis
- If source was mentioned in previous messages, include it in current analysis
- Example: If conversation shows "User: Tell me about Pader District" then "User: 2023", extract both: district="Pader" and year="2023"
5. **SMART FOLLOW-UP STRATEGY**:
- NEVER ask the same question twice in a row
- If user provides source info, ask for year or district next
- If user provides year info, ask for source or district next
- If user provides district info, ask for year or source next
- If user provides 2+ pieces of info, proceed to RAG instead of asking more
- Make follow-ups conversational and contextual, not robotic
5. **DYNAMIC FOLLOW-UP EXAMPLES**:
- Budget queries: "What year are you interested in?" or "Which department - Local Government or Ministry?"
- PDM queries: "Which district are you interested in?" or "What year?"
- General queries: "Could you be more specific about what you'd like to know?"
🎯 DECISION LOGIC:
- If query is a greeting/general chat β†’ needs_follow_up: true, provide helpful guidance
- If query has 2+ pieces of info β†’ needs_follow_up: false, proceed to RAG
- If query has 1 piece of info β†’ needs_follow_up: true, ask for missing piece
- If query has 0 pieces of info β†’ needs_follow_up: true, ask for clarification
RESPOND WITH JSON ONLY:
{{
"has_district": boolean,
"has_source": boolean,
"has_year": boolean,
"extracted_district": "single district name or JSON array of districts or null",
"extracted_source": "single source name or JSON array of sources or null",
"extracted_year": "single year or JSON array of years or null",
"confidence_score": 0.0-1.0,
"needs_follow_up": boolean,
"follow_up_question": "conversational question or helpful guidance or null"
}}"""),
        HumanMessage(content=f"""Query: {query}
Conversation Context:
{conversation_context}
CRITICAL: You MUST analyze the FULL conversation context above, not just the current query.
- If ANY district was mentioned in previous messages, extract it
- If ANY year was mentioned in previous messages, extract it
- If ANY source was mentioned in previous messages, extract it
- Combine information from ALL messages in the conversation
Analyze this query using ONLY the exact values provided above:""")
    ])
    try:
        response = self.llm.invoke(analysis_prompt.format_messages())
        # Clean the response to extract JSON
        content = response.content.strip()
        if content.startswith("```json"):
            # Remove markdown formatting
            content = content.replace("```json", "").replace("```", "").strip()
        elif content.startswith("```"):
            # Remove generic markdown formatting
            content = content.replace("```", "").strip()

        # Clean and parse JSON with better error handling
        try:
            # Remove comments (// and /* */) from JSON
            content = re.sub(r'//.*?$', '', content, flags=re.MULTILINE)
            content = re.sub(r'/\*.*?\*/', '', content, flags=re.DOTALL)
            analysis = json.loads(content)
            logger.info(f"πŸ” QUERY ANALYSIS: βœ… Parsed successfully")
        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parsing failed: {e}")
            logger.error(f"❌ Raw content: {content[:200]}...")
            # Try to extract JSON from text if embedded
            json_match = re.search(r'\{.*\}', content, re.DOTALL)
            if not json_match:
                return unparseable_reply_context()
            try:
                # Clean the extracted JSON
                cleaned_json = json_match.group()
                cleaned_json = re.sub(r'//.*?$', '', cleaned_json, flags=re.MULTILINE)
                cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json, flags=re.DOTALL)
                analysis = json.loads(cleaned_json)
                logger.info(f"πŸ” QUERY ANALYSIS: βœ… Extracted and cleaned JSON from text")
            except json.JSONDecodeError as e2:
                logger.error(f"❌ Failed to extract JSON from text: {e2}")
                return unparseable_reply_context()

        # Validate extracted values against whitelists
        extracted_district = analysis.get("extracted_district")
        extracted_source = analysis.get("extracted_source")
        extracted_year = analysis.get("extracted_year")
        logger.info(f"πŸ” QUERY ANALYSIS: Raw extracted values - district: {extracted_district}, source: {extracted_source}, year: {extracted_year}")

        # Validate district (handle both single values and arrays)
        if extracted_district:
            if isinstance(extracted_district, list):
                # Validate each district in the array
                valid_districts = []
                for district in extracted_district:
                    if district in self.district_whitelist:
                        valid_districts.append(district)
                    else:
                        # Try removing "District" suffix
                        district_name = district.replace(" District", "").replace(" district", "")
                        if district_name in self.district_whitelist:
                            valid_districts.append(district_name)
                if valid_districts:
                    # A single surviving value is stored as a plain string.
                    extracted_district = valid_districts[0] if len(valid_districts) == 1 else valid_districts
                    logger.info(f"πŸ” QUERY ANALYSIS: Extracted districts: {extracted_district}")
                else:
                    logger.warning(f"⚠️ No valid districts found in: '{extracted_district}'")
                    extracted_district = None
            else:
                # Single district validation
                if extracted_district not in self.district_whitelist:
                    # Try removing "District" suffix
                    district_name = extracted_district.replace(" District", "").replace(" district", "")
                    if district_name in self.district_whitelist:
                        logger.info(f"πŸ” QUERY ANALYSIS: Normalized district '{extracted_district}' to '{district_name}'")
                        extracted_district = district_name
                    else:
                        logger.warning(f"⚠️ Invalid district extracted: '{extracted_district}' not in whitelist")
                        extracted_district = None

        # Validate source (handle both single values and arrays)
        if extracted_source:
            if isinstance(extracted_source, list):
                # Validate each source in the array
                valid_sources = []
                for source in extracted_source:
                    if source in self.source_whitelist:
                        valid_sources.append(source)
                    else:
                        logger.warning(f"⚠️ Invalid source in array: '{source}' not in whitelist")
                if valid_sources:
                    extracted_source = valid_sources[0] if len(valid_sources) == 1 else valid_sources
                    logger.info(f"πŸ” QUERY ANALYSIS: Extracted sources: {extracted_source}")
                else:
                    logger.warning(f"⚠️ No valid sources found in: '{extracted_source}'")
                    extracted_source = None
            else:
                # Single source validation
                if extracted_source not in self.source_whitelist:
                    logger.warning(f"⚠️ Invalid source extracted: '{extracted_source}' not in whitelist")
                    extracted_source = None

        # Validate year (handle both single values and arrays)
        if extracted_year:
            if isinstance(extracted_year, list):
                # Validate each year in the array (the LLM may return ints)
                valid_years = []
                for year in extracted_year:
                    year_str = str(year)
                    if year_str in self.year_whitelist:
                        valid_years.append(year_str)
                if valid_years:
                    extracted_year = valid_years[0] if len(valid_years) == 1 else valid_years
                    logger.info(f"πŸ” QUERY ANALYSIS: Extracted years: {extracted_year}")
                else:
                    logger.warning(f"⚠️ No valid years found in: '{extracted_year}'")
                    extracted_year = None
            else:
                # Single year validation
                year_str = str(extracted_year)
                if year_str not in self.year_whitelist:
                    logger.warning(f"⚠️ Invalid year extracted: '{extracted_year}' not in whitelist")
                    extracted_year = None
                else:
                    extracted_year = year_str
        logger.info(f"πŸ” QUERY ANALYSIS: Validated values - district: {extracted_district}, source: {extracted_source}, year: {extracted_year}")

        # Create QueryContext object
        context = QueryContext(
            has_district=bool(extracted_district),
            has_source=bool(extracted_source),
            has_year=bool(extracted_year),
            extracted_district=extracted_district,
            extracted_source=extracted_source,
            extracted_year=extracted_year,
            ui_filters=ui_filters,
            confidence_score=analysis.get("confidence_score", 0.0),
            needs_follow_up=analysis.get("needs_follow_up", False),
            follow_up_question=analysis.get("follow_up_question")
        )
        logger.info(f"πŸ” QUERY ANALYSIS: Analysis complete - needs_follow_up: {context.needs_follow_up}, confidence: {context.confidence_score}")

        # If filenames are provided in UI, skip follow-ups and proceed to RAG
        if ui_filters and ui_filters.get("filenames"):
            logger.info(f"πŸ” QUERY ANALYSIS: Filenames provided, skipping follow-ups, proceeding to RAG")
            context.needs_follow_up = False
            context.follow_up_question = None

        # Additional smart decision logic
        if context.needs_follow_up:
            # Check if we have enough information to proceed
            info_count = sum([
                bool(context.extracted_district),
                bool(context.extracted_source),
                bool(context.extracted_year)
            ])
            # Check if user is asking for more info vs providing it
            query_lower = query.lower()
            is_requesting_info = any(phrase in query_lower for phrase in [
                "please provide", "could you provide", "can you provide",
                "what is", "what are", "how much", "which", "what year",
                "what district", "what source", "tell me about"
            ])
            # With 2+ pieces of info we always proceed to RAG; whether the
            # user is requesting information only changes the log message.
            if info_count >= 2:
                if is_requesting_info:
                    logger.info(f"πŸ” QUERY ANALYSIS: User requesting more info despite having {info_count} pieces, proceeding to RAG with comprehensive answer")
                else:
                    logger.info(f"πŸ” QUERY ANALYSIS: Smart override - have {info_count} pieces of info and user not requesting more, proceeding to RAG")
                context.needs_follow_up = False
                context.follow_up_question = None

        # The LLM can request a follow-up without supplying the question;
        # never hand the caller a follow-up with nothing to ask.
        if context.needs_follow_up and not context.follow_up_question:
            context.follow_up_question = "Could you be more specific about what you'd like to know?"
        return context
    except Exception as e:
        logger.error(f"❌ Query analysis failed: {e}")
        # Fallback: proceed with RAG, relying on the UI filters alone.
        # Guard the lookups so a missing filter dict cannot raise from here.
        safe_filters = ui_filters or {}
        return QueryContext(
            has_district=bool(safe_filters.get("districts")),
            has_source=bool(safe_filters.get("sources")),
            has_year=bool(safe_filters.get("years")),
            ui_filters=ui_filters,
            confidence_score=0.5,
            needs_follow_up=False
        )
def _rewrite_query_for_rag(self, messages: List[Any], context: QueryContext) -> str:
    """Rewrite the recent conversation into a single retrieval query.

    The last six messages are rendered as a ``User:`` / ``Assistant:``
    transcript and sent to ``self.llm`` together with instructions to answer
    in an ``EXPLANATION:`` / ``QUERY:`` format. Only the text following the
    first ``QUERY:`` prefix is used.

    Args:
        messages: Conversation history; only ``HumanMessage`` and
            ``AIMessage`` entries contribute to the transcript.
        context: Analysed query context. Not read by this method; kept so
            the signature stays unchanged for callers.

    Returns:
        The extracted query when it is longer than five characters.
        Otherwise, or when the LLM call raises, the content of the most
        recent ``HumanMessage``; if there is none, the literal
        ``"audit report information"``.
    """
    logger.info("πŸ”„ QUERY REWRITING: Starting query rewrite for RAG")
    logger.info(f"πŸ”„ QUERY REWRITING: Processing {len(messages)} messages")

    def fallback_query(kind: str) -> str:
        # Shared by the "no usable QUERY line" and the "LLM call failed" paths;
        # `kind` only selects the wording of the log lines.
        for msg in reversed(messages):
            if isinstance(msg, HumanMessage):
                logger.info(f"πŸ”„ QUERY REWRITING: Using {kind} message: '{msg.content[:50]}...'")
                return msg.content
        logger.info(f"πŸ”„ QUERY REWRITING: Using default {kind}")
        return "audit report information"

    # Build conversation context
    logger.info("πŸ”„ QUERY REWRITING: Building conversation context from last 6 messages")
    conversation_lines = []
    for position, msg in enumerate(messages[-6:], start=1):
        if isinstance(msg, HumanMessage):
            conversation_lines.append(f"User: {msg.content}")
            logger.info(f"πŸ”„ QUERY REWRITING: Message {position}: User - {msg.content[:50]}...")
        elif isinstance(msg, AIMessage):
            conversation_lines.append(f"Assistant: {msg.content}")
            logger.info(f"πŸ”„ QUERY REWRITING: Message {position}: Assistant - {msg.content[:50]}...")
    convo_text = "\n".join(conversation_lines)
    logger.info(f"πŸ”„ QUERY REWRITING: Conversation context built ({len(convo_text)} chars)")

    # Create rewrite prompt
    rewrite_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="""You are a query rewriter for RAG retrieval.
GOAL: Create the best possible search query for document retrieval.
CRITICAL RULES:
1. Focus on the core information need from the conversation
2. Remove meta-verbs like "summarize", "list", "compare", "how much", "what" - keep the content focus
3. DO NOT include filter details (years, districts, sources) - these are applied separately as filters
4. DO NOT include specific years, district names, or source types in the query
5. Output ONE clear sentence suitable for vector search
6. Keep it generic and focused on the topic/subject matter
EXAMPLES:
- "What are the top challenges in budget allocation?" β†’ "budget allocation challenges"
- "How were PDM administrative costs utilized in 2023?" β†’ "PDM administrative costs utilization"
- "Compare salary management across districts" β†’ "salary management"
- "How much was budget allocation for Local Government in 2023?" β†’ "budget allocation"
OUTPUT FORMAT:
Provide your response in this exact format:
EXPLANATION: [Your reasoning here]
QUERY: [One clean sentence for retrieval]
The QUERY line will be extracted and used directly for RAG retrieval."""),
        HumanMessage(content=f"""Conversation:
{convo_text}
Rewrite the best retrieval query:""")
    ])

    marker = 'QUERY:'
    try:
        logger.info("πŸ”„ QUERY REWRITING: Calling LLM for query rewrite")
        response = self.llm.invoke(rewrite_prompt.format_messages())
        logger.info(f"πŸ”„ QUERY REWRITING: LLM response received: {response.content[:100]}...")

        # Extract only the QUERY line from the structured response
        query_line = None
        for line in response.content.strip().split('\n'):
            stripped = line.strip()
            if stripped.startswith(marker):
                # Drop only the leading marker; a later "QUERY:" inside the
                # query text itself must survive untouched.
                query_line = stripped[len(marker):].strip()
                break
    except Exception as e:
        logger.error(f"❌ QUERY REWRITING: Error during rewrite: {e}")
        return fallback_query("error fallback")

    if query_line and len(query_line) > 5:
        logger.info(f"πŸ”„ QUERY REWRITING: Query rewritten successfully: '{query_line[:50]}...'")
        return query_line

    logger.info("πŸ”„ QUERY REWRITING: No QUERY line found or too short, using fallback")
    return fallback_query("fallback")
def _build_filters(self, context: QueryContext) -> Dict[str, Any]:
"""Build filters for RAG retrieval"""
logger.info("πŸ”§ FILTER BUILDING: Starting filter construction")
filters = {}
# Check for filename filtering first (mutually exclusive)
if context.ui_filters and context.ui_filters.get("filenames"):
logger.info(f"πŸ”§ FILTER BUILDING: Filename filtering requested (mutually exclusive mode)")
filters["filenames"] = context.ui_filters["filenames"]
logger.info(f"πŸ”§ FILTER BUILDING: Added filenames filter: {context.ui_filters['filenames']}")
logger.info(f"πŸ”§ FILTER BUILDING: Final filters: {filters}")
return filters # Return early, skip all other filters
# UI filters take priority, but merge with extracted context if UI filters are incomplete
if context.ui_filters:
logger.info(f"πŸ”§ FILTER BUILDING: UI filters present: {context.ui_filters}")
# Add UI filters first
if context.ui_filters.get("sources"):
filters["sources"] = context.ui_filters["sources"]
logger.info(f"πŸ”§ FILTER BUILDING: Added sources filter from UI: {context.ui_filters['sources']}")
if context.ui_filters.get("years"):
filters["year"] = context.ui_filters["years"]
logger.info(f"πŸ”§ FILTER BUILDING: Added years filter from UI: {context.ui_filters['years']}")
if context.ui_filters.get("districts"):
# Normalize district names to title case (match Qdrant metadata format)
normalized_districts = [d.title() for d in context.ui_filters['districts']]
filters["district"] = normalized_districts
logger.info(f"πŸ”§ FILTER BUILDING: Added districts filter from UI: {context.ui_filters['districts']} β†’ normalized: {normalized_districts}")
# Merge with extracted context for missing filters
if not filters.get("year") and context.extracted_year:
# Handle both single values and arrays
if isinstance(context.extracted_year, list):
filters["year"] = context.extracted_year
else:
filters["year"] = [context.extracted_year]
logger.info(f"πŸ”§ FILTER BUILDING: Added extracted year filter (UI missing): {context.extracted_year}")
if not filters.get("district") and context.extracted_district:
# Handle both single values and arrays
if isinstance(context.extracted_district, list):
# Normalize district names to title case (match Qdrant metadata format)
normalized = [d.title() for d in context.extracted_district]
filters["district"] = normalized
else:
filters["district"] = [context.extracted_district.title()]
logger.info(f"πŸ”§ FILTER BUILDING: Added extracted district filter (UI missing): {context.extracted_district}")
if not filters.get("sources") and context.extracted_source:
# Handle both single values and arrays
if isinstance(context.extracted_source, list):
filters["sources"] = context.extracted_source
else:
filters["sources"] = [context.extracted_source]
logger.info(f"πŸ”§ FILTER BUILDING: Added extracted source filter (UI missing): {context.extracted_source}")
else:
logger.info(f"πŸ”§ FILTER BUILDING: No UI filters, using extracted context")
# Use extracted context
if context.extracted_source:
# Handle both single values and arrays
if isinstance(context.extracted_source, list):
filters["sources"] = context.extracted_source
else:
filters["sources"] = [context.extracted_source]
logger.info(f"πŸ”§ FILTER BUILDING: Added extracted source filter: {context.extracted_source}")
if context.extracted_year:
# Handle both single values and arrays
if isinstance(context.extracted_year, list):
filters["year"] = context.extracted_year
else:
filters["year"] = [context.extracted_year]
logger.info(f"πŸ”§ FILTER BUILDING: Added extracted year filter: {context.extracted_year}")
if context.extracted_district:
# Handle both single values and arrays
if isinstance(context.extracted_district, list):
filters["district"] = context.extracted_district
else:
filters["district"] = [context.extracted_district]
logger.info(f"πŸ”§ FILTER BUILDING: Added extracted district filter: {context.extracted_district}")
logger.info(f"πŸ”§ FILTER BUILDING: Final filters: {filters}")
return filters
def _generate_conversational_response(self, query: str, documents: List[Any], rag_answer: str, messages: List[Any]) -> str:
    """Turn a raw RAG answer into a conversational reply via ``self.llm``.

    The prompt carries the user question, the number of retrieved documents
    and the RAG answer text. ``messages`` is accepted but not read here.

    Returns:
        The stripped LLM reply, or ``rag_answer`` unchanged if the LLM call
        (or reading its content) raises.
    """
    logger.info("πŸ’¬ RESPONSE GENERATION: Starting conversational response generation")
    logger.info(f"πŸ’¬ RESPONSE GENERATION: Processing {len(documents)} documents")
    logger.info(f"πŸ’¬ RESPONSE GENERATION: Query: '{query[:50]}...'")

    # Assemble the two prompt messages separately, then wrap them in a template
    logger.info("πŸ’¬ RESPONSE GENERATION: Building response prompt")
    system_text = """You are a helpful audit report assistant. Generate a natural, conversational response.
RULES:
1. Answer the user's question directly and clearly
2. Use the retrieved documents as evidence
3. Be conversational, not technical
4. Don't mention scores, retrieval details, or technical implementation
5. If relevant documents were found, reference them naturally
6. If no relevant documents, explain based on your knowledge (if you have it) or just say you do not have enough information.
7. If the passages have useful facts or numbers, use them in your answer.
8. When you use information from a passage, mention where it came from by using [Doc i] at the end of the sentence. i stands for the number of the document.
9. Do not use the sentence 'Doc i says ...' to say where information came from.
10. If the same thing is said in more than one document, you can mention all of them like this: [Doc i, Doc j, Doc k]
11. Do not just summarize each passage one by one. Group your summaries to highlight the key parts in the explanation.
12. If it makes sense, use bullet points and lists to make your answers easier to understand.
13. You do not need to use every passage. Only use the ones that help answer the question.
14. If the documents do not have the information needed to answer the question, just say you do not have enough information.
TONE: Professional but friendly, like talking to a colleague."""
    user_text = f"""User Question: {query}
Retrieved Documents: {len(documents)} documents found
RAG Answer: {rag_answer}
Generate a conversational response:"""
    response_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=system_text),
        HumanMessage(content=user_text),
    ])

    try:
        logger.info("πŸ’¬ RESPONSE GENERATION: Calling LLM for final response")
        llm_reply = self.llm.invoke(response_prompt.format_messages())
        logger.info(f"πŸ’¬ RESPONSE GENERATION: LLM response received: {llm_reply.content[:100]}...")
        answer = llm_reply.content.strip()
    except Exception as e:
        logger.error(f"❌ RESPONSE GENERATION: Error during generation: {e}")
        logger.info("πŸ’¬ RESPONSE GENERATION: Using RAG answer as fallback")
        answer = rag_answer  # Fall back to the unpolished RAG answer
    return answer
def _generate_conversational_response_without_docs(self, query: str, messages: List[Any]) -> str:
    """Answer from LLM knowledge and recent conversation only (no documents).

    The last six messages are rendered as ``User:`` / ``Assistant:`` lines
    and sent to ``self.llm`` together with the current question.

    Returns:
        The stripped LLM reply, or a fixed apology string if the call raises.
    """
    logger.info("πŸ’¬ RESPONSE GENERATION (NO DOCS): Starting response generation without documents")
    logger.info(f"πŸ’¬ RESPONSE GENERATION (NO DOCS): Query: '{query[:50]}...'")

    # Render the recent turns; each rendered turn ends with a newline
    history_lines = []
    for msg in messages[-6:]:  # Last 6 messages for context
        if isinstance(msg, HumanMessage):
            history_lines.append(f"User: {msg.content}\n")
        elif isinstance(msg, AIMessage):
            history_lines.append(f"Assistant: {msg.content}\n")
    history_text = "".join(history_lines)

    logger.info("πŸ’¬ RESPONSE GENERATION (NO DOCS): Building response prompt")
    system_text = """You are a helpful audit report assistant. Generate a natural, conversational response.
RULES:
1. Answer the user's question directly and clearly based on your knowledge
2. Use conversation history for context
3. Be conversational, not technical
4. Acknowledge if the answer is based on general knowledge rather than specific documents
5. Stay professional but friendly
TONE: Professional but friendly, like talking to a colleague."""
    user_text = f"""Current Question: {query}
Conversation History:
{history_text}
Generate a conversational response based on your knowledge:"""
    response_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=system_text),
        HumanMessage(content=user_text),
    ])

    try:
        logger.info("πŸ’¬ RESPONSE GENERATION (NO DOCS): Calling LLM")
        llm_reply = self.llm.invoke(response_prompt.format_messages())
        logger.info(f"πŸ’¬ RESPONSE GENERATION (NO DOCS): LLM response received: {llm_reply.content[:100]}...")
        answer = llm_reply.content.strip()
    except Exception as e:
        logger.error(f"❌ RESPONSE GENERATION (NO DOCS): Error during generation: {e}")
        answer = "I apologize, but I encountered an error. Please try asking your question differently."
    return answer
def chat(self, user_input: str, conversation_id: str = "default") -> Dict[str, Any]:
    """Run one user turn through the multi-agent graph and persist the result.

    Steps: load the stored conversation for ``conversation_id`` from
    ``self.conversations_dir``, append the user message, invoke
    ``self.graph`` with a fresh ``MultiAgentState``, append the AI reply
    (when one was produced), then save the conversation back to disk.

    Args:
        user_input: The user's message for this turn.
        conversation_id: Name of the conversation; used as the JSON file stem.

    Returns:
        Dict with keys:
          - ``response``: the final response from the graph state.
          - ``rag_result``: ``{"sources": [...], "answer": ...}`` where
            ``sources`` is the retrieved-documents list (empty list if none).
          - ``agent_logs``: the ``agent_logs`` entry of the final state.
          - ``actual_rag_query``: the rewritten RAG query, or ``""`` when
            the final state holds no query.
    """
    logger.info(f"πŸ’¬ MULTI-AGENT CHAT: Processing '{user_input[:50]}...'")

    # Load conversation
    logger.info(f"πŸ’¬ MULTI-AGENT CHAT: Loading conversation {conversation_id}")
    conversation_file = self.conversations_dir / f"{conversation_id}.json"
    conversation = self._load_conversation(conversation_file)
    logger.info(f"πŸ’¬ MULTI-AGENT CHAT: Loaded {len(conversation['messages'])} previous messages")

    # Add user message
    conversation["messages"].append(HumanMessage(content=user_input))
    logger.info("πŸ’¬ MULTI-AGENT CHAT: Added user message to conversation")

    # Prepare state
    logger.info("πŸ’¬ MULTI-AGENT CHAT: Preparing state for graph execution")
    state = MultiAgentState(
        conversation_id=conversation_id,
        messages=conversation["messages"],
        current_query=user_input,
        query_context=None,
        rag_query=None,
        rag_filters=None,
        retrieved_documents=None,
        final_response=None,
        agent_logs=[],
        conversation_context=conversation.get("context", {}),
        session_start_time=conversation["session_start_time"],
        last_ai_message_time=conversation["last_ai_message_time"]
    )

    # Run multi-agent graph
    logger.info("πŸ’¬ MULTI-AGENT CHAT: Executing multi-agent graph")
    final_state = self.graph.invoke(state)
    logger.info("πŸ’¬ MULTI-AGENT CHAT: Graph execution completed")

    # Add AI response to conversation
    if final_state["final_response"]:
        conversation["messages"].append(AIMessage(content=final_state["final_response"]))
        logger.info("πŸ’¬ MULTI-AGENT CHAT: Added AI response to conversation")

    # Update conversation
    conversation["last_ai_message_time"] = final_state["last_ai_message_time"]
    conversation["context"] = final_state["conversation_context"]

    # Save conversation
    logger.info("πŸ’¬ MULTI-AGENT CHAT: Saving conversation")
    self._save_conversation(conversation_file, conversation)
    logger.info("βœ… MULTI-AGENT CHAT: Completed")

    # Return response and RAG results
    return {
        'response': final_state["final_response"],
        'rag_result': {
            'sources': final_state["retrieved_documents"] or [],
            'answer': final_state["final_response"]
        },
        'agent_logs': final_state["agent_logs"],
        # The state is seeded with rag_query=None, so a .get() default would
        # never apply when no query was produced; coalesce None to "" instead.
        'actual_rag_query': final_state.get("rag_query") or ""
    }
def _load_conversation(self, conversation_file: Path) -> Dict[str, Any]:
"""Load conversation from file"""
if conversation_file.exists():
try:
with open(conversation_file) as f:
data = json.load(f)
# Convert message dicts back to LangChain messages
messages = []
for msg_data in data.get("messages", []):
if msg_data["type"] == "human":
messages.append(HumanMessage(content=msg_data["content"]))
elif msg_data["type"] == "ai":
messages.append(AIMessage(content=msg_data["content"]))
data["messages"] = messages
return data
except Exception as e:
logger.warning(f"Could not load conversation: {e}")
# Return default conversation
return {
"messages": [],
"session_start_time": time.time(),
"last_ai_message_time": time.time(),
"context": {}
}
def _save_conversation(self, conversation_file: Path, conversation: Dict[str, Any]):
"""Save conversation to file"""
try:
# Ensure the conversations directory exists with proper permissions
conversation_file.parent.mkdir(parents=True, mode=0o777, exist_ok=True)
# Convert messages to serializable format
messages_data = []
for msg in conversation["messages"]:
if isinstance(msg, HumanMessage):
messages_data.append({"type": "human", "content": msg.content})
elif isinstance(msg, AIMessage):
messages_data.append({"type": "ai", "content": msg.content})
conversation_data = {
"messages": messages_data,
"session_start_time": conversation["session_start_time"],
"last_ai_message_time": conversation["last_ai_message_time"],
"context": conversation.get("context", {})
}
with open(conversation_file, 'w') as f:
json.dump(conversation_data, f, indent=2)
except Exception as e:
logger.error(f"Could not save conversation: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
def get_multi_agent_chatbot():
    """Factory: build and return a new ``MultiAgentRAGChatbot``."""
    chatbot = MultiAgentRAGChatbot()
    return chatbot
if __name__ == "__main__":
    # Manual smoke test: push one sample question through the multi-agent
    # system and print the reply together with the per-agent log trail.
    demo_bot = MultiAgentRAGChatbot()
    demo_question = "List me top 10 challenges in budget allocation for the last 3 years"
    outcome = demo_bot.chat(demo_question)
    print("Response:", outcome['response'])
    print("Agent Logs:", outcome['agent_logs'])