Spaces:
Running
Running
| import os | |
| import json | |
| import shutil | |
| import gradio as gr | |
| import tempfile | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional | |
| from pytube import YouTube | |
| from pathlib import Path | |
| import re | |
| import pandas as pd | |
| # --- Agent Imports --- | |
# --- Agent Imports ---
# Try to load the real agent backend. If anything goes wrong (missing package,
# missing model weights, import-time error in the agent), fall back to
# lightweight stand-ins so the UI can still start in "demo mode".
try:
    from alz_companion.agent import (
        bootstrap_vectorstore, make_rag_chain, answer_query, synthesize_tts,
        transcribe_audio, detect_tags_from_query, describe_image, build_or_load_vectorstore,
        _default_embeddings, route_query_type, call_llm
    )
    from alz_companion.prompts import (
        BEHAVIOUR_TAGS, EMOTION_STYLES, FAITHFULNESS_JUDGE_PROMPT
    )
    from langchain.schema import Document
    from langchain_community.vectorstores import FAISS
    AGENT_OK = True
except Exception as e:
    # Deliberately broad: any failure while importing the agent stack must
    # degrade to demo mode rather than prevent the app from starting.
    AGENT_OK = False

    class Document:
        """Minimal stand-in for ``langchain.schema.Document``."""

        def __init__(self, page_content, metadata):
            self.page_content, self.metadata = page_content, metadata

    class FAISS:
        """In-memory stand-in exposing the subset of the FAISS store used by this file."""

        def __init__(self):
            # Mimic the ``docstore._dict`` attribute that the rest of the file inspects.
            self.docstore = type('obj', (object,), {'_dict': {}})()

        def add_documents(self, docs):
            """Append ``docs`` to the in-memory docstore under consecutive integer keys."""
            start_idx = len(self.docstore._dict)
            for i, d in enumerate(docs, start_idx):
                self.docstore._dict[i] = d

        def save_local(self, path):
            """No-op: the demo store is never persisted."""
            pass

        # Must be a classmethod: callers invoke ``FAISS.from_documents(docs, ...)``
        # on the class itself, which previously raised TypeError.
        @classmethod
        def from_documents(cls, docs, embeddings=None):
            """Build a new in-memory store containing ``docs`` (``embeddings`` is ignored)."""
            inst = cls()
            inst.add_documents(docs)
            return inst

    def build_or_load_vectorstore(docs, index_path, is_personal=False):
        return FAISS.from_documents(docs or [], embeddings=None)

    def bootstrap_vectorstore(sample_paths=None, index_path="data/"):
        return object()

    def make_rag_chain(vs_general, vs_personal, **kwargs):
        return lambda q, **k: {"answer": f"(Demo) You asked: {q}", "sources": []}

    def answer_query(chain, q, **kwargs):
        return chain(q, **kwargs)

    def synthesize_tts(text: str, lang: str = "en"):
        return None

    def transcribe_audio(filepath: str, lang: str = "en"):
        return "This is a transcribed message."

    def detect_tags_from_query(*args, **kwargs):
        return {"detected_behavior": "None", "detected_emotion": "None"}

    def describe_image(image_path: str):
        return "This is a description of an image."

    def _default_embeddings():
        return None

    def route_query_type(query: str, **kwargs):
        # Accept (and ignore) extra keyword arguments such as ``severity`` so the
        # demo router matches how chat_fn calls the real one.
        return "general_conversation"

    def call_llm(messages, **kwargs):
        return "Cannot call LLM in fallback mode."

    BEHAVIOUR_TAGS, EMOTION_STYLES, FAITHFULNESS_JUDGE_PROMPT = {"None": []}, {"None": {}}, ""
    print(f"WARNING: Could not import from alz_companion ({e}). Running in UI-only demo mode.")
| # --- NEW: Import for Evaluation Logic --- | |
# Evaluation helpers are optional: when they cannot be imported, install stubs
# so the rest of the module can still reference both names.
try:
    from evaluate import load_test_fixtures, run_comprehensive_evaluation
except ImportError:
    # Fallback if evaluate.py is not found
    # Stub: only prints a warning and returns None.
    def load_test_fixtures(): print("WARNING: evaluate.py not found.")
    # Stub: accepts any arguments and returns a (message, empty list) pair.
    def run_comprehensive_evaluation(*args, **kwargs): return "Evaluation module not found.", []
| # --- Centralized Configuration --- | |
# Central table of option lists. The behaviour and emotion lists are derived
# from the BEHAVIOUR_TAGS / EMOTION_STYLES mappings, each prefixed with "None".
CONFIG = {
    "themes": ["All", "The Father", "Still Alice", "Away from Her", "Alive Inside", "General Caregiving"],
    "roles": ["patient", "caregiver"],
    "disease_stages": ["Default: Mild Stage", "Moderate Stage", "Advanced Stage"],
    "behavior_tags": ["None"] + list(BEHAVIOUR_TAGS.keys()),
    "emotion_tags": ["None"] + list(EMOTION_STYLES.keys()),
    "topic_tags": ["None", "caregiving_advice", "medical_fact", "personal_story", "research_update", "treatment_option:home_safety", "treatment_option:long_term_care", "treatment_option:music_therapy", "treatment_option:reassurance", "treatment_option:routine_structuring", "treatment_option:validation_therapy"],
    "context_tags": ["None", "disease_stage_mild", "disease_stage_moderate", "disease_stage_advanced", "disease_stage_unspecified", "interaction_mode_one_to_one", "interaction_mode_small_group", "interaction_mode_group_activity", "relationship_family", "relationship_spouse", "relationship_staff_or_caregiver", "relationship_unspecified", "setting_home_or_community", "setting_care_home", "setting_clinic_or_hospital"],
    # Display name -> language code.
    "languages": {"English": "en", "Chinese": "zh", "Cantonese": "zh-yue", "Korean": "ko", "Japanese": "ja", "Malay": "ms", "French": "fr", "Spanish": "es", "Hindi": "hi", "Arabic": "ar"},
    "tones": ["warm", "empathetic", "caring", "reassuring", "calm", "optimistic", "motivating", "neutral", "formal", "humorous"],
    # Mood labels for music entries.
    "music_moods": [
        "Confusion or Disorientation",
        "Reminiscence and Connection",
        "Sundowning or Restlessness",
        "Sadness or Longing",
        "Anxiety or Fear",
        "Agitation or Anger",
        "Joy or Affection"
    ]
}
| # --- File Management & Vector Store Logic --- | |
| def _storage_root() -> Path: | |
| for p in [Path(os.getenv("SPACE_STORAGE", "")), Path("/data"), Path.home() / ".cache" / "alz_companion"]: | |
| if not p: continue | |
| try: | |
| p.mkdir(parents=True, exist_ok=True) | |
| (p / ".write_test").write_text("ok") | |
| (p / ".write_test").unlink(missing_ok=True) | |
| return p | |
| except Exception: continue | |
| tmp = Path(tempfile.gettempdir()) / "alz_companion" | |
| tmp.mkdir(parents=True, exist_ok=True) | |
| return tmp | |
# Resolve every on-disk location once at import time.
STORAGE_ROOT = _storage_root()
INDEX_BASE = STORAGE_ROOT / "index"
# Folder next to this file that is scanned for memories to auto-load.
PERSISTENT_MEMORY_PATH = Path(__file__).parent / "Personal Memory Bank"
PERSONAL_DATA_BASE = STORAGE_ROOT / "personal"
UPLOADS_BASE = INDEX_BASE / "uploads"
PERSONAL_INDEX_PATH = str(PERSONAL_DATA_BASE / "personal_faiss_index")
NLU_EXAMPLES_INDEX_PATH = str(INDEX_BASE / "nlu_examples_faiss_index")
# One index directory per theme, named from the theme with spaces removed and lower-cased.
THEME_PATHS = {t: str(INDEX_BASE / f"faiss_index_{t.replace(' ', '').lower()}") for t in CONFIG["themes"]}
# Create every folder on startup if it does not exist yet.
os.makedirs(UPLOADS_BASE, exist_ok=True)
os.makedirs(PERSONAL_DATA_BASE, exist_ok=True)
os.makedirs(PERSISTENT_MEMORY_PATH, exist_ok=True)
for p in THEME_PATHS.values(): os.makedirs(p, exist_ok=True)
# Module-level caches: theme name -> general vectorstore, plus the personal and NLU stores.
vectorstores = {}
personal_vectorstore = None
nlu_vectorstore = None
# Best-effort load of the personal store; on any failure it stays None and the
# functions below build it the first time they need it.
try:
    personal_vectorstore = build_or_load_vectorstore([], PERSONAL_INDEX_PATH, is_personal=True)
except Exception:
    personal_vectorstore = None
def bootstrap_nlu_vectorstore(example_file: str, index_path: str) -> FAISS:
    """Build the NLU example vectorstore from a JSON-lines file.

    Each line of ``example_file`` is parsed as JSON; its ``"query"`` value
    becomes the document text and the whole record becomes the metadata.
    Lines that are not valid JSON or that lack ``"query"`` are skipped.
    Any existing index at ``index_path`` is deleted before rebuilding.

    Args:
        example_file: Path to the JSON-lines file of NLU examples.
        index_path: Directory passed to ``build_or_load_vectorstore``.

    Returns:
        The store returned by ``build_or_load_vectorstore`` (built from an
        empty document list when ``example_file`` does not exist).
    """
    if not os.path.exists(example_file):
        print(f"WARNING: NLU example file not found at {example_file}. NLU will be less accurate.")
        return build_or_load_vectorstore([], index_path)

    examples = []
    with open(example_file, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            try:
                record = json.loads(raw_line)
                examples.append(Document(page_content=record["query"], metadata=record))
            except (json.JSONDecodeError, KeyError):
                # Malformed line or record without a query: ignore it.
                continue

    print(f"Found and loaded {len(examples)} NLU training examples.")
    # Always rebuild from scratch so removed examples do not linger in the index.
    if os.path.exists(index_path):
        shutil.rmtree(index_path)
    return build_or_load_vectorstore(examples, index_path)
# Folder holding copied music files and the music_manifest.json that indexes them.
PERSONAL_MUSIC_BASE = PERSONAL_DATA_BASE / "music"
os.makedirs(PERSONAL_MUSIC_BASE, exist_ok=True)
| # In app.py, replace your existing versions of these three functions with the code below. | |
| # --- Function 1: Auto-loads non-music memories from the 'Personal Memory Bank' folder --- | |
def load_personal_files_from_folder():
    """Load new files from the 'Personal Memory Bank' folder into the personal vectorstore.

    Text files are read directly, audio/video files are transcribed with
    ``transcribe_audio`` and images are described with ``describe_image``.
    A file is skipped when its name already appears as the ``source`` of a
    document in the personal store. A file that cannot be read, transcribed or
    described is reported and skipped; it never aborts the scan.

    Side effects:
        May replace or extend the module-level ``personal_vectorstore`` and
        saves it to ``PERSONAL_INDEX_PATH`` when documents were added.
    """
    global personal_vectorstore
    print("Scanning 'Personal Memory Bank' folder for new files...")
    if not os.path.exists(PERSISTENT_MEMORY_PATH):
        return

    # Supported file extensions (matched case-insensitively).
    TEXT_EXTENSIONS = (".txt",)
    AUDIO_EXTENSIONS = (".mp3", ".wav", ".m4a", ".flac")
    VIDEO_EXTENSIONS = (".mp4", ".mov", ".avi", ".mkv")
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".bmp")

    def _titled(filename, body):
        # Derive a readable title from the file name: drop the extension and
        # turn '_' / '-' into spaces.
        title = os.path.splitext(filename)[0].replace('_', ' ').replace('-', ' ')
        return f"Title: {title}\n\nContent: {body}"

    # Sources already in the vectorstore, to avoid re-processing files.
    existing_sources = set()
    if personal_vectorstore and hasattr(personal_vectorstore.docstore, '_dict'):
        for doc in personal_vectorstore.docstore._dict.values():
            existing_sources.add(doc.metadata.get("source"))

    docs_to_add = []
    for filename in os.listdir(PERSISTENT_MEMORY_PATH):
        if filename in existing_sources:
            continue
        filepath = PERSISTENT_MEMORY_PATH / filename
        # Sub-folders (for example the music library) are not memory files.
        if not filepath.is_file():
            continue

        content_to_process = ""
        file_lower = filename.lower()
        if file_lower.endswith(TEXT_EXTENSIONS):
            print(f" - Found new text file to load: {filename}")
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    content_to_process = f.read()
            except (OSError, UnicodeDecodeError) as e:
                # Same policy as the media branches: report and move on.
                print(f" - ERROR: Failed to read {filename}. Reason: {e}")
                continue
        elif file_lower.endswith(AUDIO_EXTENSIONS) or file_lower.endswith(VIDEO_EXTENSIONS):
            media_type = "Audio" if file_lower.endswith(AUDIO_EXTENSIONS) else "Video"
            print(f" - Found new {media_type} file to transcribe: {filename}")
            try:
                content_to_process = _titled(filename, transcribe_audio(str(filepath)))
            except Exception as e:
                print(f" - ERROR: Failed to transcribe {filename}. Reason: {e}")
                continue
        elif file_lower.endswith(IMAGE_EXTENSIONS):
            print(f" - Found new Image file to describe: {filename}")
            try:
                content_to_process = _titled(filename, describe_image(str(filepath)))
            except Exception as e:
                print(f" - ERROR: Failed to describe {filename}. Reason: {e}")
                continue

        if content_to_process:
            docs_to_add.extend(parse_and_tag_entries(content_to_process, source=filename, settings={}))

    if docs_to_add:
        if personal_vectorstore is None:
            personal_vectorstore = build_or_load_vectorstore(docs_to_add, PERSONAL_INDEX_PATH, is_personal=True)
        else:
            personal_vectorstore.add_documents(docs_to_add)
        personal_vectorstore.save_local(PERSONAL_INDEX_PATH)
        print(f"Successfully added {len(docs_to_add)} new document(s) from the folder.")
| # --- Function 2: Auto-syncs music from the 'Music Library' folder (Hybrid Approach) --- | |
def sync_music_library_from_folder():
    """Scans 'Music Library' folder, syncs manifest for playback, and adds lyrics to vectorstore.

    Only files named ``<title> - <artist> - <tag>.<ext>`` (mp3, wav, m4a, ogg
    or flac, case-insensitive) are processed. Each matching file is copied
    into ``PERSONAL_MUSIC_BASE`` if not already there, recorded in
    ``music_manifest.json``, and, unless its name is already a ``source`` in
    the personal vectorstore, transcribed and added to that store.
    """
    global personal_vectorstore
    music_library_path = PERSISTENT_MEMORY_PATH / "Music Library"
    os.makedirs(music_library_path, exist_ok=True)
    manifest_path = PERSONAL_MUSIC_BASE / "music_manifest.json"
    # Start from the existing manifest so previously synced songs are kept.
    manifest = {}
    if manifest_path.exists():
        with open(manifest_path, "r") as f: manifest = json.load(f)
    # File names that already have documents in the personal vectorstore.
    existing_sources = set()
    if personal_vectorstore and hasattr(personal_vectorstore.docstore, '_dict'):
        for doc in personal_vectorstore.docstore._dict.values():
            existing_sources.add(doc.metadata.get("source"))
    print("Scanning 'Music Library' folder for new songs...")
    # Groups: 1 = title, 2 = artist, 3 = tag, 4 = file extension.
    filename_pattern = re.compile(r'^(.*?) - (.*?) - (.*?)\.(mp3|wav|m4a|ogg|flac)$', re.IGNORECASE)
    synced_count = 0
    docs_to_add = []
    for filename in os.listdir(music_library_path):
        # Manifest key: file name with spaces replaced by underscores, lower-cased.
        song_id = filename.replace(" ", "_").lower()
        # Skip only when the song is in BOTH the manifest and the vectorstore.
        if song_id in manifest and filename in existing_sources:
            continue
        match = filename_pattern.match(filename)
        if match:
            print(f" - Found new song to sync: {filename}")
            title, artist, tag = match.groups()[:3]
            source_path = music_library_path / filename
            dest_path = PERSONAL_MUSIC_BASE / filename
            if not os.path.exists(dest_path):
                shutil.copy2(str(source_path), str(dest_path))
            # Add to manifest for playback system
            song_metadata = {"title": title.strip(), "artist": artist.strip(), "moods": [tag.strip().lower()], "filepath": str(dest_path)}
            manifest[song_id] = song_metadata
            # Transcribe and prep for semantic memory system (vectorstore)
            if filename not in existing_sources:
                try:
                    print(f" - Transcribing '{title}' for memory bank...")
                    lyrics = transcribe_audio(str(dest_path))
                    content_for_rag = (
                        f"Title: Song - {song_metadata['title']}\n"
                        f"Artist: {song_metadata['artist']}\n"
                        f"Moods: {', '.join(song_metadata['moods'])}\n\n"
                        f"Lyrics:\n{lyrics}"
                    )
                    docs_to_add.extend(parse_and_tag_entries(content_for_rag, source=filename, settings={}))
                except Exception as e:
                    # A failed transcription does not undo the manifest entry above.
                    print(f" - WARNING: Failed to transcribe {filename} for memory bank. Error: {e}")
            synced_count += 1
    # Rewrite the manifest only when at least one song was processed.
    if synced_count > 0:
        with open(manifest_path, "w") as f: json.dump(manifest, f, indent=2)
        print(f"Successfully synced {synced_count} new song(s) to the music manifest.")
    if docs_to_add:
        if personal_vectorstore is None:
            personal_vectorstore = build_or_load_vectorstore(docs_to_add, PERSONAL_INDEX_PATH, is_personal=True)
        else:
            personal_vectorstore.add_documents(docs_to_add)
        personal_vectorstore.save_local(PERSONAL_INDEX_PATH)
        print(f"Successfully added lyrics for {len(docs_to_add)} song(s) to the personal vectorstore.")
def canonical_theme(tk: str) -> str:
    """Return ``tk`` when it is one of ``CONFIG["themes"]``, otherwise ``"All"``."""
    if tk in CONFIG["themes"]:
        return tk
    return "All"
def theme_upload_dir(theme: str) -> str:
    """Return the upload folder for ``theme`` as a string, creating it if needed.

    The folder is ``UPLOADS_BASE / "theme_<name>"`` where ``<name>`` is the
    canonical theme with spaces removed and lower-cased.
    """
    p = UPLOADS_BASE / f"theme_{canonical_theme(theme).replace(' ', '').lower()}"
    # parents=True so the call still works if UPLOADS_BASE has been removed
    # since startup.
    p.mkdir(parents=True, exist_ok=True)
    return str(p)
def load_manifest(theme: str) -> Dict[str, Any]:
    """Load the manifest for ``theme``.

    Returns:
        The parsed ``manifest.json`` from the theme's upload folder, always
        holding a dict under ``"files"``. ``{"files": {}}`` is returned when
        the file is missing, unreadable, not valid JSON, or not a JSON object.
    """
    p = os.path.join(theme_upload_dir(theme), "manifest.json")
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                man = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            print(f"WARNING: Could not read manifest {p}: {e}")
        else:
            if isinstance(man, dict):
                # Callers index man["files"] directly, so guarantee its shape.
                if not isinstance(man.get("files"), dict):
                    man["files"] = {}
                return man
    return {"files": {}}
def save_manifest(theme: str, man: Dict[str, Any]):
    """Write ``man`` as indented JSON to ``manifest.json`` in the theme's upload folder."""
    manifest_file = os.path.join(theme_upload_dir(theme), "manifest.json")
    with open(manifest_file, "w", encoding="utf-8") as handle:
        json.dump(man, handle, indent=2)
def list_theme_files(theme: str) -> List[tuple[str, bool]]:
    """List the files of ``theme`` with their enabled flag, and resync the manifest.

    Manifest entries whose file still exists come first (in manifest order),
    followed by files found on disk but not yet in the manifest (sorted,
    disabled). The manifest is rewritten to match the result.

    Returns:
        A list of ``(file name, enabled)`` pairs.
    """
    # manifest.json lives in the same folder as the uploads; it is bookkeeping,
    # not a knowledge file, so it must not appear in the listing or in itself.
    manifest_name = "manifest.json"
    man = load_manifest(theme)
    base = theme_upload_dir(theme)
    found = [
        (n, bool(e))
        for n, e in man.get("files", {}).items()
        if n != manifest_name and os.path.exists(os.path.join(base, n))
    ]
    existing = {n for n, _ in found}
    for name in sorted(os.listdir(base)):
        if name == manifest_name or name in existing:
            continue
        if os.path.isfile(os.path.join(base, name)):
            found.append((name, False))
    man["files"] = dict(found)
    save_manifest(theme, man)
    return found
def copy_into_theme(theme: str, src_path: str) -> str:
    """Copy ``src_path`` into the theme's upload folder and return the destination path."""
    target = os.path.join(theme_upload_dir(theme), os.path.basename(src_path))
    shutil.copy2(src_path, target)
    return target
def seed_files_into_theme(theme: str):
    """Copy the bundled sample files into ``theme`` and register them in its manifest.

    A seed file is skipped when it is missing from ``sample_data/`` or when a
    file of the same name already exists in the theme's upload folder. The
    manifest is saved only if at least one file was copied.
    """
    SEED_FILES = [
        ("sample_data/caregiving_tips.txt", True),
        ("sample_data/the_father_segments_enriched_harmonized_plus.jsonl", True),
        ("sample_data/still_alice_enriched_harmonized_plus.jsonl", True),
        ("sample_data/away_from_her_enriched_harmonized_plus.jsonl", True),
        ("sample_data/alive_inside_enriched_harmonized.jsonl", True),
    ]
    man = load_manifest(theme)
    changed = False
    for seed_path, enable in SEED_FILES:
        if not os.path.exists(seed_path):
            continue
        fname = os.path.basename(seed_path)
        already_present = os.path.exists(os.path.join(theme_upload_dir(theme), fname))
        if already_present:
            continue
        copy_into_theme(theme, seed_path)
        man["files"][fname] = bool(enable)
        changed = True
    if changed:
        save_manifest(theme, man)
def ensure_index(theme='All'):
    """Return the general vectorstore for ``theme``, building and caching it on first use.

    The store is built by ``bootstrap_vectorstore`` from the theme's enabled
    files and cached in the module-level ``vectorstores`` dict.
    """
    theme = canonical_theme(theme)
    try:
        return vectorstores[theme]
    except KeyError:
        pass  # Not built yet: fall through and bootstrap it.

    upload_dir = theme_upload_dir(theme)
    enabled_files = []
    for name, enabled in list_theme_files(theme):
        if enabled:
            enabled_files.append(os.path.join(upload_dir, name))

    store = bootstrap_vectorstore(sample_paths=enabled_files, index_path=THEME_PATHS.get(theme))
    vectorstores[theme] = store
    return store
| # --- Gradio Callbacks --- | |
| # In app.py, modify the collect_settings function | |
def collect_settings(*args):
    """Pair positional UI values with their setting names.

    Values are matched to the names below in order; surplus values or surplus
    names are dropped, as with ``zip``.

    Returns:
        A dict mapping setting name to the supplied value.
    """
    setting_names = (
        "role",
        "patient_name",
        "caregiver_name",
        "tone",
        "language",
        "tts_lang",
        "temperature",
        "disease_stage",
        "behaviour_tag",
        "emotion_tag",
        "topic_tag",
        "active_theme",
        "tts_on",
        "debug_mode",
    )
    return {name: value for name, value in zip(setting_names, args)}
| # NEW: MUST be consistent with collect_settings() defined above | |
def auto_setup_on_load(theme):
    """Seed an empty theme folder and return default settings plus the file-list UI state.

    Returns:
        ``(settings, files_ui, status)`` where ``settings`` is the default
        settings dict and the other two come from ``refresh_file_list_ui``.
    """
    if len(os.listdir(theme_upload_dir(theme))) == 0:
        seed_files_into_theme(theme)

    # Order must match the key order used by collect_settings().
    defaults = (
        "patient",              # role
        "",                     # patient_name
        "",                     # caregiver_name
        "warm",                 # tone
        "English",              # language
        "English",              # tts_lang
        0.7,                    # temperature
        "Default: Mild Stage",  # disease_stage
        "None",                 # behaviour_tag
        "None",                 # emotion_tag
        "None",                 # topic_tag
        "All",                  # active_theme
        True,                   # tts_on
        False,                  # debug_mode
    )
    settings = collect_settings(*defaults)
    files_ui, status = refresh_file_list_ui(theme)
    return settings, files_ui, status
| # In app.py, replace the entire parse_and_tag_entries function. | |
def _split_title_and_content(entry: str) -> tuple:
    """Split one raw entry into ``(title, content)``.

    The first line is consumed as the title only when it contains ``title:``;
    otherwise it stays part of the content, so untitled multi-line text keeps
    its first line. If a ``content:`` label is present, the content is the
    text after that label.
    """
    stripped = entry.strip()
    lines = stripped.split('\n')
    title = "Untitled Text Entry"
    body = stripped
    if "title:" in lines[0].lower():
        title = lines[0].split(':', 1)[1].strip()
        body = "\n".join(lines[1:])

    # Cut at the label itself, not at the first colon in the body.
    label = re.search(r'content:', body, re.IGNORECASE)
    if label:
        content = body[label.end():].strip()
    else:
        # Title-only entries fall back to the whole entry text.
        content = body.strip() or stripped
    return title, content


def parse_and_tag_entries(text_content: str, source: str, settings: Optional[dict] = None) -> List[Document]:
    """Split text into entries, tag each one, and wrap them as documents.

    Entries are separated by a line consisting only of ``---``, ``--``, ``-``,
    ``-*-`` or ``-.-``. Each non-empty entry becomes one ``Document`` whose
    text is ``"Title: ...\\n\\nContent: ..."`` and whose metadata holds
    ``source``, ``title`` and whatever tags ``detect_tags_from_query`` reports.

    Args:
        text_content: Raw text, either plain or in Title/Content journal form.
        source: Value stored under ``metadata["source"]``.
        settings: Passed through to ``detect_tags_from_query``.

    Returns:
        The list of created documents (possibly empty).
    """
    docs_to_add = []
    entries = re.split(r'\n(?:---|--|-|-\*-|-\.-)\n', text_content)
    for entry in entries:
        if not entry.strip():
            continue
        title, content = _split_title_and_content(entry)
        if not content:
            continue
        full_content = f"Title: {title}\n\nContent: {content}"
        detected_tags = detect_tags_from_query(
            content, nlu_vectorstore=nlu_vectorstore,
            behavior_options=CONFIG["behavior_tags"], emotion_options=CONFIG["emotion_tags"],
            topic_options=CONFIG["topic_tags"], context_options=CONFIG["context_tags"],
            settings=settings
        )
        metadata = {"source": source, "title": title}
        if detected_tags.get("detected_behaviors"):
            metadata["behaviors"] = [b.lower() for b in detected_tags["detected_behaviors"]]
        detected_emotion = detected_tags.get("detected_emotion")
        if detected_emotion and detected_emotion != "None":
            metadata["emotion"] = detected_emotion.lower()
        # Topics arrive under the plural "detected_topics" key as a list.
        detected_topics = detected_tags.get("detected_topics")
        if detected_topics:
            metadata["topic_tags"] = [t.lower() for t in detected_topics]
        if detected_tags.get("detected_contexts"):
            metadata["context_tags"] = [c.lower() for c in detected_tags["detected_contexts"]]
        docs_to_add.append(Document(page_content=full_content, metadata=metadata))
    return docs_to_add
def handle_add_knowledge(title, text_input, file_input, image_input, yt_url, settings):
    """Add one piece of user-supplied content to the personal vectorstore.

    Exactly one input is used, in this priority order: typed text, uploaded
    file (``.txt`` is read, anything else is transcribed), image (described),
    YouTube URL (audio downloaded and transcribed).

    Args:
        title: Optional title; a per-input default is used when empty.
        text_input: Free text typed by the user.
        file_input: Uploaded file object exposing a ``.name`` path.
        image_input: Image path handed to ``describe_image``.
        yt_url: YouTube link.
        settings: Settings dict forwarded to ``parse_and_tag_entries``.

    Returns:
        A status message for the UI.
    """
    global personal_vectorstore
    docs_to_add = []
    source, content = "Unknown", ""
    if text_input and text_input.strip():
        source, content = "Text Input", f"Title: {title or 'Untitled'}\n\nContent: {text_input}"
    elif file_input:
        source = os.path.basename(file_input.name)
        if file_input.name.lower().endswith('.txt'):
            with open(file_input.name, 'r', encoding='utf-8') as f:
                content = f.read()
        else:
            transcribed = transcribe_audio(file_input.name)
            content = f"Title: {title or 'Audio/Video Note'}\n\nContent: {transcribed}"
    elif image_input:
        source, description = "Image Input", describe_image(image_input)
        content = f"Title: {title or 'Image Note'}\n\nContent: {description}"
    elif yt_url and ("youtube.com" in yt_url or "youtu.be" in yt_url):
        try:
            yt = YouTube(yt_url)
            # Reserve a unique path, then close the handle before the download
            # writes to it (an open handle blocks the write on some platforms).
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_audio_file:
                temp_path = temp_audio_file.name
            try:
                yt.streams.get_audio_only().download(filename=temp_path)
                transcribed = transcribe_audio(temp_path)
            finally:
                # Always clean up, including when download/transcription fails.
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
            source, content = f"YouTube: {yt.title}", f"Title: {title or yt.title}\n\nContent: {transcribed}"
        except Exception as e:
            return f"Error processing YouTube link: {e}"
    else:
        return "Please provide content to add."

    if content:
        docs_to_add = parse_and_tag_entries(content, source, settings=settings)
    if not docs_to_add:
        return "No processable content found to add."

    if personal_vectorstore is None:
        personal_vectorstore = build_or_load_vectorstore(docs_to_add, PERSONAL_INDEX_PATH, is_personal=True)
    else:
        personal_vectorstore.add_documents(docs_to_add)
    personal_vectorstore.save_local(PERSONAL_INDEX_PATH)
    return f"Successfully added {len(docs_to_add)} new memory/memories."
| # In app.py, add this new handler function | |
def handle_add_music(file, title, artist, mood):
    """Copy an uploaded song into the music folder and record it in the manifest.

    Args:
        file: Uploaded file object exposing a ``.name`` path.
        title: Song title.
        artist: Artist name.
        mood: One mood label, or a list of mood labels.

    Returns:
        A status message for the UI.
    """
    if not all([file, title, artist, mood]):
        return "Please fill out all fields."

    # A single label must be wrapped; iterating a bare string would store
    # one "mood" per character.
    moods = [mood] if isinstance(mood, str) else list(mood)

    # Save the audio file
    filename = os.path.basename(file.name)
    dest_path = PERSONAL_MUSIC_BASE / filename
    shutil.copy2(file.name, str(dest_path))

    # Save the metadata to a manifest file
    manifest_path = PERSONAL_MUSIC_BASE / "music_manifest.json"
    manifest = {}
    if manifest_path.exists():
        with open(manifest_path, "r") as f:
            manifest = json.load(f)

    # Manifest key: file name with spaces replaced by underscores, lower-cased.
    song_id = filename.replace(" ", "_").lower()
    manifest[song_id] = {
        "title": title.strip(),
        "artist": artist.strip(),
        "moods": [m.lower() for m in moods],
        "filepath": str(dest_path)  # Store the full path for backend access
    }
    with open(manifest_path, "w") as f:
        json.dump(manifest, f, indent=2)
    return f"Successfully added '{title}' to the music library."
| # In app.py, add these two new functions (e.g., after the handle_add_music function) | |
def list_music_library():
    """Load the music manifest and format it for the Gradio UI.

    Returns:
        Two ``gr.update`` objects: one carrying table rows of
        ``[title, artist, comma-joined moods]``, and one carrying the manifest
        keys as choices. A missing or empty manifest yields a single
        "Library is empty" row and no choices.
    """
    manifest_path = PERSONAL_MUSIC_BASE / "music_manifest.json"

    manifest = {}
    if manifest_path.exists():
        with open(manifest_path, "r") as handle:
            manifest = json.load(handle)

    if not manifest:
        placeholder_rows = [["Library is empty", "", ""]]
        return gr.update(value=placeholder_rows), gr.update(choices=[], value=None)

    rows = []
    for entry in manifest.values():
        rows.append([entry['title'], entry['artist'], ", ".join(entry['moods'])])
    # The manifest key is each song's unique ID, so it drives the delete dropdown.
    song_ids = list(manifest.keys())
    return gr.update(value=rows), gr.update(choices=song_ids, value=None)
def delete_music_from_library(song_id_to_delete):
    """Deletes a song from the manifest, the audio file, and the vectorstore.

    Args:
        song_id_to_delete: Key of the song in ``music_manifest.json``.

    Returns:
        A status message for the UI.
    """
    global personal_vectorstore
    if not song_id_to_delete:
        return "No music selected to delete."
    # 1. Remove from manifest and delete audio file
    manifest_path = PERSONAL_MUSIC_BASE / "music_manifest.json"
    if not manifest_path.exists(): return "Error: Music manifest not found."
    with open(manifest_path, "r") as f: manifest = json.load(f)
    song_to_delete = manifest.pop(song_id_to_delete, None)
    if not song_to_delete: return f"Error: Could not find song ID {song_id_to_delete} in manifest."
    # The manifest is rewritten before the audio file is removed.
    with open(manifest_path, "w") as f: json.dump(manifest, f, indent=2)
    try:
        os.remove(song_to_delete['filepath'])
    except OSError as e:
        # A missing or locked audio file is reported but does not stop the deletion.
        print(f"Error deleting audio file {song_to_delete['filepath']}: {e}")
    # 2. Remove lyrics from the personal vectorstore
    if personal_vectorstore and hasattr(personal_vectorstore.docstore, '_dict'):
        filename_to_delete = os.path.basename(song_to_delete['filepath'])
        all_docs = list(personal_vectorstore.docstore._dict.values())
        # Keep every document whose source is not the deleted audio file name.
        docs_to_keep = [d for d in all_docs if d.metadata.get("source") != filename_to_delete]
        if len(all_docs) > len(docs_to_keep):
            # Documents are not removed in place; the store is rebuilt from the survivors.
            if not docs_to_keep:  # If it was the last doc
                if os.path.isdir(PERSONAL_INDEX_PATH): shutil.rmtree(PERSONAL_INDEX_PATH)
                personal_vectorstore = build_or_load_vectorstore([], PERSONAL_INDEX_PATH, is_personal=True)
            else:
                new_vs = FAISS.from_documents(docs_to_keep, _default_embeddings())
                new_vs.save_local(PERSONAL_INDEX_PATH)
                personal_vectorstore = new_vs
            return f"Successfully deleted '{song_to_delete['title']}' from the library and memory bank."
    return f"Successfully deleted '{song_to_delete['title']}' from the music library."
def chat_fn(user_text, audio_file, settings, chat_history):
    """Handle one chat turn: resolve the question, tag it, query the RAG chain, pick audio.

    Args:
        user_text: Typed message; takes priority over ``audio_file``.
        audio_file: Recorded audio path, transcribed only when no text was typed.
        settings: Settings dict as produced by ``collect_settings``.
        chat_history: List of ``{"role": ..., "content": ...}`` dicts; it is
            extended in place and also returned.

    Returns:
        ``("", audio_update, chat_history)``: an empty string for the text box,
        a ``gr.update`` for the audio player (or ``None``), and the history.
    """
    global personal_vectorstore
    print("\n" + "="*50)
    print(f"[DEBUG app.py] chat_fn received settings: {settings}")
    print("="*50 + "\n")

    # Tolerate an unset settings state or an empty chatbot on the first turn.
    settings = settings or {}
    if chat_history is None:
        chat_history = []

    question = (user_text or "").strip()
    if audio_file and not question:
        try:
            question = transcribe_audio(audio_file, lang=CONFIG["languages"].get(settings.get("tts_lang", "English"), "en"))
        except Exception as e:
            err_msg = f"Audio Error: {e}" if settings.get("debug_mode") else "Sorry, I couldn't understand the audio."
            chat_history.append({"role": "assistant", "content": err_msg})
            return "", None, chat_history
    if not question:
        return "", None, chat_history

    # History sent to the model: drop our own "*(...)*" annotation messages
    # (auto-detected context, sources). Built before the new question is added.
    api_chat_history = [
        msg for msg in chat_history
        if msg.get("content") and not msg["content"].strip().startswith("*(")
    ]
    # Append the new user question to the history that is displayed in the UI.
    chat_history.append({"role": "user", "content": question})

    query_type = route_query_type(question, severity=settings.get("disease_stage", "Default: Mild Stage"))
    print(f"[DEBUG] Router classified query as: {query_type}")

    final_tags = {"scenario_tag": None, "emotion_tag": None, "topic_tag": None, "context_tags": []}
    manual_behavior = settings.get("behaviour_tag", "None")
    manual_emotion = settings.get("emotion_tag", "None")
    manual_topic = settings.get("topic_tag", "None")
    auto_detected_context = ""
    if not all(m == "None" for m in [manual_behavior, manual_emotion, manual_topic]):
        # Manual tags chosen in the UI override automatic detection.
        print(f"[DEBUG app.py] Manual override DETECTED. Behavior='{manual_behavior}', Emotion='{manual_emotion}', Topic='{manual_topic}'")
        final_tags["scenario_tag"] = manual_behavior if manual_behavior != "None" else None
        final_tags["emotion_tag"] = manual_emotion if manual_emotion != "None" else None
        final_tags["topic_tag"] = manual_topic if manual_topic != "None" else None
    elif "caregiving_scenario" in query_type or "play_music_request" in query_type:
        # Music requests are analysed as well, so an underlying emotion or
        # behaviour can be detected before a song is chosen.
        print("\n--- [DEBUG app.py] Preparing to call NLU ---")
        print(f" - Query to Analyze: '{question}'")
        print(f" - NLU Vectorstore Loaded: {nlu_vectorstore is not None}")
        print(f" - Current Settings Passed: {settings}")
        print("------------------------------------------")
        detected_tags = detect_tags_from_query(
            question, nlu_vectorstore=nlu_vectorstore, behavior_options=CONFIG["behavior_tags"],
            emotion_options=CONFIG["emotion_tags"], topic_options=CONFIG["topic_tags"],
            context_options=CONFIG["context_tags"], settings=settings)
        print(f"[DEBUG app.py] Raw NLU output: {detected_tags}")
        behaviors = detected_tags.get("detected_behaviors")
        final_tags["scenario_tag"] = behaviors[0] if behaviors else None
        final_tags["emotion_tag"] = detected_tags.get("detected_emotion")
        # parse_and_tag_entries reads topics from the plural "detected_topics"
        # list; prefer it here too, keeping the singular key as a fallback.
        topics = detected_tags.get("detected_topics")
        if isinstance(topics, str):
            topics = [topics]
        final_tags["topic_tag"] = topics[0] if topics else detected_tags.get("detected_topic")
        final_tags["context_tags"] = detected_tags.get("detected_contexts", [])
        print(f"[DEBUG] NLU detected tags: {final_tags}")
        # Label each value with the first part of its key ("scenario",
        # "emotion", "topic", "context"); index 1 would be "tag"/"tags".
        detected_parts = [f"{k.split('_')[0]}=`{v}`" for k, v in final_tags.items() if v and v != "None" and v != []]
        if detected_parts:
            auto_detected_context = f"*(Auto-detected context: {', '.join(detected_parts)})*"

    vs_general = ensure_index(settings.get("active_theme", "All"))
    if personal_vectorstore is None:
        personal_vectorstore = build_or_load_vectorstore([], PERSONAL_INDEX_PATH, is_personal=True)

    # Settings forwarded to the chain, plus the music manifest path.
    manifest_path_str = str(PERSONAL_MUSIC_BASE / "music_manifest.json")
    rag_settings = {k: settings.get(k) for k in ["role", "temperature", "language", "patient_name", "caregiver_name", "tone", "disease_stage"]}
    rag_settings["music_manifest_path"] = manifest_path_str
    chain = make_rag_chain(vs_general, personal_vectorstore, **rag_settings)
    response = answer_query(chain, question, query_type=query_type, chat_history=api_chat_history, **final_tags)

    # 1. Extract the text answer and the optional music file path from the response.
    answer = response.get("answer", "[No answer found]")
    audio_playback_url = response.get("audio_playback_url")

    # 2. Show the answer, then the annotation messages.
    chat_history.append({"role": "assistant", "content": answer})
    if auto_detected_context:
        chat_history.append({"role": "assistant", "content": auto_detected_context})
    if response.get("sources"):
        chat_history.append({"role": "assistant", "content": f"*(Sources used: {', '.join(response['sources'])})*"})

    # 3. Decide what to play in the audio component: music takes priority over TTS.
    audio_out_update = None
    if audio_playback_url:
        song_title = os.path.basename(audio_playback_url)
        audio_out_update = gr.update(value=audio_playback_url, visible=True, label=f"Now Playing: {song_title}", autoplay=True)
    elif settings.get("tts_on") and answer:
        # No music to play and TTS is on: read the text answer aloud.
        tts_file = synthesize_tts(answer, lang=CONFIG["languages"].get(settings.get("tts_lang"), "en"))
        audio_out_update = gr.update(value=tts_file, visible=bool(tts_file), label="Response Audio", autoplay=True)

    # 4. Return all the updates for the Gradio UI.
    return "", audio_out_update, chat_history
# NOTE: chat_fn builds chat_history as a list of message dictionaries, e.g.
#   [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
# An earlier version of save_chat_to_memory incorrectly assumed a list of
# tuples such as [(True, "..."), (False, "...")]. The function below parses
# the list of dictionaries instead.
def _format_chat_lines(chat_history):
    """Render chat messages as ``Role: text`` lines, skipping non-conversation entries."""
    lines = []
    for msg in chat_history:
        if not isinstance(msg, dict):
            continue
        content = msg.get("content")
        # A "messages"-style history may carry non-text payloads; only plain
        # strings can be stored as text, so everything else is skipped
        # instead of crashing on ``.strip()``.
        if not isinstance(content, str):
            continue
        text = content.strip()
        # Entries starting with "*(" are the annotations chat_fn appends
        # (auto-detected context, sources used), not conversation turns.
        if not text or text.startswith("*("):
            continue
        role = str(msg.get("role") or "assistant")
        lines.append(f"{role.capitalize()}: {text}")
    return lines


def save_chat_to_memory(chat_history):
    """Store the current conversation as one document in the personal index.

    Args:
        chat_history: The chatbot history, a list of
            ``{"role": ..., "content": ...}`` dictionaries.

    Returns:
        A status string for the UI: ``"Nothing to save."`` for an empty
        history, ``"No conversation to save."`` when no textual turns remain
        after filtering, otherwise a confirmation carrying the timestamp.
    """
    global personal_vectorstore

    if not chat_history:
        return "Nothing to save."

    formatted_chat = _format_chat_lines(chat_history)
    if not formatted_chat:
        return "No conversation to save."

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    title = f"Conversation from {timestamp}"
    full_content = f"Title: {title}\n\nContent:\n" + "\n".join(formatted_chat)
    doc = Document(page_content=full_content, metadata={"source": "Saved Chat", "title": title})

    if personal_vectorstore is None:
        # No index yet: create it with this document as its first entry.
        personal_vectorstore = build_or_load_vectorstore([doc], PERSONAL_INDEX_PATH, is_personal=True)
    else:
        personal_vectorstore.add_documents([doc])
    personal_vectorstore.save_local(PERSONAL_INDEX_PATH)
    return f"Conversation from {timestamp} saved."
def list_personal_memories():
    """Build the UI updates for the memory table and the delete selector.

    Returns:
        A pair of ``gr.update`` objects. The first carries table rows of
        ``[title, source, content]``; the second carries dropdown choices
        (each document's page content). When the personal index is missing
        or empty, a single placeholder row and an empty, cleared dropdown
        are returned.
    """
    store = personal_vectorstore
    entries = None if store is None else getattr(store.docstore, '_dict', None)
    if not entries:
        placeholder = [["No memories", "", ""]]
        return gr.update(value=placeholder), gr.update(choices=[], value=None)

    documents = list(entries.values())
    table_rows = [
        [doc.metadata.get('title', '...'), doc.metadata.get('source', '...'), doc.page_content]
        for doc in documents
    ]
    selector_choices = [doc.page_content for doc in documents]
    return gr.update(value=table_rows), gr.update(choices=selector_choices)
def delete_personal_memory(memory_to_delete):
    """Remove every stored memory whose content equals ``memory_to_delete``.

    The index is rebuilt from the surviving documents. When nothing
    survives, the on-disk index directory is removed and an empty personal
    index is created in its place.

    Args:
        memory_to_delete: Page content of the memory chosen in the UI.

    Returns:
        A status string describing the outcome.
    """
    global personal_vectorstore

    if personal_vectorstore is None or not memory_to_delete:
        return "No memory selected."

    stored = list(personal_vectorstore.docstore._dict.values())
    survivors = [doc for doc in stored if doc.page_content != memory_to_delete]
    if len(survivors) == len(stored):
        # Nothing was filtered out, so the selection matched no document.
        return "Error: Could not find memory."

    if survivors:
        rebuilt = FAISS.from_documents(survivors, _default_embeddings())
        rebuilt.save_local(PERSONAL_INDEX_PATH)
        personal_vectorstore = rebuilt
    else:
        if os.path.isdir(PERSONAL_INDEX_PATH):
            shutil.rmtree(PERSONAL_INDEX_PATH)
        personal_vectorstore = build_or_load_vectorstore([], PERSONAL_INDEX_PATH, is_personal=True)
    return "Successfully deleted memory."
| # --- EVALUATION FUNCTIONS: move them into evaluate.py | |
| # def evaluate_nlu_tags(expected: Dict[str, Any], actual: Dict[str, Any], tag_key: str, expected_key_override: str = None) -> Dict[str, float]: | |
| # def _parse_judge_json(raw_str: str) -> dict | None: | |
| # def run_comprehensive_evaluation(): | |
def upload_knowledge(files, theme):
    """Copy uploaded knowledge files into a theme and invalidate its cached index.

    Args:
        files: Uploaded file objects exposing a ``.name`` path, or ``None`` /
            an empty list when the button is clicked with nothing selected.
        theme: Name of the knowledge theme receiving the files.

    Returns:
        A status string for the UI.
    """
    # Without this guard, clicking "Upload to Theme" with no files selected
    # raised TypeError when iterating None.
    if not files:
        return "No files to upload."

    for uploaded in files:
        copy_into_theme(theme, uploaded.name)
    # Drop the cached vector store for this theme; presumably it is rebuilt on
    # next use (TODO confirm against ensure_index).
    vectorstores.pop(theme, None)
    return f"Uploaded {len(files)} file(s)."
def save_file_selection(theme, enabled):
    """Persist which files of a theme are enabled and drop its cached index.

    Args:
        theme: Name of the knowledge theme being edited.
        enabled: Collection of file names that should be marked enabled.

    Returns:
        A confirmation string naming the theme.
    """
    manifest = load_manifest(theme)
    file_flags = manifest['files']
    # Same keys, new values: a file is enabled exactly when it was ticked.
    file_flags.update({name: name in enabled for name in file_flags})
    save_manifest(theme, manifest)

    if theme in vectorstores:
        del vectorstores[theme]
    return f"Settings saved for theme '{theme}'."
def refresh_file_list_ui(theme):
    """Return the checkbox-group update and a status line for a theme's files.

    Args:
        theme: Name of the knowledge theme to list.

    Returns:
        A tuple of a ``gr.update`` (all file names as choices, the enabled
        ones pre-selected) and a ``"Found N file(s)."`` message.
    """
    entries = list_theme_files(theme)
    all_names = [name for name, _ in entries]
    checked_names = [name for name, is_enabled in entries if is_enabled]
    status = f"Found {len(entries)} file(s)."
    return gr.update(choices=all_names, value=checked_names), status
def test_save_file():
    """Write a timestamped marker file into the personal data folder.

    Returns:
        A success message containing the file path, or an error message
        containing the exception text if anything fails.
    """
    try:
        target = PERSONAL_DATA_BASE / "persistence_test.txt"
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        target.write_text(f"File saved at: {stamp}")
        message = f"✅ Success! Wrote test file to: {target}"
    except Exception as exc:
        message = f"❌ Error! Failed to write file: {exc}"
    return message
def check_test_file():
    """Report whether the persistence marker file exists, with its contents.

    Returns:
        A success message quoting the file contents, or a failure message
        containing the path that was checked.
    """
    target = PERSONAL_DATA_BASE / "persistence_test.txt"
    if not target.exists():
        return f"❌ Failure. Test file not found at: {target}"
    return f"✅ Success! Found test file. Contents: '{target.read_text()}'"
| # --- UI Definition --- | |
# Stylesheet handed to gr.Blocks(css=CSS). The selectors target the elem_id
# values assigned in the Chat tab ("chatbot", "audio_in", "audio_out"): base
# font size, a minimum chatbot height, and compact audio widgets with the
# waveform hidden.
CSS = """
.gradio-container { font-size: 14px; }
#chatbot { min-height: 400px; }
#audio_in audio, #audio_out audio { max-height: 40px; }
#audio_in .waveform, #audio_out .waveform { display: none !important; }
#audio_in, #audio_out { min-height: 0px !important; }
"""
| # OLD: add allowed_paths so the UI can access the music files | |
| # with gr.Blocks(theme=gr.themes.Soft(), css=CSS, allowed_paths=[str(PERSONAL_MUSIC_BASE)]) as demo: | |
# Application UI: four tabs (Chat, Personalize, Settings, Testing) followed by
# the event wiring that connects widgets to the handler functions.
# NOTE(review): container nesting below was reconstructed from a listing that
# had lost its indentation — verify the layout against the running app.
with gr.Blocks(theme=gr.themes.Soft(), css=CSS) as demo:
    # Single shared settings dict: written by collect_settings and
    # auto_setup_on_load, read by chat_fn and handle_add_knowledge.
    settings_state = gr.State({})

    with gr.Tab("Chat"):
        with gr.Row():
            user_text = gr.Textbox(show_label=False, placeholder="Type your message here...", scale=7)
            submit_btn = gr.Button("Send", variant="primary", scale=1)
        with gr.Row():
            audio_in = gr.Audio(sources=["microphone"], type="filepath", label="Voice Input", elem_id="audio_in")
            audio_out = gr.Audio(label="Response Audio", autoplay=True, visible=True, elem_id="audio_out")
        chatbot = gr.Chatbot(elem_id="chatbot", label="Conversation", type="messages")
        chat_status = gr.Markdown()
        with gr.Row():
            clear_btn = gr.Button("Clear")
            save_btn = gr.Button("Save to Memory")

    with gr.Tab("Personalize"):
        gr.Markdown("### **Upload Personal Memory**")
        with gr.Accordion("Add Multimodal Data to Personal Memory Bank", open=True):
            personal_title = gr.Textbox(label="Title")
            personal_text = gr.Textbox(lines=5, label="Text Content")
            with gr.Row():
                personal_file = gr.File(label="Upload Audio/Video/Text File")
                personal_image = gr.Image(type="filepath", label="Upload Image")
            personal_yt_url = gr.Textbox(label="Or, provide a YouTube URL")
            personal_add_btn = gr.Button("Add Knowledge", variant="primary")
            personal_status = gr.Markdown()

        gr.Markdown("### **Upload Personal Music Library**")
        with gr.Accordion("Add Music to Personal Memory Bank", open=False):
            music_file = gr.File(label="Upload Audio File (.mp3, .wav)", file_types=["audio"])
            music_title = gr.Textbox(label="Song Title (e.g., My Way)")
            music_artist = gr.Textbox(label="Artist (e.g., Frank Sinatra)")
            # Mood tags are picked from a fixed list rather than typed free-form.
            music_mood = gr.Dropdown(
                CONFIG["music_moods"],
                label="Select Moods/Contexts for this Song",
                multiselect=True
            )
            music_add_btn = gr.Button("Add Music", variant="primary")
            music_status = gr.Markdown()

        gr.Markdown("### **Manage Personal Memory Bank**")
        with gr.Accordion("View/Hide Details", open=False):
            personal_memory_display = gr.DataFrame(headers=["Title", "Source", "Content"], label="Saved Memories", row_count=(5, "dynamic"))
            personal_refresh_btn = gr.Button("Refresh Memories")
            personal_delete_selector = gr.Dropdown(label="Select memory to delete", scale=3, interactive=True)
            personal_delete_btn = gr.Button("Delete Selected", variant="stop", scale=1)
            personal_delete_status = gr.Markdown()

        gr.Markdown("### **Manage Music Library**")
        with gr.Accordion("View/Hide Music Details", open=False):
            music_library_display = gr.DataFrame(
                headers=["Title", "Artist", "Moods"],
                label="Music Library",
                row_count=(5, "dynamic")
            )
            music_refresh_btn = gr.Button("Refresh Music List")
            music_delete_selector = gr.Dropdown(
                label="Select music to delete",
                scale=3,
                interactive=True
            )
            music_delete_btn = gr.Button("Delete Selected Music", variant="stop", scale=1)
            music_delete_status = gr.Markdown()

    with gr.Tab("Settings"):
        with gr.Group():
            gr.Markdown("## Conversation & Persona Settings")
            with gr.Row():
                role = gr.Radio(CONFIG["roles"], value="patient", label="Your Role")
                patient_name = gr.Textbox(label="Patient's Name")
                caregiver_name = gr.Textbox(label="Caregiver's Name")
            with gr.Row():
                temperature = gr.Slider(0.0, 1.2, value=0.7, step=0.1, label="Creativity")
                tone = gr.Dropdown(CONFIG["tones"], value="warm", label="Response Tone")
            with gr.Row():
                disease_stage = gr.Dropdown(CONFIG["disease_stages"], value="Default: Mild Stage", label="Assumed Disease Stage")
                behaviour_tag = gr.Dropdown(CONFIG["behavior_tags"], value="None", label="Behaviour Filter (Manual)")
                emotion_tag = gr.Dropdown(CONFIG["emotion_tags"], value="None", label="Emotion Filter (Manual)")
                topic_tag = gr.Dropdown(CONFIG["topic_tags"], value="None", label="Topic Tag Filter (Manual)")
            with gr.Accordion("Language, Voice & Debugging", open=False):
                language = gr.Dropdown(list(CONFIG["languages"].keys()), value="English", label="Response Language")
                tts_lang = gr.Dropdown(list(CONFIG["languages"].keys()), value="English", label="Voice Language")
                tts_on = gr.Checkbox(True, label="Enable Voice Response")
                debug_mode = gr.Checkbox(False, label="Show Debug Info")

        gr.Markdown("--- \n ## General Knowledge Base Management")
        with gr.Row():
            with gr.Column(scale=1):
                files_in = gr.File(file_count="multiple", file_types=[".jsonl", ".txt"], label="Upload Knowledge Files")
                upload_btn = gr.Button("Upload to Theme")
                seed_btn = gr.Button("Import Sample Data")
                mgmt_status = gr.Markdown()
            with gr.Column(scale=2):
                active_theme = gr.Radio(CONFIG["themes"], value="All", label="Active Knowledge Theme")
                files_box = gr.CheckboxGroup(choices=[], label="Enable Files for Selected Theme")
                with gr.Row():
                    save_files_btn = gr.Button("Save Selection", variant="primary")
                    refresh_btn = gr.Button("Refresh List")

        with gr.Accordion("Persistence Test", open=False):
            test_save_btn = gr.Button("1. Run Persistence Test (Save File)")
            check_save_btn = gr.Button("3. Check for Test File")
            test_status = gr.Markdown()

    with gr.Tab("Testing"):
        gr.Markdown("## Comprehensive Performance Evaluation")
        gr.Markdown("Click the button below to run a full evaluation on all test fixtures. This will test NLU (Routing & Tagging) and generate RAG responses for manual review.")
        run_comprehensive_btn = gr.Button("Run Comprehensive Evaluation", variant="primary")
        batch_summary_md = gr.Markdown("### Evaluation Summary: Not yet run.")
        comprehensive_results_df = gr.DataFrame(
            label="Detailed Evaluation Results",
            elem_id="comprehensive_results_df",
            headers=[
                "Test ID", "Title", "Route Correct?", "Expected Route", "Actual Route",
                "Behavior F1", "Emotion F1", "Topic F1", "Context F1",
                "Generated Answer", "Sources", "Source Count", "Latency (ms)", "Faithfulness"
            ],
            interactive=False
        )

    # --- Event Wiring ---
    # Every widget whose value feeds the settings dict. The order is passed
    # positionally to collect_settings, so keep it in sync with that function.
    all_settings = [
        # Persona, tone and language
        role, patient_name, caregiver_name, tone, language, tts_lang, temperature,
        # Disease stage and manual filters
        disease_stage, behaviour_tag, emotion_tag, topic_tag,
        # Knowledge base and debug
        active_theme, tts_on, debug_mode
    ]
    # settings_state is the State created at the top of this block. A second
    # gr.State({}) used to be created here, which left the first one orphaned.
    for component in all_settings:
        component.change(fn=collect_settings, inputs=all_settings, outputs=settings_state)

    submit_btn.click(fn=chat_fn, inputs=[user_text, audio_in, settings_state, chatbot], outputs=[user_text, audio_out, chatbot])
    save_btn.click(fn=save_chat_to_memory, inputs=[chatbot], outputs=[chat_status])
    # Each output is listed once. user_text previously appeared twice with
    # conflicting values (None, then ""); it is now reset to "" only.
    clear_btn.click(lambda: ("", None, [], None, ""), outputs=[user_text, audio_out, chatbot, audio_in, chat_status])

    # After adding knowledge, blank the five input widgets.
    personal_add_btn.click(
        fn=handle_add_knowledge,
        inputs=[personal_title, personal_text, personal_file, personal_image, personal_yt_url, settings_state],
        outputs=[personal_status]
    ).then(
        lambda: (None, None, None, None, None),
        outputs=[personal_title, personal_text, personal_file, personal_image, personal_yt_url]
    )

    music_add_btn.click(
        fn=handle_add_music,
        inputs=[music_file, music_title, music_artist, music_mood],
        outputs=[music_status]
    )
    music_refresh_btn.click(
        fn=list_music_library,
        inputs=None,
        outputs=[music_library_display, music_delete_selector]
    )
    # Re-list the library after a delete so the table and selector stay current.
    music_delete_btn.click(
        fn=delete_music_from_library,
        inputs=[music_delete_selector],
        outputs=[music_delete_status]
    ).then(
        fn=list_music_library,
        inputs=None,
        outputs=[music_library_display, music_delete_selector]
    )

    personal_refresh_btn.click(fn=list_personal_memories, inputs=None, outputs=[personal_memory_display, personal_delete_selector])
    personal_delete_btn.click(
        fn=delete_personal_memory,
        inputs=[personal_delete_selector],
        outputs=[personal_delete_status]
    ).then(
        fn=list_personal_memories,
        inputs=None,
        outputs=[personal_memory_display, personal_delete_selector]
    )

    upload_btn.click(upload_knowledge, inputs=[files_in, active_theme], outputs=[mgmt_status]).then(refresh_file_list_ui, inputs=[active_theme], outputs=[files_box, mgmt_status])
    save_files_btn.click(save_file_selection, inputs=[active_theme, files_box], outputs=[mgmt_status])
    seed_btn.click(seed_files_into_theme, inputs=[active_theme]).then(refresh_file_list_ui, inputs=[active_theme], outputs=[files_box, mgmt_status])
    refresh_btn.click(refresh_file_list_ui, inputs=[active_theme], outputs=[files_box, mgmt_status])
    active_theme.change(refresh_file_list_ui, inputs=[active_theme], outputs=[files_box, mgmt_status])

    # The lambda defers the lookups, so the vector stores that exist at click
    # time are the ones passed to the evaluation.
    # NOTE(review): comprehensive_results_df is listed twice in outputs, so the
    # evaluation function presumably returns three values — confirm against
    # evaluate.py before de-duplicating.
    run_comprehensive_btn.click(
        fn=lambda: run_comprehensive_evaluation(
            vs_general=ensure_index("All"),
            vs_personal=personal_vectorstore,
            nlu_vectorstore=nlu_vectorstore,
            config=CONFIG,
            storage_path=STORAGE_ROOT
        ),
        outputs=[batch_summary_md, comprehensive_results_df, comprehensive_results_df]
    )

    demo.load(auto_setup_on_load, inputs=[active_theme], outputs=[settings_state, files_box, mgmt_status])
    demo.load(load_test_fixtures)
    test_save_btn.click(fn=test_save_file, inputs=None, outputs=[test_status])
    check_save_btn.click(fn=check_test_file, inputs=None, outputs=[test_status])
| # --- Startup Logic --- | |
| # --- Function 3: The Startup Orchestrator --- | |
def pre_load_indexes():
    """Load every index and the personal media folders at startup.

    Order: the NLU examples index, one general index per configured theme,
    the personal index, then the personal-files and music-folder helpers.
    A failure while loading a theme or the personal index is printed and
    skipped; a failure in the NLU load or the two folder helpers is not
    caught here.
    """
    global personal_vectorstore, nlu_vectorstore

    print("Pre-loading all indexes at startup...")

    print(" - Loading NLU examples index...")
    nlu_vectorstore = bootstrap_nlu_vectorstore("nlu_training_examples.jsonl", NLU_EXAMPLES_INDEX_PATH)
    print(" ...NLU index loaded.")

    for theme_name in CONFIG["themes"]:
        print(f" - Loading general index for theme: '{theme_name}'")
        try:
            ensure_index(theme_name)
            print(f" ...'{theme_name}' theme loaded.")
        except Exception as exc:
            print(f" ...Error loading theme '{theme_name}': {exc}")

    print(" - Loading personal knowledge index...")
    try:
        personal_vectorstore = build_or_load_vectorstore([], PERSONAL_INDEX_PATH, is_personal=True)
        print(" ...Personal knowledge loaded.")
    except Exception as exc:
        print(f" ...Error loading personal knowledge: {exc}")

    # Pull in personal files and music already present on disk.
    load_personal_files_from_folder()
    sync_music_library_from_folder()
    print("All indexes and personal files loaded. Application is ready.")
# Script entry point: prepare data and indexes, then start the web UI.
if __name__ == "__main__":
    # Presumably copies the bundled sample files into the 'All' theme (the
    # same handler backs the "Import Sample Data" button) — TODO confirm.
    seed_files_into_theme('All')
    # Load all indexes up front, before the app starts serving.
    pre_load_indexes()
    # allowed_paths lets the UI access files in the personal music folder.
    demo.queue().launch(debug=True, allowed_paths=[str(PERSONAL_MUSIC_BASE)])
    # demo.queue().launch(debug=True)