Spaces:
Running
Running
| # evaluate.py | |
| import os | |
| import json | |
| import time | |
| import pandas as pd | |
| from typing import List, Dict, Any | |
| # --- Imports from the main application --- | |
| try: | |
| from alz_companion.agent import ( | |
| make_rag_chain, route_query_type, detect_tags_from_query, | |
| answer_query, call_llm | |
| ) | |
| from alz_companion.prompts import FAITHFULNESS_JUDGE_PROMPT | |
| from langchain_community.vectorstores import FAISS | |
| except ImportError: | |
| class FAISS: pass | |
| def make_rag_chain(*args, **kwargs): return lambda q, **k: {"answer": f"(Eval Fallback) You asked: {q}", "sources": []} | |
| def route_query_type(q): return "general_conversation" | |
| def detect_tags_from_query(*args, **kwargs): return {} | |
| def answer_query(chain, q, **kwargs): return chain(q, **kwargs) | |
| def call_llm(*args, **kwargs): return "{}" | |
| FAITHFULNESS_JUDGE_PROMPT = "" | |
| print("WARNING: Could not import from alz_companion. Evaluation functions will use fallbacks.") | |
| # --- LLM-as-a-Judge Prompt for Answer Correctness --- | |
| ANSWER_CORRECTNESS_JUDGE_PROMPT = """You are an expert evaluator. Your task is to assess the factual correctness of a generated answer against a ground truth answer. | |
| - GROUND_TRUTH_ANSWER: This is the gold-standard, correct answer. | |
| - GENERATED_ANSWER: This is the answer produced by the AI model. | |
| Evaluate if the GENERATED_ANSWER is factually aligned with the GROUND_TRUTH_ANSWER. Ignore minor differences in phrasing, tone, or structure. The key is factual accuracy. | |
| Respond with a single JSON object containing a float score from 0.0 to 1.0. | |
| - 1.0: The generated answer is factually correct and aligns perfectly with the ground truth. | |
| - 0.5: The generated answer is partially correct but misses key information or contains minor inaccuracies. | |
| - 0.0: The generated answer is factually incorrect or contradicts the ground truth. | |
| --- DATA TO EVALUATE --- | |
| GROUND_TRUTH_ANSWER: | |
| {ground_truth_answer} | |
| GENERATED_ANSWER: | |
| {generated_answer} | |
| --- | |
| Return a single JSON object with your score: | |
| {{ | |
| "correctness_score": <float> | |
| }} | |
| """ | |
| test_fixtures = [] | |
| def load_test_fixtures(): | |
| """Loads fixtures into the test_fixtures list.""" | |
| global test_fixtures | |
| test_fixtures = [] | |
| env_path = os.environ.get("TEST_FIXTURES_PATH", "").strip() | |
| candidates = [env_path] if env_path else ["conversation_test_fixtures_v8.jsonl", "conversation_test_fixtures_v5.jsonl"] | |
| path = next((p for p in candidates if p and os.path.exists(p)), None) | |
| if not path: | |
| print("Warning: No test fixtures file found for evaluation.") | |
| return | |
| # Use the corrected v8 file if available | |
| if "conversation_test_fixtures_v8.jsonl" in path: | |
| print(f"Using corrected test fixtures: {path}") | |
| with open(path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| try: | |
| test_fixtures.append(json.loads(line)) | |
| except json.JSONDecodeError: | |
| print(f"Skipping malformed JSON line in {path}") | |
| print(f"Loaded {len(test_fixtures)} fixtures for evaluation from {path}") | |
| def evaluate_nlu_tags(expected: Dict[str, Any], actual: Dict[str, Any], tag_key: str, expected_key_override: str = None) -> Dict[str, float]: | |
| lookup_key = expected_key_override or tag_key | |
| expected_raw = expected.get(lookup_key, []) | |
| expected_set = set(expected_raw if isinstance(expected_raw, list) else [expected_raw]) if expected_raw and expected_raw != "None" else set() | |
| actual_raw = actual.get(tag_key, []) | |
| actual_set = set(actual_raw if isinstance(actual_raw, list) else [actual_raw]) if actual_raw and actual_raw != "None" else set() | |
| if not expected_set and not actual_set: | |
| return {"precision": 1.0, "recall": 1.0, "f1_score": 1.0} | |
| true_positives = len(expected_set.intersection(actual_set)) | |
| precision = true_positives / len(actual_set) if actual_set else 0.0 | |
| recall = true_positives / len(expected_set) if expected_set else 0.0 | |
| f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0 | |
| return {"precision": precision, "recall": recall, "f1_score": f1_score} | |
| def _parse_judge_json(raw_str: str) -> dict | None: | |
| try: | |
| start_brace = raw_str.find('{') | |
| end_brace = raw_str.rfind('}') | |
| if start_brace != -1 and end_brace > start_brace: | |
| json_str = raw_str[start_brace : end_brace + 1] | |
| return json.loads(json_str) | |
| return None | |
| except (json.JSONDecodeError, AttributeError): | |
| return None | |
| def run_comprehensive_evaluation( | |
| vs_general: FAISS, | |
| vs_personal: FAISS, | |
| nlu_vectorstore: FAISS, | |
| config: Dict[str, Any] | |
| ): | |
| global test_fixtures | |
| if not test_fixtures: | |
| return "No test fixtures loaded. Please ensure conversation_test_fixtures_v8.jsonl exists.", [], [] | |
| def _norm(label: str) -> str: | |
| label = (label or "").strip().lower() | |
| return "factual_question" if "factual" in label else label | |
| print("Starting comprehensive evaluation...") | |
| results: List[Dict[str, Any]] = [] | |
| # ADD THESE LINES: | |
| total_fixtures = len(test_fixtures) | |
| print(f"\nπ STARTING EVALUATION on {total_fixtures} test cases...") | |
| # In evaluate.py, before the evaluation loop | |
| print("--- DEBUG: Checking personal vector store before evaluation ---") | |
| if vs_personal and hasattr(vs_personal.docstore, '_dict'): | |
| print(f"Personal vector store contains {len(vs_personal.docstore._dict)} documents.") | |
| else: | |
| print("Personal vector store appears to be empty or invalid.") | |
| # REPLACE the original for loop with this one to get the counter 'i' | |
| for i, fx in enumerate(test_fixtures): | |
| # for fx in test_fixtures: | |
| test_id = fx.get("test_id", "N/A") | |
| # This print statement now works because we have 'i' | |
| print(f"--- Processing Test Case {i+1}/{total_fixtures}: ID = {test_id} ---") | |
| turns = fx.get("turns") or [] | |
| api_chat_history = [{"role": t.get("role"), "content": t.get("text")} for t in turns] | |
| query = next((t["content"] for t in reversed(api_chat_history) if (t.get("role") or "user").lower() == "user"), "") | |
| if not query: continue | |
| ground_truth = fx.get("ground_truth", {}) | |
| expected_route = _norm(ground_truth.get("expected_route", "caregiving_scenario")) | |
| expected_tags = ground_truth.get("expected_tags", {}) | |
| actual_route = _norm(route_query_type(query)) | |
| route_correct = (actual_route == expected_route) | |
| actual_tags: Dict[str, Any] = {} | |
| if "caregiving_scenario" in actual_route: | |
| actual_tags = detect_tags_from_query( | |
| query, nlu_vectorstore=nlu_vectorstore, | |
| behavior_options=config["behavior_tags"], emotion_options=config["emotion_tags"], | |
| topic_options=config["topic_tags"], context_options=config["context_tags"], | |
| ) | |
| behavior_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_behaviors") | |
| emotion_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_emotion") | |
| topic_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_topics") | |
| context_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_contexts") | |
| final_tags = {} | |
| if "caregiving_scenario" in actual_route: | |
| final_tags = { | |
| "scenario_tag": (actual_tags.get("detected_behaviors") or [None])[0], | |
| "emotion_tag": actual_tags.get("detected_emotion"), | |
| "topic_tag": (actual_tags.get("detected_topics") or [None])[0], | |
| "context_tags": actual_tags.get("detected_contexts", []) | |
| } | |
| current_test_role = fx.get("test_role", "patient") | |
| rag_chain = make_rag_chain(vs_general, vs_personal, role=current_test_role) | |
| t0 = time.time() | |
| response = answer_query(rag_chain, query, query_type=actual_route, chat_history=api_chat_history, **final_tags) | |
| latency_ms = round((time.time() - t0) * 1000.0, 1) | |
| answer_text = response.get("answer", "ERROR") | |
| expected_sources_set = set(map(str, ground_truth.get("expected_sources", []))) | |
| raw_sources = response.get("sources", []) | |
| actual_sources_set = set(map(str, raw_sources if isinstance(raw_sources, (list, tuple)) else [raw_sources])) | |
| # --- START: ADD THIS STRATEGIC PRINT BLOCK --- | |
| print("\n" + "-"*20 + " SOURCE EVALUATION " + "-"*20) | |
| print(f" - Expected: {sorted(list(expected_sources_set))}") | |
| print(f" - Actual: {sorted(list(actual_sources_set))}") | |
| true_positives = expected_sources_set.intersection(actual_sources_set) | |
| false_positives = actual_sources_set - expected_sources_set | |
| false_negatives = expected_sources_set - actual_sources_set | |
| if not false_positives and not false_negatives: | |
| print(" - Result: β Perfect Match!") | |
| else: | |
| if false_positives: | |
| print(f" - π» False Positives (hurts precision): {sorted(list(false_positives))}") | |
| if false_negatives: | |
| print(f" - π» False Negatives (hurts recall): {sorted(list(false_negatives))}") | |
| print("-"*59 + "\n") | |
| # --- END: ADD THIS STRATEGIC PRINT BLOCK --- | |
| context_precision, context_recall = 0.0, 0.0 | |
| if expected_sources_set or actual_sources_set: | |
| true_positives = len(expected_sources_set.intersection(actual_sources_set)) | |
| if len(actual_sources_set) > 0: context_precision = true_positives / len(actual_sources_set) | |
| if len(expected_sources_set) > 0: context_recall = true_positives / len(expected_sources_set) | |
| elif not expected_sources_set and not actual_sources_set: | |
| context_precision, context_recall = 1.0, 1.0 | |
| answer_correctness_score = None | |
| ground_truth_answer = ground_truth.get("ground_truth_answer") | |
| if ground_truth_answer and "ERROR" not in answer_text: | |
| try: | |
| judge_msg = ANSWER_CORRECTNESS_JUDGE_PROMPT.format(ground_truth_answer=ground_truth_answer, generated_answer=answer_text) | |
| raw_correctness = call_llm([{"role": "user", "content": judge_msg}], temperature=0.0) | |
| correctness_data = _parse_judge_json(raw_correctness) | |
| if correctness_data and "correctness_score" in correctness_data: | |
| answer_correctness_score = float(correctness_data["correctness_score"]) | |
| except Exception as e: | |
| print(f"ERROR during answer correctness judging: {e}") | |
| faithfulness = None | |
| source_docs = response.get("source_documents", []) | |
| if source_docs and "ERROR" not in answer_text: | |
| context_blob = "\n---\n".join([doc.page_content for doc in source_docs]) | |
| judge_msg = FAITHFULNESS_JUDGE_PROMPT.format(query=query, answer=answer_text, sources=context_blob) | |
| try: | |
| if context_blob.strip(): | |
| raw = call_llm([{"role": "user", "content": judge_msg}], temperature=0.0) | |
| data = _parse_judge_json(raw) | |
| if data: | |
| denom = data.get("supported", 0) + data.get("contradicted", 0) + data.get("not_enough_info", 0) | |
| if denom > 0: faithfulness = round(data.get("supported", 0) / denom, 3) | |
| elif data.get("ignored", 0) > 0: faithfulness = 1.0 | |
| except Exception as e: | |
| print(f"ERROR during faithfulness judging: {e}") | |
| sources_pretty = ", ".join(sorted(s)) if (s:=actual_sources_set) else "" | |
| results.append({ | |
| "test_id": fx.get("test_id", "N/A"), "title": fx.get("title", "N/A"), | |
| "route_correct": "β " if route_correct else "β", "expected_route": expected_route, "actual_route": actual_route, | |
| "behavior_f1": f"{behavior_metrics['f1_score']:.2f}", "emotion_f1": f"{emotion_metrics['f1_score']:.2f}", | |
| "topic_f1": f"{topic_metrics['f1_score']:.2f}", "context_f1": f"{context_metrics['f1_score']:.2f}", | |
| "generated_answer": answer_text, "sources": sources_pretty, "source_count": len(actual_sources_set), | |
| "latency_ms": latency_ms, "faithfulness": faithfulness, | |
| "context_precision": context_precision, "context_recall": context_recall, | |
| "answer_correctness": answer_correctness_score, | |
| }) | |
| df = pd.DataFrame(results) | |
| output_path = "evaluation_results.csv" | |
| if not df.empty: | |
| cols = [ | |
| "test_id", "title", "route_correct", "expected_route", "actual_route", | |
| "context_precision", "context_recall", "faithfulness", "answer_correctness", | |
| "behavior_f1", "emotion_f1", "topic_f1", "context_f1", | |
| "source_count", "latency_ms", "sources", "generated_answer" | |
| ] | |
| df = df[[c for c in cols if c in df.columns]] | |
| df.to_csv(output_path, index=False, encoding="utf-8") | |
| print(f"Evaluation results saved to {output_path}") | |
| pct = df["route_correct"].value_counts(normalize=True).get("β ", 0) * 100 | |
| to_f = lambda s: pd.to_numeric(s, errors="coerce") | |
| cp_mean = to_f(df["context_precision"]).mean() | |
| cr_mean = to_f(df["context_recall"]).mean() | |
| faith_mean = to_f(df["faithfulness"]).mean() | |
| correct_mean = to_f(df["answer_correctness"]).mean() | |
| rag_with_sources_pct = (df["source_count"] > 0).mean() * 100 if "source_count" in df else 0 | |
| summary_text = f""" | |
| ## Evaluation Summary | |
| - **Routing Accuracy**: {pct:.2f}% | |
| - **Behaviour F1 (avg)**: {(to_f(df["behavior_f1"]).mean() * 100):.2f}% | |
| - **Emotion F1 (avg)**: {(to_f(df["emotion_f1"]).mean() * 100):.2f}% | |
| - **Topic F1 (avg)**: {(to_f(df["topic_f1"]).mean() * 100):.2f}% | |
| - **Context F1 (avg)**: {(to_f(df["context_f1"]).mean() * 100):.2f}% | |
| - **RAG: Context Precision**: {"N/A" if pd.isna(cp_mean) else f'{(cp_mean * 100):.1f}%'} | |
| - **RAG: Context Recall**: {"N/A" if pd.isna(cr_mean) else f'{(cr_mean * 100):.1f}%'} | |
| - **RAG: Faithfulness (LLM-judge)**: {"N/A" if pd.isna(faith_mean) else f'{(faith_mean * 100):.1f}%'} | |
| - **RAG: Answer Correctness (LLM-judge)**: {"N/A" if pd.isna(correct_mean) else f'{(correct_mean * 100):.1f}%'} | |
| - **RAG Answers w/ Sources**: {rag_with_sources_pct:.1f}% | |
| - **RAG: Avg Latency (ms)**: {to_f(df["latency_ms"]).mean():.1f} | |
| """ | |
| df_display = df.rename(columns={ | |
| "context_precision": "Ctx. Precision", "context_recall": "Ctx. Recall", | |
| "answer_correctness": "Answer Correct.", "faithfulness": "Faithfulness", | |
| "behavior_f1": "Behav. F1", "emotion_f1": "Emo. F1", "topic_f1": "Topic F1", "context_f1": "Ctx. F1" | |
| }) | |
| table_rows = df_display.values.tolist() | |
| headers = df_display.columns.tolist() | |
| else: | |
| summary_text = "No valid test fixtures found to evaluate." | |
| table_rows, headers = [], [] | |
| return summary_text, table_rows, headers | |