Spaces:
Running
Running
| from __future__ import annotations | |
| import os | |
| import json | |
| import base64 | |
| import time | |
| import tempfile | |
| import re # <-- ADD THIS LINE | |
| from typing import List, Dict, Any, Optional | |
| # OpenAI for LLM (optional) | |
| try: | |
| from openai import OpenAI | |
| except Exception: # pragma: no cover | |
| OpenAI = None # type: ignore | |
| # LangChain & RAG | |
| from langchain.schema import Document | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| # TTS | |
| try: | |
| from gtts import gTTS | |
| except Exception: # pragma: no cover | |
| gTTS = None # type: ignore | |
| from .prompts import ( | |
| SYSTEM_TEMPLATE, ANSWER_TEMPLATE_CALM, ANSWER_TEMPLATE_ADQ, | |
| SAFETY_GUARDRAILS, RISK_FOOTER, render_emotion_guidelines, | |
| # --- Import the new decomposed NLU prompts --- | |
| NLU_ROUTER_PROMPT, SPECIALIST_CLASSIFIER_PROMPT, | |
| EMOTIONAL_SUPPORT_EXAMPLES, PRACTICAL_PLANNING_EXAMPLES, | |
| # Other templates | |
| ROUTER_PROMPT, | |
| ANSWER_TEMPLATE_FACTUAL, | |
| ANSWER_TEMPLATE_GENERAL_KNOWLEDGE, | |
| ANSWER_TEMPLATE_GENERAL, | |
| QUERY_EXPANSION_PROMPT | |
| ) | |
| # ----------------------------- | |
| # Multimodal Processing Functions | |
| # ----------------------------- | |
def _openai_client() -> Optional[OpenAI]:
    """Build an OpenAI client from the environment, if one can be built.

    Returns:
        An ``OpenAI`` client when ``OPENAI_API_KEY`` holds a non-blank value
        and the ``openai`` import succeeded; otherwise ``None``.
    """
    key = os.getenv("OPENAI_API_KEY", "").strip()
    # Either a missing key or a missing SDK means there is nothing to build.
    if not key or not OpenAI:
        return None
    return OpenAI(api_key=key)
| # In agent.py | |
def describe_image(image_path: str) -> str:
    """Ask a vision model for a short, factual description of an image.

    Args:
        image_path: Path of the image file to describe.

    Returns:
        The model's description, ``"No description available."`` when the
        model returns no content, or a bracketed/parenthesised message when
        no client is configured or any step fails.
    """
    vision_client = _openai_client()
    if not vision_client:
        return "(Image description failed: OpenAI API key not configured.)"

    # Extension -> MIME type for the data URL; anything else is sent as JPEG.
    known_mime_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    try:
        suffix = os.path.splitext(image_path)[1].lower()
        mime_type = known_mime_types.get(suffix, "image/jpeg")

        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
        base64_image = base64.b64encode(raw_bytes).decode('utf-8')

        instruction_part = {
            "type": "text",
            "text": "Describe this image in a concise, factual way for a memory journal. Focus on people, places, and key objects. For example: 'A photo of John and Mary smiling on a bench at the park.'",
        }
        image_part = {
            "type": "image_url",
            "image_url": {"url": f"data:{mime_type};base64,{base64_image}"},
        }
        response = vision_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": [instruction_part, image_part]}],
            max_tokens=100,
        )
        description = response.choices[0].message.content
        return description or "No description available."
    except Exception as e:
        # Best-effort: the caller gets a readable message rather than a crash.
        return f"[Image description error: {e}]"
| # ----------------------------- | |
| # NLU Classification Function | |
| # ----------------------------- | |
def _coerce_tag_list(value: Any) -> List[Any]:
    """Normalize a tag field from the classifier's JSON into a list.

    The model is asked for a list but may return a bare string or another
    scalar. A bare string becomes a one-element list, a list/tuple is copied,
    and anything else (including ``None`` and empty values) becomes ``[]``.
    """
    if not value:
        return []
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return list(value)
    return []


def _pick_single_tag(value: Any, options: list) -> str:
    """Return the first candidate in *value* found in *options*, else ``"None"``."""
    for candidate in _coerce_tag_list(value):
        if candidate in options:
            return candidate
    return "None"


def detect_tags_from_query(query: str, behavior_options: list, emotion_options: list, topic_options: list, context_options: list, settings: Optional[dict] = None) -> Dict[str, Any]:
    """Classify a query with a two-step NLU process: route, then classify.

    Step 1 asks the LLM for the query's primary goal and picks the matching
    few-shot examples. Step 2 asks the LLM to classify the query and parses
    the JSON object found in its reply.

    Args:
        query: The user's text.
        behavior_options: Allowed behavior tags.
        emotion_options: Allowed emotion tags.
        topic_options: Allowed topic tags.
        context_options: Allowed context tags.
        settings: Optional settings; a truthy ``"debug_mode"`` prints the
            router decision and the raw classifier reply.

    Returns:
        A dict with ``detected_behaviors`` and ``detected_contexts`` (lists
        restricted to the allowed options) and ``detected_emotion`` and
        ``detected_topic`` (an allowed option or ``"None"``). If the reply
        has no parseable JSON object, every field keeps its empty default.
    """
    debug = bool(settings and settings.get("debug_mode"))

    # --- STEP 1: Route the query to determine the primary goal ---
    router_prompt = NLU_ROUTER_PROMPT.format(query=query)
    router_messages = [{"role": "user", "content": router_prompt}]
    primary_goal = call_llm(router_messages, temperature=0.0).strip().lower()
    if "practical" in primary_goal:
        selected_examples = PRACTICAL_PLANNING_EXAMPLES
        goal_for_prompt = "Practical Planning"
    else:
        # Anything that is not clearly "practical" falls back to emotional support.
        selected_examples = EMOTIONAL_SUPPORT_EXAMPLES
        goal_for_prompt = "Emotional Support"
    if debug:
        print(f"\n--- NLU Router ---\nGoal: {goal_for_prompt}\n------------------\n")

    # --- STEP 2: Use the Specialist Classifier with selected examples ---
    def _quoted(options: list) -> str:
        # "None" is the fallback value, not a tag the model should pick from.
        return ", ".join(f'"{opt}"' for opt in options if opt != "None")

    prompt = SPECIALIST_CLASSIFIER_PROMPT.format(
        primary_goal=goal_for_prompt,
        examples=selected_examples,
        behavior_options=_quoted(behavior_options),
        emotion_options=_quoted(emotion_options),
        topic_options=_quoted(topic_options),
        context_options=_quoted(context_options),
        query=query,
    )
    messages = [
        {"role": "system", "content": "You are a helpful NLU classification assistant. Follow the instructions precisely."},
        {"role": "user", "content": prompt},
    ]
    response_str = call_llm(messages, temperature=0.1)
    if debug:
        print(f"\n--- NLU Specialist Full Response ---\n{response_str}\n----------------------------------\n")

    result_dict = {
        "detected_behaviors": [], "detected_emotion": "None",
        "detected_topic": "None", "detected_contexts": []
    }

    # The reply may wrap the JSON in reasoning text, so parse only the span
    # from the first '{' to the last '}'.
    start_brace = response_str.find('{')
    end_brace = response_str.rfind('}')
    if start_brace == -1 or end_brace <= start_brace:
        return result_dict

    try:
        result = json.loads(response_str[start_brace:end_brace + 1])
        if not isinstance(result, dict):
            print(f"ERROR parsing CoT JSON: expected an object, got {type(result).__name__}")
            return result_dict
        # Build the full result first so a failure never leaves it half-filled.
        parsed = {
            "detected_behaviors": [b for b in _coerce_tag_list(result.get("detected_behaviors")) if b in behavior_options],
            "detected_emotion": _pick_single_tag(result.get("detected_emotion"), emotion_options),
            "detected_topic": _pick_single_tag(result.get("detected_topic"), topic_options),
            "detected_contexts": [c for c in _coerce_tag_list(result.get("detected_contexts")) if c in context_options],
        }
    except (json.JSONDecodeError, AttributeError, TypeError) as e:
        print(f"ERROR parsing CoT JSON: {e}")
        return result_dict
    return parsed
| # ----------------------------- | |
| # Embeddings & VectorStore | |
| # ----------------------------- | |
def _default_embeddings():
    """Create the HuggingFace embedding model used for the FAISS indexes.

    The model name comes from the ``EMBEDDINGS_MODEL`` environment variable
    and falls back to ``sentence-transformers/all-MiniLM-L6-v2``.
    """
    return HuggingFaceEmbeddings(
        model_name=os.getenv("EMBEDDINGS_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    )
def build_or_load_vectorstore(docs: List[Document], index_path: str, is_personal: bool = False) -> FAISS:
    """Load the FAISS index at *index_path*, or build and save a new one.

    Args:
        docs: Documents to index when no usable index exists on disk. They
            are ignored when an existing index loads successfully.
        index_path: Directory that holds (or will hold) the saved index.
        is_personal: When true and *docs* is empty, a single placeholder
            document is indexed so that an index can still be created.

    Returns:
        The loaded or newly built FAISS vector store.
    """
    # dirname() is "" for a bare name such as "faiss_index", and
    # os.makedirs("") raises, so only create a parent that actually exists
    # in the path.
    parent_dir = os.path.dirname(index_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Build the embedding model once and share it between load and rebuild.
    embeddings = _default_embeddings()

    if os.path.isdir(index_path) and os.path.exists(os.path.join(index_path, "index.faiss")):
        try:
            # NOTE(security): allow_dangerous_deserialization unpickles data
            # from index_path; only point this at index directories you trust.
            return FAISS.load_local(index_path, embeddings, allow_dangerous_deserialization=True)
        except Exception as e:
            # Best-effort: an unreadable index is rebuilt, but say so.
            print(f"WARNING: could not load FAISS index at '{index_path}' ({e}); rebuilding.")

    if is_personal and not docs:
        docs = [Document(page_content="(This is the start of the personal memory journal.)", metadata={"source": "placeholder"})]

    vs = FAISS.from_documents(docs, embeddings)
    vs.save_local(index_path)
    return vs
def texts_from_jsonl(path: str) -> List[Document]:
    """Load one ``Document`` per usable line of a JSON Lines file.

    A line is used when it parses to a JSON object whose ``"text"`` value is
    a non-blank string. The ``behaviors``, ``emotion``, ``topic_tags`` and
    ``context_tags`` keys are copied into the document metadata when present
    and non-empty. Malformed or unusable lines are skipped individually, so
    one bad line no longer discards the rest of the file.

    Args:
        path: Path of the ``.jsonl`` file.

    Returns:
        The documents parsed so far; an empty list when the file cannot be
        opened. ``metadata["chunk"]`` is the zero-based line index.
    """
    out: List[Document] = []
    try:
        source_name = os.path.basename(path)
        with open(path, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"WARNING: skipping malformed JSON in {source_name} line {i + 1}: {e}")
                    continue
                if not isinstance(obj, dict):
                    continue
                txt = obj.get("text") or ""
                if not isinstance(txt, str) or not txt.strip():
                    continue
                md = {"source": source_name, "chunk": i}
                for k in ("behaviors", "emotion", "topic_tags", "context_tags"):
                    if obj.get(k):
                        md[k] = obj[k]
                out.append(Document(page_content=txt, metadata=md))
    except Exception as e:
        # Best-effort loader: keep whatever was read before the failure.
        print(f"WARNING: could not finish reading {path}: {e}")
    return out
def bootstrap_vectorstore(sample_paths: List[str] | None = None, index_path: str = "data/faiss_index") -> FAISS:
    """Gather documents from sample files and hand them to the index builder.

    ``.jsonl`` paths are parsed with ``texts_from_jsonl``; any other path is
    read whole as a single document. Paths that raise are skipped. If
    nothing was gathered, one ``"(empty index)"`` placeholder is used.

    Args:
        sample_paths: Files to ingest; ``None`` means no files.
        index_path: Passed through to ``build_or_load_vectorstore``.

    Returns:
        The vector store returned by ``build_or_load_vectorstore``.
    """
    collected: List[Document] = []
    for sample in sample_paths or ():
        try:
            if sample.lower().endswith(".jsonl"):
                collected.extend(texts_from_jsonl(sample))
                continue
            with open(sample, "r", encoding="utf-8", errors="ignore") as handle:
                whole_text = handle.read()
            collected.append(
                Document(page_content=whole_text, metadata={"source": os.path.basename(sample)})
            )
        except Exception:
            # An unreadable sample must not stop the remaining ones.
            continue

    if not collected:
        collected = [Document(page_content="(empty index)", metadata={"source": "placeholder"})]
    return build_or_load_vectorstore(collected, index_path=index_path)
| # ----------------------------- | |
| # LLM Call | |
| # ----------------------------- | |
| # call_llm accepts an optional `stop` list of stop sequences and forwards it | |
| # to the Chat Completions API only when one is provided. | |
def call_llm(messages: List[Dict[str, str]], temperature: float = 0.6, stop: Optional[List[str]] = None) -> str:
    """Send *messages* to OpenAI Chat Completions and return the reply text.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
        temperature: Sampling temperature; ``None`` is treated as ``0.6``.
        stop: Optional stop sequences, sent only when non-empty.

    Returns:
        The stripped reply text, an offline-mode notice when no client is
        configured, or a bracketed error message when the call fails.
    """
    llm = _openai_client()
    if not llm:
        return "(Offline Mode: OpenAI API key not configured.)"

    chosen_model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    try:
        effective_temperature = 0.6 if temperature is None else temperature
        request = dict(
            model=chosen_model,
            messages=messages,
            temperature=float(effective_temperature),
        )
        # Only send 'stop' when the caller supplied sequences.
        if stop:
            request["stop"] = stop
        completion = llm.chat.completions.create(**request)
        reply = completion.choices[0].message.content
        return (reply or "").strip()
    except Exception as e:
        return f"[LLM API Error: {e}]"
| # ----------------------------- | |
| # Prompting & RAG Chain | |
| # ----------------------------- | |
def _format_sources(docs: List[Document]) -> List[str]:
    """Return the distinct ``source`` metadata values of *docs*.

    Sources are listed in first-seen order, so the result is the same on
    every run. Documents without a ``source`` entry count as ``"unknown"``.
    """
    # dict.fromkeys de-duplicates while keeping insertion order; a set would
    # not guarantee any order.
    return list(dict.fromkeys(d.metadata.get("source", "unknown") for d in docs))
| # make_rag_chain builds the answer function. It routes each query to the general-knowledge, | |
| # factual, general-conversation, or default caregiving path. The "factual_question" path | |
| # expands the query into alternative phrasings before searching both vector stores. | |
def _parse_expanded_queries(expansion_response: str, query: str) -> List[str]:
    """Return ``[query]`` plus any usable alternative phrasings from the LLM.

    The reply is expected to be a JSON list of strings, optionally wrapped in
    a Markdown code fence. Anything else (invalid JSON, a JSON object,
    non-string or blank items) is ignored, and duplicates are dropped.
    """
    cleaned = expansion_response.strip().replace("```json", "").replace("```", "")
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        return [query]
    if not isinstance(parsed, list):
        return [query]
    extras = [item.strip() for item in parsed if isinstance(item, str) and item.strip()]
    return list(dict.fromkeys([query] + extras))


def _tag_criteria(scenario_tag: Optional[str], emotion_tag: Optional[str], topic_tag: Optional[str], context_tags: Optional[List[str]]) -> Dict[str, List[str]]:
    """Map each metadata field to the lower-cased tag values it may match.

    Tags that are missing, empty, or the literal ``"None"`` add no criterion.
    """
    criteria: Dict[str, List[str]] = {}
    for field, tag in (("behaviors", scenario_tag), ("emotion", emotion_tag), ("topic_tags", topic_tag)):
        if tag and tag != "None":
            criteria[field] = [tag.lower()]
    wanted_contexts = [tag.lower() for tag in (context_tags or []) if tag and tag != "None"]
    if wanted_contexts:
        criteria["context_tags"] = wanted_contexts
    return criteria


def _metadata_filter(criteria: Dict[str, List[str]]):
    """Build a predicate over document metadata for *criteria*.

    A document passes when, for every field in *criteria*, its metadata value
    (a scalar or a list of values) shares at least one value with the wanted
    ones, compared case-insensitively.
    """
    def _matches(metadata: Dict[str, Any]) -> bool:
        for field, wanted in criteria.items():
            stored = metadata.get(field)
            if stored is None:
                return False
            values = stored if isinstance(stored, (list, tuple, set)) else [stored]
            if not any(str(value).lower() in wanted for value in values):
                return False
        return True

    return _matches


def make_rag_chain(
    vs_general: FAISS,
    vs_personal: FAISS,
    *,
    role: str = "patient",
    temperature: float = 0.6,
    language: str = "English",
    patient_name: str = "the patient",
    caregiver_name: str = "the caregiver",
    tone: str = "warm",
):
    """Return a callable that answers a query using both vector stores.

    Args:
        vs_general: Vector store of general guidance.
        vs_personal: Vector store of personal memories.
        role: Passed to the tagged-scenario answer template.
        temperature: Sampling temperature for the answer-generating call.
        language: Language name passed to the prompt templates.
        patient_name: Name used in the system prompt; falsy -> "the patient".
        caregiver_name: Name used in the system prompt; falsy -> "the caregiver".
        tone: Tone passed to the system prompt.

    Returns:
        ``answer(query, chat_history, scenario_tag=None, emotion_tag=None,
        topic_tag=None, context_tags=None)``, which returns a dict with
        ``"answer"`` (str) and ``"sources"`` (list of str).
    """

    def _format_docs(docs: List[Document], default_msg: str) -> str:
        # Bullet list of unique page contents, or the fallback text.
        if not docs:
            return default_msg
        unique_docs = {doc.page_content: doc for doc in docs}.values()
        return "\n".join(f"- {d.page_content.strip()}" for d in unique_docs)

    def _answer_fn(query: str, chat_history: List[Dict[str, str]], scenario_tag: Optional[str] = None, emotion_tag: Optional[str] = None, topic_tag: Optional[str] = None, context_tags: Optional[List[str]] = None) -> Dict[str, Any]:
        # The router's reply decides which of the four paths below is taken.
        router_messages = [{"role": "user", "content": ROUTER_PROMPT.format(query=query)}]
        query_type = call_llm(router_messages, temperature=0.0).strip().lower()
        print(f"Query classified as: {query_type}")

        system_message = SYSTEM_TEMPLATE.format(tone=tone, language=language, patient_name=patient_name or "the patient", caregiver_name=caregiver_name or "the caregiver", guardrails=SAFETY_GUARDRAILS)
        messages = [{"role": "system", "content": system_message}]
        # Tolerate a caller passing chat_history=None.
        messages.extend(chat_history or [])

        # --- General knowledge: answer without retrieval ---
        if "general_knowledge_question" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL_KNOWLEDGE.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": ["General Knowledge"]}

        # --- Factual question: expand the query, then search both stores ---
        if "factual_question" in query_type:
            print(f"Performing query expansion for: '{query}'")
            expansion_prompt = QUERY_EXPANSION_PROMPT.format(question=query)
            expansion_response = call_llm([{"role": "user", "content": expansion_prompt}], temperature=0.1)
            search_queries = _parse_expanded_queries(expansion_response, query)
            print(f"Searching with queries: {search_queries}")

            retriever_personal = vs_personal.as_retriever(search_kwargs={"k": 2})
            retriever_general = vs_general.as_retriever(search_kwargs={"k": 2})
            all_docs = []
            for q in search_queries:
                all_docs.extend(retriever_personal.invoke(q))
                all_docs.extend(retriever_general.invoke(q))

            context = _format_docs(all_docs, "(No relevant information found in the memory journal.)")
            user_prompt = ANSWER_TEMPLATE_FACTUAL.format(context=context, question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": _format_sources(all_docs)}

        # --- General conversation: answer without retrieval ---
        if "general_conversation" in query_type:
            user_prompt = ANSWER_TEMPLATE_GENERAL.format(question=query, language=language)
            messages.append({"role": "user", "content": user_prompt})
            answer = call_llm(messages, temperature=temperature)
            return {"answer": answer, "sources": []}

        # --- Default: caregiving scenario ---
        # 1. Unfiltered search, so text-based matches are always included.
        personal_docs = vs_personal.as_retriever(search_kwargs={"k": 3}).invoke(query)
        general_docs = vs_general.as_retriever(search_kwargs={"k": 3}).invoke(query)

        # 2. If any tag was supplied, add a tag-filtered search. A callable
        #    filter is used because tag metadata may be a list of values,
        #    which an equality-based dict filter would not match.
        search_filter = _tag_criteria(scenario_tag, emotion_tag, topic_tag, context_tags)
        if search_filter:
            print(f"Performing additional search with filter: {search_filter}")
            tag_filter = _metadata_filter(search_filter)
            personal_docs.extend(vs_personal.similarity_search(query, k=3, filter=tag_filter))
            general_docs.extend(vs_general.similarity_search(query, k=3, filter=tag_filter))

        # 3. De-duplicate by page content.
        all_personal_docs = list({doc.page_content: doc for doc in personal_docs}.values())
        all_general_docs = list({doc.page_content: doc for doc in general_docs}.values())

        # 4. Render both contexts for the prompt.
        personal_context = _format_docs(all_personal_docs, "(No relevant personal memories found.)")
        general_context = _format_docs(all_general_docs, "(No general guidance found.)")
        all_docs_care = all_personal_docs + all_general_docs

        # The first retrieved document carrying an emotion takes precedence
        # over the caller-supplied emotion tag.
        first_emotion = None
        for doc in all_docs_care:
            emotion_data = doc.metadata.get("emotion")
            if emotion_data:
                first_emotion = emotion_data[0] if isinstance(emotion_data, list) else emotion_data
            if first_emotion:
                break
        emotions_context = render_emotion_guidelines(first_emotion or emotion_tag)

        is_tagged_scenario = (scenario_tag and scenario_tag != "None") or (emotion_tag and emotion_tag != "None") or (first_emotion is not None)
        if is_tagged_scenario:
            user_prompt = ANSWER_TEMPLATE_ADQ.format(general_context=general_context, personal_context=personal_context, question=query, scenario_tag=scenario_tag, emotions_context=emotions_context, role=role, language=language)
        else:
            combined_context = f"General Guidance:\n{general_context}\n\nPersonal Memories:\n{personal_context}"
            user_prompt = ANSWER_TEMPLATE_CALM.format(context=combined_context, question=query, language=language)

        messages.append({"role": "user", "content": user_prompt})
        answer = call_llm(messages, temperature=temperature)

        # High-risk scenarios always get the risk footer appended.
        high_risk_scenarios = ["exit_seeking", "wandering", "elopement"]
        if scenario_tag and scenario_tag.lower() in high_risk_scenarios:
            answer += f"\n\n---\n{RISK_FOOTER}"
        return {"answer": answer, "sources": _format_sources(all_docs_care)}

    return _answer_fn
| # answer_query forwards scenario_tag, emotion_tag, topic_tag, and context_tags to the chain. | |
def answer_query(chain, question: str, **kwargs) -> Dict[str, Any]:
    """Run *chain* on *question* and never raise.

    Args:
        chain: The callable to invoke with the question and tag arguments.
        question: The user's query.
        **kwargs: Optional ``chat_history``, ``scenario_tag``,
            ``emotion_tag``, ``topic_tag`` and ``context_tags``; other keys
            are ignored. A missing or ``None`` ``chat_history`` becomes an
            empty list.

    Returns:
        Whatever the chain returns, or a dict with an error message in
        ``"answer"`` and empty ``"sources"`` when the chain is not callable
        or raises.
    """
    if not callable(chain):
        return {"answer": "[Error: RAG chain is not callable]", "sources": []}

    # `or []` also covers an explicit chat_history=None, which the default of
    # kwargs.get() alone would pass through as None.
    chat_history = kwargs.get("chat_history") or []
    try:
        return chain(
            question,
            chat_history=chat_history,
            scenario_tag=kwargs.get("scenario_tag"),
            emotion_tag=kwargs.get("emotion_tag"),
            topic_tag=kwargs.get("topic_tag"),
            context_tags=kwargs.get("context_tags"),
        )
    except Exception as e:
        # Top-level boundary: report the failure to the caller as an answer.
        print(f"ERROR in answer_query: {e}")
        return {"answer": f"[Error executing chain: {e}]", "sources": []}
| # ----------------------------- | |
| # TTS & Transcription | |
| # ----------------------------- | |
def synthesize_tts(text: str, lang: str = "en") -> Optional[str]:
    """Synthesize *text* to a temporary MP3 file with gTTS.

    Args:
        text: Text to speak; empty or ``None`` yields ``None``.
        lang: Language code passed to gTTS; a falsy value means ``"en"``.

    Returns:
        Path of the created ``.mp3`` file (the caller owns and deletes it),
        or ``None`` when there is no text, gTTS is unavailable, or synthesis
        fails.
    """
    if not text or gTTS is None:
        return None

    path = None
    try:
        fd, path = tempfile.mkstemp(suffix=".mp3")
        # gTTS writes by path, so the descriptor from mkstemp is not needed.
        os.close(fd)
        gTTS(text=text, lang=(lang or "en")).save(path)
        return path
    except Exception:
        # Do not leave an empty or partial temp file behind on failure.
        if path is not None:
            try:
                os.remove(path)
            except OSError:
                pass
        return None
def transcribe_audio(filepath: str, lang: str = "en"):
    """Transcribe an audio file with the ``whisper-1`` model.

    Args:
        filepath: Path of the audio file to upload.
        lang: Language code sent with the request; an empty value or
            ``"auto"`` omits it.

    Returns:
        The transcription text, or a bracketed failure message when no
        client is configured. Errors opening the file or calling the API
        are not caught here.
    """
    whisper_client = _openai_client()
    if not whisper_client:
        return "[Transcription failed: API key not configured]"

    send_language = bool(lang) and lang != "auto"
    request = {"model": "whisper-1", **({"language": lang} if send_language else {})}

    with open(filepath, "rb") as audio_file:
        result = whisper_client.audio.transcriptions.create(file=audio_file, **request)
    return result.text