Spaces:
Running
Running
| # evaluate.py | |
| import os | |
| import json | |
| import time | |
| import re # <-- ADD THIS IMPORT | |
| import pandas as pd | |
| from typing import List, Dict, Any | |
| from pathlib import Path | |
| # --- Imports from the main application --- | |
| # In evaluate.py | |
# Import the real application components when they are installed; otherwise
# define lightweight stand-ins so this module can still be imported and its
# evaluation helpers exercised without the full application stack.
try:
    from alz_companion.agent import (
        make_rag_chain, route_query_type, detect_tags_from_query,
        answer_query, call_llm, build_or_load_vectorstore
    )
    from alz_companion.prompts import FAITHFULNESS_JUDGE_PROMPT
    from langchain_community.vectorstores import FAISS
    from langchain.schema import Document
except ImportError:
    # --- START: FALLBACK DEFINITIONS ---
    class FAISS:
        """Inert stand-in for the FAISS vector store; it stores nothing."""

        def __init__(self):
            # Expose an empty `.docstore._dict` so attribute access on the
            # stand-in does not fail.
            self.docstore = type('obj', (object,), {'_dict': {}})()

        def add_documents(self, docs):
            """Accept and discard documents."""
            pass

        def save_local(self, path):
            """Accept a path and write nothing."""
            pass

        @classmethod
        def from_documents(cls, docs, embeddings=None):
            """Alternate constructor; must be a classmethod so that
            ``FAISS.from_documents(docs)`` works without an instance."""
            return cls()

    class Document:
        """Stand-in document holding text plus a metadata dict."""

        def __init__(self, page_content, metadata=None):
            self.page_content = page_content
            self.metadata = metadata or {}

    def make_rag_chain(*args, **kwargs):
        """Return a callable that echoes the query in a canned answer."""
        return lambda q, **k: {"answer": f"(Eval Fallback) You asked: {q}", "sources": []}

    def route_query_type(q, **kwargs):
        """Always route to the general conversation label."""
        return "general_conversation"

    def detect_tags_from_query(*args, **kwargs):
        """Detect no tags."""
        return {}

    def answer_query(chain, q, **kwargs):
        """Invoke the chain directly with the query and keyword arguments."""
        return chain(q, **kwargs)

    def call_llm(*args, **kwargs):
        """Return an empty JSON object as the model reply."""
        return "{}"

    def build_or_load_vectorstore(docs, index_path, is_personal=False):
        """Return an empty fallback vector store, ignoring all arguments."""
        return FAISS()

    FAITHFULNESS_JUDGE_PROMPT = ""
    print("WARNING: Could not import from alz_companion. Evaluation functions will use fallbacks.")
    # --- END: FALLBACK DEFINITIONS ---
| # --- LLM-as-a-Judge Prompt for Answer Correctness --- | |
| # Template string with two named placeholders, {ground_truth_answer} and {generated_answer}. | |
| # The doubled braces near the end are escaped literal braces, so the text is meant to be | |
| # filled in with str.format(). The judge is asked to reply with a JSON object whose | |
| # "correctness_score" field is a float between 0.0 and 1.0. | |
| ANSWER_CORRECTNESS_JUDGE_PROMPT = """You are an expert evaluator. Your task is to assess the factual correctness of a generated answer against a ground truth answer. | |
| - GROUND_TRUTH_ANSWER: This is the gold-standard, correct answer. | |
| - GENERATED_ANSWER: This is the answer produced by the AI model. | |
| Evaluate if the GENERATED_ANSWER is factually aligned with the GROUND_TRUTH_ANSWER. Ignore minor differences in phrasing, tone, or structure. The key is factual accuracy. | |
| Respond with a single JSON object containing a float score from 0.0 to 1.0. | |
| - 1.0: The generated answer is factually correct and aligns perfectly with the ground truth. | |
| - 0.5: The generated answer is partially correct but misses key information or contains minor inaccuracies. | |
| - 0.0: The generated answer is factually incorrect or contradicts the ground truth. | |
| --- DATA TO EVALUATE --- | |
| GROUND_TRUTH_ANSWER: | |
| {ground_truth_answer} | |
| GENERATED_ANSWER: | |
| {generated_answer} | |
| --- | |
| Return a single JSON object with your score: | |
| {{ | |
| "correctness_score": <float> | |
| }} | |
| """ | |
test_fixtures = []


def load_test_fixtures():
    """Load evaluation fixtures from a JSONL file into ``test_fixtures``.

    The file is taken from the ``TEST_FIXTURES_PATH`` environment variable
    when it is set and non-blank; otherwise ``small_test_cases_v10.jsonl``
    next to this script is used. The module-level ``test_fixtures`` list is
    rebound to a fresh list on every call, so a repeated call replaces the
    previous contents rather than appending to them.

    Blank lines are skipped silently. Lines that are not valid JSON, or that
    decode to something other than a JSON object, are skipped with a message,
    because downstream code calls ``.get(...)`` on every fixture.

    Returns:
        None. Results are published through the ``test_fixtures`` global.
    """
    global test_fixtures
    test_fixtures = []

    env_path = os.environ.get("TEST_FIXTURES_PATH", "").strip()
    # Anchor the default on this script's directory, not the process CWD.
    default_fixture_file = Path(__file__).parent / "small_test_cases_v10.jsonl"
    candidates = [env_path] if env_path else [str(default_fixture_file)]

    path = next((p for p in candidates if p and os.path.exists(p)), None)
    if not path:
        print("Warning: No test fixtures file found for evaluation.")
        return

    if "small_test_cases_v10.jsonl" in path:
        print(f"Using corrected test fixtures: {path}")

    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                # A blank separator or trailing newline is not a malformed record.
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                print(f"Skipping malformed JSON line in {path}")
                continue
            if not isinstance(record, dict):
                print(f"Skipping non-object JSON line {line_no} in {path}")
                continue
            test_fixtures.append(record)

    print(f"Loaded {len(test_fixtures)} fixtures for evaluation from {path}")
| def evaluate_nlu_tags(expected: Dict[str, Any], actual: Dict[str, Any], tag_key: str, expected_key_override: str = None) -> Dict[str, float]: | |
| lookup_key = expected_key_override or tag_key | |
| expected_raw = expected.get(lookup_key, []) | |
| expected_set = set(expected_raw if isinstance(expected_raw, list) else [expected_raw]) if expected_raw and expected_raw != "None" else set() | |
| actual_raw = actual.get(tag_key, []) | |
| actual_set = set(actual_raw if isinstance(actual_raw, list) else [actual_raw]) if actual_raw and actual_raw != "None" else set() | |
| if not expected_set and not actual_set: | |
| return {"precision": 1.0, "recall": 1.0, "f1_score": 1.0} | |
| true_positives = len(expected_set.intersection(actual_set)) | |
| precision = true_positives / len(actual_set) if actual_set else 0.0 | |
| recall = true_positives / len(expected_set) if expected_set else 0.0 | |
| f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0 | |
| return {"precision": precision, "recall": recall, "f1_score": f1_score} | |
| def _parse_judge_json(raw_str: str) -> dict | None: | |
| try: | |
| start_brace = raw_str.find('{') | |
| end_brace = raw_str.rfind('}') | |
| if start_brace != -1 and end_brace > start_brace: | |
| json_str = raw_str[start_brace : end_brace + 1] | |
| return json.loads(json_str) | |
| return None | |
| except (json.JSONDecodeError, AttributeError): | |
| return None | |
| # --- NEW: helpers for categorisation and error-class labelling --- | |
| def _categorize_test(test_id: str) -> str: | |
| tid = (test_id or "").lower() | |
| if "synonym" in tid: return "synonym" | |
| if "multi_fact" in tid or "multi-hop" in tid or "multihop" in tid: return "multi_fact" | |
| if "omission" in tid: return "omission" | |
| if "hallucination" in tid: return "hallucination" | |
| if "time" in tid or "temporal" in tid: return "temporal" | |
| if "context" in tid: return "context_disambig" | |
| return "baseline" | |
| def _classify_error(gt: str, gen: str) -> str: | |
| import re | |
| gt = (gt or "").strip().lower() | |
| gen = (gen or "").strip().lower() | |
| if not gen: | |
| return "empty" | |
| if not gt: | |
| return "hallucination" if gen else "empty" | |
| if gt in gen: | |
| return "paraphrase" | |
| gt_tokens = set([t for t in re.split(r'\W+', gt) if t]) | |
| gen_tokens = set([t for t in re.split(r'\W+', gen) if t]) | |
| overlap = len(gt_tokens & gen_tokens) / max(1, len(gt_tokens)) | |
| if overlap >= 0.3: | |
| return "omission" | |
| return "contradiction" | |
| ## NEW | |
| # In evaluate.py | |
| def run_comprehensive_evaluation( | |
| vs_general: FAISS, | |
| vs_personal: FAISS, | |
| nlu_vectorstore: FAISS, | |
| config: Dict[str, Any], | |
| storage_path: Path # <-- ADD THIS PARAMETER | |
| ): | |
| # Runs every fixture in the module-level `test_fixtures` list through routing, tag detection, | |
| # answer generation and LLM-judge scoring, prints diagnostics along the way, and returns a | |
| # 3-tuple (summary_text, table_rows, headers) derived from a pandas DataFrame of per-test rows. | |
| # `config` is read for the keys "behavior_tags", "emotion_tags", "topic_tags" and "context_tags". | |
| # NOTE(review): `storage_path` is not referenced anywhere in this function body. | |
| global test_fixtures | |
| if not test_fixtures: | |
| # The return signature is now back to 3 items. | |
| return "No test fixtures loaded.", [], [] | |
| # Read the sample personal-context file (when it exists) and build a vectorstore from it. | |
| # NOTE(review): `vs_personal_test` is assigned here but never read afterwards; make_rag_chain | |
| # below receives `vs_personal` instead -- confirm which store the evaluation is meant to use. | |
| vs_personal_test = None | |
| personal_context_docs = [] | |
| personal_context_file = "sample_data/1 Complaints of a Dutiful Daughter.txt" | |
| if os.path.exists(personal_context_file): | |
| print(f"Found personal context file for evaluation: '{personal_context_file}'") | |
| with open(personal_context_file, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| doc = Document(page_content=content, metadata={"source": os.path.basename(personal_context_file)}) | |
| personal_context_docs.append(doc) | |
| else: | |
| print(f"WARNING: Personal context file not found at '{personal_context_file}'. Factual tests will likely fail.") | |
| vs_personal_test = build_or_load_vectorstore( | |
| personal_context_docs, | |
| index_path="tmp/eval_personal_index", | |
| is_personal=True | |
| ) | |
| print(f"Successfully created temporary personal vectorstore with {len(personal_context_docs)} document(s) for this evaluation run.") | |
| # Normalises a route label: stripped and lower-cased, and any label containing "factual" | |
| # collapses to "factual_question". | |
| def _norm(label: str) -> str: | |
| label = (label or "").strip().lower() | |
| return "factual_question" if "factual" in label else label | |
| print("Starting comprehensive evaluation...") | |
| results: List[Dict[str, Any]] = [] | |
| total_fixtures = len(test_fixtures) | |
| print(f"\nπ STARTING EVALUATION on {total_fixtures} test cases...") | |
| for i, fx in enumerate(test_fixtures): | |
| test_id = fx.get("test_id", "N/A") | |
| print(f"--- Processing Test Case {i+1}/{total_fixtures}: ID = {test_id} ---") | |
| # The query is the content of the last turn whose role is "user" (a missing role counts as | |
| # "user"); fixtures that yield no query are skipped. | |
| turns = fx.get("turns") or [] | |
| api_chat_history = [{"role": t.get("role"), "content": t.get("text")} for t in turns] | |
| query = next((t["content"] for t in reversed(api_chat_history) if (t.get("role") or "user").lower() == "user"), "") | |
| if not query: continue | |
| print(f'Query: "{query}"') | |
| ground_truth = fx.get("ground_truth", {}) | |
| expected_route = _norm(ground_truth.get("expected_route", "caregiving_scenario")) | |
| expected_tags = ground_truth.get("expected_tags", {}) | |
| actual_route = _norm(route_query_type(query)) | |
| route_correct = (actual_route == expected_route) | |
| # Tag detection only runs when the predicted route contains "caregiving_scenario"; | |
| # otherwise actual_tags stays empty. | |
| actual_tags: Dict[str, Any] = {} | |
| if "caregiving_scenario" in actual_route: | |
| actual_tags = detect_tags_from_query( | |
| query, nlu_vectorstore=nlu_vectorstore, | |
| behavior_options=config["behavior_tags"], emotion_options=config["emotion_tags"], | |
| topic_options=config["topic_tags"], context_options=config["context_tags"], | |
| ) | |
| # Precision/recall/F1 of the detected tags against the fixture's expected_tags, per tag type. | |
| behavior_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_behaviors") | |
| emotion_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_emotion") | |
| topic_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_topics") | |
| context_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_contexts") | |
| # Keyword arguments forwarded to answer_query; the first detected behaviour/topic is used. | |
| final_tags = {} | |
| if "caregiving_scenario" in actual_route: | |
| final_tags = { | |
| "scenario_tag": (actual_tags.get("detected_behaviors") or [None])[0], | |
| "emotion_tag": actual_tags.get("detected_emotion"), | |
| "topic_tag": (actual_tags.get("detected_topics") or [None])[0], | |
| "context_tags": actual_tags.get("detected_contexts", []) | |
| } | |
| current_test_role = fx.get("test_role", "patient") | |
| rag_chain = make_rag_chain( | |
| vs_general, | |
| vs_personal, | |
| role=current_test_role, | |
| for_evaluation=True | |
| ) | |
| # Latency covers only the answer_query call. | |
| t0 = time.time() | |
| response = answer_query(rag_chain, query, query_type=actual_route, chat_history=api_chat_history, **final_tags) | |
| latency_ms = round((time.time() - t0) * 1000.0, 1) | |
| answer_text = response.get("answer", "ERROR") | |
| ground_truth_answer = ground_truth.get("ground_truth_answer") | |
| category = _categorize_test(test_id) | |
| error_class = _classify_error(ground_truth_answer, answer_text) | |
| # Compare the returned sources (stringified) against the fixture's expected_sources. | |
| expected_sources_set = set(map(str, ground_truth.get("expected_sources", []))) | |
| raw_sources = response.get("sources", []) | |
| actual_sources_set = set(map(str, raw_sources if isinstance(raw_sources, (list, tuple)) else [raw_sources])) | |
| print("\n" + "-"*20 + " SOURCE EVALUATION " + "-"*20) | |
| print(f" - Expected: {sorted(list(expected_sources_set))}") | |
| print(f" - Actual: {sorted(list(actual_sources_set))}") | |
| true_positives = expected_sources_set.intersection(actual_sources_set) | |
| false_positives = actual_sources_set - expected_sources_set | |
| false_negatives = expected_sources_set - actual_sources_set | |
| if not false_positives and not false_negatives: | |
| print(" - Result: β Perfect Match!") | |
| else: | |
| if false_positives: | |
| print(f" - π» False Positives (hurts precision): {sorted(list(false_positives))}") | |
| if false_negatives: | |
| print(f" - π» False Negatives (hurts recall): {sorted(list(false_negatives))}") | |
| print("-"*59 + "\n") | |
| # Both metrics start at 0.0; when neither side lists any source they are set to 1.0. | |
| context_precision, context_recall = 0.0, 0.0 | |
| if expected_sources_set or actual_sources_set: | |
| tp = len(expected_sources_set.intersection(actual_sources_set)) | |
| if len(actual_sources_set) > 0: context_precision = tp / len(actual_sources_set) | |
| if len(expected_sources_set) > 0: context_recall = tp / len(expected_sources_set) | |
| elif not expected_sources_set and not actual_sources_set: | |
| context_precision, context_recall = 1.0, 1.0 | |
| # TURN DEBUG on Answer Correctness | |
| # print("\n" + "-"*20 + " ANSWER & CORRECTNESS EVALUATION " + "-"*20) | |
| # print(f" - Ground Truth Answer: {ground_truth_answer}") | |
| # print(f" - Generated Answer: {answer_text}") | |
| # print("-" * 59) | |
| # LLM-as-judge correctness: attempted only when a ground-truth answer exists and the answer | |
| # text does not contain "ERROR"; the score stays None if the judge reply cannot be parsed. | |
| answer_correctness_score = None | |
| if ground_truth_answer and "ERROR" not in answer_text: | |
| try: | |
| judge_msg = ANSWER_CORRECTNESS_JUDGE_PROMPT.format(ground_truth_answer=ground_truth_answer, generated_answer=answer_text) | |
| print(f" - Judge Prompt Sent:\n{judge_msg}") | |
| raw_correctness = call_llm([{"role": "user", "content": judge_msg}], temperature=0.0) | |
| print(f" - Judge Raw Response: {raw_correctness}") | |
| correctness_data = _parse_judge_json(raw_correctness) | |
| if correctness_data and "correctness_score" in correctness_data: | |
| answer_correctness_score = float(correctness_data["correctness_score"]) | |
| print(f" - Final Score: {answer_correctness_score}") | |
| except Exception as e: | |
| print(f"ERROR during answer correctness judging: {e}") | |
| # Faithfulness = supported / (supported + contradicted + not_enough_info) taken from the | |
| # judge's JSON reply; hallucination_rate is its complement. Both stay None when there are | |
| # no source documents or the judge reply is unusable. | |
| faithfulness = None | |
| hallucination_rate = None | |
| source_docs = response.get("source_documents", []) | |
| if source_docs and "ERROR" not in answer_text: | |
| context_blob = "\n---\n".join([doc.page_content for doc in source_docs]) | |
| judge_msg = FAITHFULNESS_JUDGE_PROMPT.format(query=query, answer=answer_text, sources=context_blob) | |
| try: | |
| if context_blob.strip(): | |
| raw = call_llm([{"role": "user", "content": judge_msg}], temperature=0.0) | |
| data = _parse_judge_json(raw) | |
| if data: | |
| denom = data.get("supported", 0) + data.get("contradicted", 0) + data.get("not_enough_info", 0) | |
| if denom > 0: | |
| faithfulness = round(data.get("supported", 0) / denom, 3) | |
| hallucination_rate = 1.0 - faithfulness | |
| # Presumably pairs with `if denom > 0` (indentation is not visible here): no scorable | |
| # claims but some "ignored" ones are treated as fully faithful. | |
| elif data.get("ignored", 0) > 0: | |
| faithfulness = 1.0 | |
| hallucination_rate = 0.0 | |
| except Exception as e: | |
| print(f"ERROR during faithfulness judging: {e}") | |
| # The walrus binds the source set to `s` so it can be tested and sorted in one expression. | |
| sources_pretty = ", ".join(sorted(s)) if (s:=actual_sources_set) else "" | |
| results.append({ | |
| "test_id": fx.get("test_id", "N/A"), "title": fx.get("title", "N/A"), | |
| "route_correct": "β " if route_correct else "β", "expected_route": expected_route, "actual_route": actual_route, | |
| "behavior_f1": f"{behavior_metrics['f1_score']:.2f}", "emotion_f1": f"{emotion_metrics['f1_score']:.2f}", | |
| "topic_f1": f"{topic_metrics['f1_score']:.2f}", "context_f1": f"{context_metrics['f1_score']:.2f}", | |
| "generated_answer": answer_text, "sources": sources_pretty, "source_count": len(actual_sources_set), | |
| "context_precision": context_precision, "context_recall": context_recall, | |
| "faithfulness": faithfulness, "hallucination_rate": hallucination_rate, | |
| "answer_correctness": answer_correctness_score, | |
| "category": category, "error_class": error_class, | |
| "latency_ms": latency_ms | |
| }) | |
| # Aggregate the per-test rows into a DataFrame for the summary text and the returned table. | |
| df = pd.DataFrame(results) | |
| summary_text, table_rows, headers = "No valid test fixtures found to evaluate.", [], [] | |
| if not df.empty: | |
| # Add "hallucination_rate" to this list of columns to ensure it is not dropped. | |
| cols = [ | |
| "test_id", "title", "route_correct", "expected_route", "actual_route", | |
| "behavior_f1", "emotion_f1", "topic_f1", "context_f1", | |
| "generated_answer", "sources", "source_count", | |
| "context_precision", "context_recall", | |
| "faithfulness", "hallucination_rate", | |
| "answer_correctness", | |
| "category", "error_class", "latency_ms", | |
| ] | |
| df = df[[c for c in cols if c in df.columns]] | |
| # --- START OF MODIFICATION --- | |
| # Share of rows whose route_correct cell equals the "correct" marker string, as a percentage. | |
| pct = df["route_correct"].value_counts(normalize=True).get("β ", 0) * 100 | |
| # The F1 values were stored as formatted strings, so columns are coerced to numbers | |
| # (errors="coerce") before averaging. | |
| to_f = lambda s: pd.to_numeric(s, errors="coerce") | |
| # Calculate the mean for the NLU F1 scores | |
| bf1_mean = to_f(df["behavior_f1"]).mean() * 100 | |
| ef1_mean = to_f(df["emotion_f1"]).mean() * 100 | |
| tf1_mean = to_f(df["topic_f1"]).mean() * 100 | |
| cf1_mean = to_f(df["context_f1"]).mean() * 100 | |
| # Calculate the mean for Faithfulness | |
| faith_mean = to_f(df["faithfulness"]).mean() * 100 | |
| # --- CHANGE 6: Calculate the mean for the new metric --- | |
| halluc_mean = to_f(df["hallucination_rate"]).mean() * 100 | |
| rag_with_sources_pct = (df["source_count"] > 0).mean() * 100 if "source_count" in df else 0 | |
| # Add the NLU metrics to the summary f-string | |
| # Choose to use Hallucination - **RAG: Faithfulness**: {faith_mean:.1f}% | |
| summary_text = f"""## Evaluation Summary | |
| - **Routing Accuracy**: {pct:.2f}% | |
| - **Behaviour F1 (avg)**: {bf1_mean:.2f}% | |
| - **Emotion F1 (avg)**: {ef1_mean:.2f}% | |
| - **Topic F1 (avg)**: {tf1_mean:.2f}% | |
| - **Context F1 (avg)**: {cf1_mean:.2f}% | |
| - **RAG: Context Precision**: {(to_f(df["context_precision"]).mean() * 100):.1f}% | |
| - **RAG: Context Recall**: {(to_f(df["context_recall"]).mean() * 100):.1f}% | |
| - **RAG Answers w/ Sources**: {rag_with_sources_pct:.1f}% | |
| - **RAG: Hallucination Rate**: {halluc_mean:.1f}% (Lower is better) | |
| - **RAG: Answer Correctness (LLM-judge)**: {(to_f(df["answer_correctness"]).mean() * 100):.1f}% | |
| - **RAG: Avg Latency (ms)**: {to_f(df["latency_ms"]).mean():.1f} | |
| """ | |
| # --- END OF MODIFICATION --- | |
| print(summary_text) | |
| # Two columns are renamed for display; the returned rows and headers come from this renamed frame. | |
| df_display = df.rename(columns={"context_precision": "Ctx. Precision", "context_recall": "Ctx. Recall"}) | |
| table_rows = df_display.values.tolist() | |
| headers = df_display.columns.tolist() | |
| # --- NEW: per-category averages --- | |
| # Printed for diagnostics only; a failure here is reported and does not abort the run. | |
| try: | |
| cat_means = df.groupby("category")["answer_correctness"].mean().reset_index() | |
| print("\nπ Correctness by Category:") | |
| print(cat_means.to_string(index=False)) | |
| except Exception as e: | |
| print(f"WARNING: Could not compute category breakdown: {e}") | |
| # --- NEW: confusion-style matrix --- | |
| try: | |
| confusion = pd.crosstab(df.get("category", []), df.get("error_class", []), | |
| rownames=["Category"], colnames=["Error Class"], dropna=False) | |
| print("\nπ Error Class Distribution by Category:") | |
| print(confusion.to_string()) | |
| except Exception as e: | |
| print(f"WARNING: Could not build confusion matrix: {e}") | |
| # END | |
| else: | |
| summary_text = "No valid test fixtures found to evaluate." | |
| table_rows, headers = [], [] | |
| return summary_text, table_rows, headers | |
| # return summary_text, table_rows | |
| ## END |