from __future__ import annotations
import os
import json
import base64
import time
import tempfile
import re
from typing import List, Dict, Any, Optional
# OpenAI for LLM (optional)
try:
    from openai import OpenAI
except Exception:  # pragma: no cover
    OpenAI = None  # type: ignore
# LangChain & RAG
from langchain.schema import Document
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
# TTS
try:
    from gtts import gTTS
except Exception:  # pragma: no cover
    gTTS = None  # type: ignore
from .prompts import (
    SYSTEM_TEMPLATE, ANSWER_TEMPLATE_CALM, ANSWER_TEMPLATE_ADQ,
    SAFETY_GUARDRAILS, RISK_FOOTER, render_emotion_guidelines,
    # Decomposed NLU prompts (router + specialist classifier)
    NLU_ROUTER_PROMPT, SPECIALIST_CLASSIFIER_PROMPT,
    EMOTIONAL_SUPPORT_EXAMPLES, PRACTICAL_PLANNING_EXAMPLES,
    # Other templates
    ROUTER_PROMPT,
    ANSWER_TEMPLATE_FACTUAL,
    ANSWER_TEMPLATE_GENERAL_KNOWLEDGE,
    ANSWER_TEMPLATE_GENERAL,
    QUERY_EXPANSION_PROMPT,
)
# -----------------------------
# Multimodal Processing Functions
# -----------------------------
def _openai_client() -> Optional[OpenAI]:
    """Return an OpenAI client if an API key is configured, otherwise None."""
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    return OpenAI(api_key=api_key) if api_key and OpenAI else None
def describe_image(image_path: str) -> str:
    """Uses a vision model to describe an image for context."""
    client = _openai_client()
    if not client:
        return "(Image description failed: OpenAI API key not configured.)"
    try:
        # Determine the MIME type from the file extension so the data URL is correct.
        extension = os.path.splitext(image_path)[1].lower()
        if extension == ".png":
            mime_type = "image/png"
        elif extension in (".jpg", ".jpeg"):
            mime_type = "image/jpeg"
        elif extension == ".gif":
            mime_type = "image/gif"
        elif extension == ".webp":
            mime_type = "image/webp"
        else:
            # Fall back to JPEG for unrecognized extensions.
            mime_type = "image/jpeg"
        with open(image_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode("utf-8")
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in a concise, factual way for a memory journal. Focus on people, places, and key objects. For example: 'A photo of John and Mary smiling on a bench at the park.'"},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
                        },
                    ],
                }
            ],
            max_tokens=100,
        )
        return response.choices[0].message.content or "No description available."
    except Exception as e:
        return f"[Image description error: {e}]"
# -----------------------------
# NLU Classification Function
# -----------------------------
def detect_tags_from_query(query: str, behavior_options: list, emotion_options: list, topic_options: list, context_options: list, settings: Optional[dict] = None) -> Dict[str, Any]:
    """Two-step NLU process: route the query, select few-shot examples, then classify."""
    # --- STEP 1: Route the query to determine the primary goal ---
    router_prompt = NLU_ROUTER_PROMPT.format(query=query)
    router_messages = [{"role": "user", "content": router_prompt}]
    primary_goal = call_llm(router_messages, temperature=0.0).strip().lower()
    if "practical" in primary_goal:
        selected_examples = PRACTICAL_PLANNING_EXAMPLES
        goal_for_prompt = "Practical Planning"
    else:
        selected_examples = EMOTIONAL_SUPPORT_EXAMPLES
        goal_for_prompt = "Emotional Support"
    if settings and settings.get("debug_mode"):
        print(f"\n--- NLU Router ---\nGoal: {goal_for_prompt}\n------------------\n")
    # --- STEP 2: Use the specialist classifier with the selected examples ---
    behavior_str = ", ".join(f'"{opt}"' for opt in behavior_options if opt != "None")
    emotion_str = ", ".join(f'"{opt}"' for opt in emotion_options if opt != "None")
    topic_str = ", ".join(f'"{opt}"' for opt in topic_options if opt != "None")
    context_str = ", ".join(f'"{opt}"' for opt in context_options if opt != "None")
    prompt = SPECIALIST_CLASSIFIER_PROMPT.format(
        primary_goal=goal_for_prompt,
        examples=selected_examples,
        behavior_options=behavior_str,
        emotion_options=emotion_str,
        topic_options=topic_str,
        context_options=context_str,
        query=query,
    )
    messages = [
        {"role": "system", "content": "You are a helpful NLU classification assistant. Follow the instructions precisely."},
        {"role": "user", "content": prompt},
    ]
    response_str = call_llm(messages, temperature=0.1)
    if settings and settings.get("debug_mode"):
        print(f"\n--- NLU Specialist Full Response ---\n{response_str}\n----------------------------------\n")
    result_dict = {
        "detected_behaviors": [], "detected_emotion": "None",
        "detected_topic": "None", "detected_contexts": []
    }
    try:
        # Robust parsing: extract the first JSON object even if the model adds extra text around it.
        start_brace = response_str.find('{')
        end_brace = response_str.rfind('}')
        if start_brace != -1 and end_brace != -1 and end_brace > start_brace:
            json_str = response_str[start_brace:end_brace + 1]
            result = json.loads(json_str)
            behaviors = result.get("detected_behaviors")
            result_dict["detected_behaviors"] = [b for b in behaviors if b in behavior_options] if behaviors else []
            emotion = result.get("detected_emotion")
            result_dict["detected_emotion"] = emotion if emotion in emotion_options else "None"
            topic = result.get("detected_topic")
            result_dict["detected_topic"] = topic if topic in topic_options else "None"
            contexts = result.get("detected_contexts")
            result_dict["detected_contexts"] = [c for c in contexts if c in context_options] if contexts else []
        return result_dict
    except (json.JSONDecodeError, AttributeError, TypeError) as e:
        print(f"ERROR parsing CoT JSON: {e}")
        return result_dict
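# Example (sketch): classifying a caregiver query. The option lists below are illustrative;
# in the app they come from the UI / configuration.
#
#     tags = detect_tags_from_query(
#         "Dad keeps asking to go home every evening and gets upset",
#         behavior_options=["exit_seeking", "repetition", "None"],
#         emotion_options=["anxious", "sad", "None"],
#         topic_options=["home", "family", "None"],
#         context_options=["evening", "mealtime", "None"],
#         settings={"debug_mode": True},
#     )
#     # -> e.g. {"detected_behaviors": ["exit_seeking"], "detected_emotion": "anxious", ...}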
# -----------------------------
# Embeddings & VectorStore
# -----------------------------
def _default_embeddings():
    """Lightweight, widely available model."""
    model_name = os.getenv("EMBEDDINGS_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    return HuggingFaceEmbeddings(model_name=model_name)
def build_or_load_vectorstore(docs: List[Document], index_path: str, is_personal: bool = False) -> FAISS:
    """Load a FAISS index from disk if present; otherwise build one from docs and save it."""
    # dirname() may be empty when index_path has no directory component.
    os.makedirs(os.path.dirname(index_path) or ".", exist_ok=True)
    if os.path.isdir(index_path) and os.path.exists(os.path.join(index_path, "index.faiss")):
        try:
            return FAISS.load_local(index_path, _default_embeddings(), allow_dangerous_deserialization=True)
        except Exception:
            pass
    if is_personal and not docs:
        docs = [Document(page_content="(This is the start of the personal memory journal.)", metadata={"source": "placeholder"})]
    vs = FAISS.from_documents(docs, _default_embeddings())
    vs.save_local(index_path)
    return vs
def texts_from_jsonl(path: str) -> List[Document]:
    """Load a JSONL file into Documents, carrying scenario tags into metadata for filtered search."""
    out: List[Document] = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                obj = json.loads(line)
                txt = obj.get("text") or ""
                if not isinstance(txt, str) or not txt.strip():
                    continue
                md = {"source": os.path.basename(path), "chunk": i}
                # Copy behavior/emotion/topic/context tags into metadata so retrieval can filter on them.
                for k in ("behaviors", "emotion", "topic_tags", "context_tags"):
                    if k in obj and obj[k]:  # only keep keys that exist and are non-empty
                        md[k] = obj[k]
                out.append(Document(page_content=txt, metadata=md))
    except Exception:
        return []
    return out
def bootstrap_vectorstore(sample_paths: List[str] | None = None, index_path: str = "data/faiss_index") -> FAISS:
    """Build (or load) a FAISS index from the given sample files (.jsonl or plain text)."""
    docs: List[Document] = []
    for p in (sample_paths or []):
        try:
            if p.lower().endswith(".jsonl"):
                docs.extend(texts_from_jsonl(p))
            else:
                with open(p, "r", encoding="utf-8", errors="ignore") as fh:
                    docs.append(Document(page_content=fh.read(), metadata={"source": os.path.basename(p)}))
        except Exception:
            continue
    if not docs:
        docs = [Document(page_content="(empty index)", metadata={"source": "placeholder"})]
    return build_or_load_vectorstore(docs, index_path=index_path)
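# Example (sketch): building the two indexes at startup. The paths are illustrative; the app
# supplies its own data files and index locations.
#
#     vs_general = bootstrap_vectorstore(["data/guidance.jsonl"], index_path="data/faiss_index")
#     vs_personal = build_or_load_vectorstore([], index_path="data/personal_index", is_personal=True)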
# -----------------------------
# LLM Call
# -----------------------------
def call_llm(messages: List[Dict[str, str]], temperature: float = 0.6, stop: Optional[List[str]] = None) -> str:
    """Call OpenAI Chat Completions if available; otherwise return an offline fallback string."""
    client = _openai_client()
    model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    if not client:
        return "(Offline Mode: OpenAI API key not configured.)"
    try:
        # Build the API arguments, forwarding the optional 'stop' parameter only when provided.
        api_args = {
            "model": model,
            "messages": messages,
            "temperature": float(temperature if temperature is not None else 0.6),
        }
        if stop:
            api_args["stop"] = stop
        resp = client.chat.completions.create(**api_args)
        return (resp.choices[0].message.content or "").strip()
    except Exception as e:
        return f"[LLM API Error: {e}]"
# -----------------------------
# Prompting & RAG Chain
# -----------------------------
def _format_sources(docs: List[Document]) -> List[str]:
    return list(set(d.metadata.get("source", "unknown") for d in docs))
def make_rag_chain(
    vs_general: FAISS,
    vs_personal: FAISS,
    *,
    role: str = "patient",
    temperature: float = 0.6,
    language: str = "English",
    patient_name: str = "the patient",
    caregiver_name: str = "the caregiver",
    tone: str = "warm",
):
    """Return a callable that routes each query (general knowledge, factual, general conversation,
    or caregiving) and runs the appropriate RAG path."""

    def _format_docs(docs: List[Document], default_msg: str) -> str:
        if not docs:
            return default_msg
        unique_docs = {doc.page_content: doc for doc in docs}.values()
        return "\n".join([f"- {d.page_content.strip()}" for d in unique_docs])

    def _answer_fn(
        query: str,
        chat_history: List[Dict[str, str]],
        scenario_tag: Optional[str] = None,
        emotion_tag: Optional[str] = None,
        topic_tag: Optional[str] = None,
        context_tags: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        # Route the query to one of the answer paths.
        router_messages = [{"role": "user", "content": ROUTER_PROMPT.format(query=query)}]
        query_type = call_llm(router_messages, temperature=0.0).strip().lower()
        print(f"Query classified as: {query_type}")
        system_message = SYSTEM_TEMPLATE.format(
            tone=tone,
            language=language,
            patient_name=patient_name or "the patient",
            caregiver_name=caregiver_name or "the caregiver",
            guardrails=SAFETY_GUARDRAILS,
        )
        messages = [{"role": "system", "content": system_message}]
        messages.extend(chat_history)
        # --- General knowledge path: answer directly, without retrieval ---
        if "general_knowledge_question" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL_KNOWLEDGE.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": ["General Knowledge"]}
elif "factual_question" in query_type:
# ... (This entire section for query expansion and factual search remains the same)
print(f"Performing query expansion for: '{query}'")
expansion_prompt = QUERY_EXPANSION_PROMPT.format(question=query)
expansion_response = call_llm([{"role": "user", "content": expansion_prompt}], temperature=0.1)
try:
clean_response = expansion_response.strip().replace("```json", "").replace("```", "")
expanded_queries = json.loads(clean_response)
search_queries = [query] + expanded_queries
except json.JSONDecodeError:
search_queries = [query]
print(f"Searching with queries: {search_queries}")
retriever_personal = vs_personal.as_retriever(search_kwargs={"k": 2})
retriever_general = vs_general.as_retriever(search_kwargs={"k": 2})
all_docs = []
for q in search_queries:
all_docs.extend(retriever_personal.invoke(q))
all_docs.extend(retriever_general.invoke(q))
context = _format_docs(all_docs, "(No relevant information found in the memory journal.)")
user_prompt = ANSWER_TEMPLATE_FACTUAL.format(context=context, question=query, language=language)
messages.append({"role": "user", "content": user_prompt})
answer = call_llm(messages, temperature=temperature)
return {"answer": answer, "sources": _format_sources(all_docs)}
elif "general_conversation" in query_type:
user_prompt = ANSWER_TEMPLATE_GENERAL.format(question=query, language=language)
messages.append({"role": "user", "content": user_prompt})
answer = call_llm(messages, temperature=temperature)
return {"answer": answer, "sources": []}
        # --- Caregiving path (default): filtered retrieval over personal memories and general guidance ---
        else:
            # Build a metadata filter from any tags detected for this query.
            search_filter = {}
            if scenario_tag and scenario_tag != "None":
                search_filter["behaviors"] = scenario_tag.lower()
            if emotion_tag and emotion_tag != "None":
                search_filter["emotion"] = emotion_tag.lower()
            if topic_tag and topic_tag != "None":
                search_filter["topic_tags"] = topic_tag.lower()
            if context_tags:
                search_filter["context_tags"] = {"in": [tag.lower() for tag in context_tags]}
            # Robust search strategy:
            # 1. Start with a general, unfiltered search to always get text-based matches.
            retriever_personal = vs_personal.as_retriever(search_kwargs={"k": 3})
            retriever_general = vs_general.as_retriever(search_kwargs={"k": 3})
            personal_docs = retriever_personal.invoke(query)
            general_docs = retriever_general.invoke(query)
            # 2. If filters exist, perform a second, more specific search and add the results.
            if search_filter:
                print(f"Performing additional search with filter: {search_filter}")
                personal_docs.extend(vs_personal.similarity_search(query, k=3, filter=search_filter))
                general_docs.extend(vs_general.similarity_search(query, k=3, filter=search_filter))
            # 3. Combine and de-duplicate the results to get the best of both searches.
            all_personal_docs = list({doc.page_content: doc for doc in personal_docs}.values())
            all_general_docs = list({doc.page_content: doc for doc in general_docs}.values())
            # 4. Build the context strings from the combined results.
            personal_context = _format_docs(all_personal_docs, "(No relevant personal memories found.)")
            general_context = _format_docs(all_general_docs, "(No general guidance found.)")
            # Use the first emotion found in retrieved metadata, falling back to the detected emotion tag.
            first_emotion = None
            all_docs_care = all_personal_docs + all_general_docs
            for doc in all_docs_care:
                if "emotion" in doc.metadata and doc.metadata["emotion"]:
                    emotion_data = doc.metadata["emotion"]
                    first_emotion = emotion_data[0] if isinstance(emotion_data, list) else emotion_data
                if first_emotion:
                    break
            emotions_context = render_emotion_guidelines(first_emotion or emotion_tag)
            is_tagged_scenario = (scenario_tag and scenario_tag != "None") or (emotion_tag and emotion_tag != "None") or (first_emotion is not None)
            template = ANSWER_TEMPLATE_ADQ if is_tagged_scenario else ANSWER_TEMPLATE_CALM
            if template == ANSWER_TEMPLATE_ADQ:
                user_prompt = template.format(
                    general_context=general_context,
                    personal_context=personal_context,
                    question=query,
                    scenario_tag=scenario_tag,
                    emotions_context=emotions_context,
                    role=role,
                    language=language,
                )
            else:
                combined_context = f"General Guidance:\n{general_context}\n\nPersonal Memories:\n{personal_context}"
                user_prompt = template.format(context=combined_context, question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            # Append a risk footer for high-risk behaviors such as exit-seeking or wandering.
            high_risk_scenarios = ["exit_seeking", "wandering", "elopement"]
            if scenario_tag and scenario_tag.lower() in high_risk_scenarios:
                answer += f"\n\n---\n{RISK_FOOTER}"
            return {"answer": answer, "sources": _format_sources(all_docs_care)}

    return _answer_fn
def answer_query(chain, question: str, **kwargs) -> Dict[str, Any]:
    """Run the RAG chain, passing through chat history and any detected tags."""
    if not callable(chain):
        return {"answer": "[Error: RAG chain is not callable]", "sources": []}
    chat_history = kwargs.get("chat_history", [])
    scenario_tag = kwargs.get("scenario_tag")
    emotion_tag = kwargs.get("emotion_tag")
    topic_tag = kwargs.get("topic_tag")
    context_tags = kwargs.get("context_tags")
    try:
        return chain(
            question,
            chat_history=chat_history,
            scenario_tag=scenario_tag,
            emotion_tag=emotion_tag,
            topic_tag=topic_tag,
            context_tags=context_tags,
        )
    except Exception as e:
        print(f"ERROR in answer_query: {e}")
        return {"answer": f"[Error executing chain: {e}]", "sources": []}
# -----------------------------
# TTS & Transcription
# -----------------------------
def synthesize_tts(text: str, lang: str = "en"):
    """Synthesize speech with gTTS and return the path to a temporary MP3 file, or None on failure."""
    if not text or gTTS is None:
        return None
    try:
        fd, path = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)
        tts = gTTS(text=text, lang=(lang or "en"))
        tts.save(path)
        return path
    except Exception:
        return None
def transcribe_audio(filepath: str, lang: str = "en"):
    """Transcribe an audio file with OpenAI Whisper; returns the text or an error string."""
    client = _openai_client()
    if not client:
        return "[Transcription failed: API key not configured]"
    api_args = {"model": "whisper-1"}
    if lang and lang != "auto":
        api_args["language"] = lang
    try:
        with open(filepath, "rb") as audio_file:
            transcription = client.audio.transcriptions.create(file=audio_file, **api_args)
        return transcription.text
    except Exception as e:
        return f"[Transcription error: {e}]"
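# Minimal smoke test (sketch): exercises the end-to-end path with illustrative inputs.
# Because this module uses a relative import, run it as a module (e.g. `python -m <package>.agent`,
# where <package> is the project package name). Without OPENAI_API_KEY set, the LLM calls fall back
# to offline placeholder strings, so this still runs without network credentials.
if __name__ == "__main__":
    demo_vs = bootstrap_vectorstore([], index_path="data/faiss_index")
    demo_chain = make_rag_chain(demo_vs, demo_vs, role="caregiver", language="English")
    demo_result = answer_query(demo_chain, "What day is it today?", chat_history=[])
    print(demo_result["answer"])
    print("Sources:", demo_result["sources"])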