AD_Multimodal_Chatbot / evaluate.py
KeenWoo's picture
Update evaluate.py
47ca460 verified
raw
history blame
20.1 kB
# evaluate.py
import os
import json
import time
import re # <-- ADD THIS IMPORT
import pandas as pd
from typing import List, Dict, Any
from pathlib import Path
# --- Imports from the main application ---
# In evaluate.py
try:
from alz_companion.agent import (
make_rag_chain, route_query_type, detect_tags_from_query,
answer_query, call_llm, build_or_load_vectorstore
)
from alz_companion.prompts import FAITHFULNESS_JUDGE_PROMPT
from langchain_community.vectorstores import FAISS
# --- Also move this import inside the try block for consistency ---
from langchain.schema import Document
except ImportError:
# --- START: FALLBACK DEFINITIONS ---
class FAISS:
def __init__(self): self.docstore = type('obj', (object,), {'_dict': {}})()
def add_documents(self, docs): pass
def save_local(self, path): pass
@classmethod
def from_documents(cls, docs, embeddings=None): return cls()
class Document:
def __init__(self, page_content, metadata=None):
self.page_content = page_content
self.metadata = metadata or {}
def make_rag_chain(*args, **kwargs): return lambda q, **k: {"answer": f"(Eval Fallback) You asked: {q}", "sources": []}
def route_query_type(q, **kwargs): return "general_conversation"
def detect_tags_from_query(*args, **kwargs): return {}
def answer_query(chain, q, **kwargs): return chain(q, **kwargs)
def call_llm(*args, **kwargs): return "{}"
# --- ADD FALLBACK DEFINITION FOR THE MISSING FUNCTION ---
def build_or_load_vectorstore(docs, index_path, is_personal=False):
return FAISS()
# --- END OF ADDITION ---
FAITHFULNESS_JUDGE_PROMPT = ""
print("WARNING: Could not import from alz_companion. Evaluation functions will use fallbacks.")
# --- END: FALLBACK DEFINITIONS ---
# --- LLM-as-a-Judge Prompt for Answer Correctness ---
ANSWER_CORRECTNESS_JUDGE_PROMPT = """You are an expert evaluator. Your task is to assess the factual correctness of a generated answer against a ground truth answer.
- GROUND_TRUTH_ANSWER: This is the gold-standard, correct answer.
- GENERATED_ANSWER: This is the answer produced by the AI model.
Evaluate if the GENERATED_ANSWER is factually aligned with the GROUND_TRUTH_ANSWER. Ignore minor differences in phrasing, tone, or structure. The key is factual accuracy.
Respond with a single JSON object containing a float score from 0.0 to 1.0.
- 1.0: The generated answer is factually correct and aligns perfectly with the ground truth.
- 0.5: The generated answer is partially correct but misses key information or contains minor inaccuracies.
- 0.0: The generated answer is factually incorrect or contradicts the ground truth.
--- DATA TO EVALUATE ---
GROUND_TRUTH_ANSWER:
{ground_truth_answer}
GENERATED_ANSWER:
{generated_answer}
---
Return a single JSON object with your score:
{{
"correctness_score": <float>
}}
"""
test_fixtures = []
def load_test_fixtures():
"""Loads fixtures into the test_fixtures list."""
global test_fixtures
test_fixtures = []
env_path = os.environ.get("TEST_FIXTURES_PATH", "").strip()
# --- START: DEFINITIVE FIX ---
# The old code used a relative path, which is unreliable.
# This new code builds an absolute path to the fixture file based on
# the location of this (evaluate.py) script.
script_dir = Path(__file__).parent
default_fixture_file = script_dir / "small_test_cases_v10.jsonl"
candidates = [env_path] if env_path else [str(default_fixture_file)]
# --- END: DEFINITIVE FIX ---
# candidates = [env_path] if env_path else ["conversation_test_fixtures_v10.jsonl"]
# candidates = [env_path] if env_path else ["small_test_cases_v10.jsonl"]
path = next((p for p in candidates if p and os.path.exists(p)), None)
if not path:
print("Warning: No test fixtures file found for evaluation.")
return
# Use the corrected v10 file if available
# if "conversation_test_fixtures_v10.jsonl" in path:
if "small_test_cases_v10.jsonl" in path:
print(f"Using corrected test fixtures: {path}")
with open(path, "r", encoding="utf-8") as f:
for line in f:
try:
test_fixtures.append(json.loads(line))
except json.JSONDecodeError:
print(f"Skipping malformed JSON line in {path}")
print(f"Loaded {len(test_fixtures)} fixtures for evaluation from {path}")
def evaluate_nlu_tags(expected: Dict[str, Any], actual: Dict[str, Any], tag_key: str, expected_key_override: str = None) -> Dict[str, float]:
lookup_key = expected_key_override or tag_key
expected_raw = expected.get(lookup_key, [])
expected_set = set(expected_raw if isinstance(expected_raw, list) else [expected_raw]) if expected_raw and expected_raw != "None" else set()
actual_raw = actual.get(tag_key, [])
actual_set = set(actual_raw if isinstance(actual_raw, list) else [actual_raw]) if actual_raw and actual_raw != "None" else set()
if not expected_set and not actual_set:
return {"precision": 1.0, "recall": 1.0, "f1_score": 1.0}
true_positives = len(expected_set.intersection(actual_set))
precision = true_positives / len(actual_set) if actual_set else 0.0
recall = true_positives / len(expected_set) if expected_set else 0.0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
return {"precision": precision, "recall": recall, "f1_score": f1_score}
def _parse_judge_json(raw_str: str) -> dict | None:
try:
start_brace = raw_str.find('{')
end_brace = raw_str.rfind('}')
if start_brace != -1 and end_brace > start_brace:
json_str = raw_str[start_brace : end_brace + 1]
return json.loads(json_str)
return None
except (json.JSONDecodeError, AttributeError):
return None
# --- NEW: helpers for categorisation and error-class labelling ---
def _categorize_test(test_id: str) -> str:
tid = (test_id or "").lower()
if "synonym" in tid: return "synonym"
if "multi_fact" in tid or "multi-hop" in tid or "multihop" in tid: return "multi_fact"
if "omission" in tid: return "omission"
if "hallucination" in tid: return "hallucination"
if "time" in tid or "temporal" in tid: return "temporal"
if "context" in tid: return "context_disambig"
return "baseline"
def _classify_error(gt: str, gen: str) -> str:
import re
gt = (gt or "").strip().lower()
gen = (gen or "").strip().lower()
if not gen:
return "empty"
if not gt:
return "hallucination" if gen else "empty"
if gt in gen:
return "paraphrase"
gt_tokens = set([t for t in re.split(r'\W+', gt) if t])
gen_tokens = set([t for t in re.split(r'\W+', gen) if t])
overlap = len(gt_tokens & gen_tokens) / max(1, len(gt_tokens))
if overlap >= 0.3:
return "omission"
return "contradiction"
## NEW
# In evaluate.py
def run_comprehensive_evaluation(
vs_general: FAISS,
vs_personal: FAISS,
nlu_vectorstore: FAISS,
config: Dict[str, Any],
storage_path: Path # <-- ADD THIS PARAMETER
):
global test_fixtures
if not test_fixtures:
# The return signature is now back to 3 items.
return "No test fixtures loaded.", [], []
vs_personal_test = None
personal_context_docs = []
personal_context_file = "sample_data/1 Complaints of a Dutiful Daughter.txt"
if os.path.exists(personal_context_file):
print(f"Found personal context file for evaluation: '{personal_context_file}'")
with open(personal_context_file, "r", encoding="utf-8") as f:
content = f.read()
doc = Document(page_content=content, metadata={"source": os.path.basename(personal_context_file)})
personal_context_docs.append(doc)
else:
print(f"WARNING: Personal context file not found at '{personal_context_file}'. Factual tests will likely fail.")
vs_personal_test = build_or_load_vectorstore(
personal_context_docs,
index_path="tmp/eval_personal_index",
is_personal=True
)
print(f"Successfully created temporary personal vectorstore with {len(personal_context_docs)} document(s) for this evaluation run.")
def _norm(label: str) -> str:
label = (label or "").strip().lower()
return "factual_question" if "factual" in label else label
print("Starting comprehensive evaluation...")
results: List[Dict[str, Any]] = []
total_fixtures = len(test_fixtures)
print(f"\nπŸš€ STARTING EVALUATION on {total_fixtures} test cases...")
for i, fx in enumerate(test_fixtures):
test_id = fx.get("test_id", "N/A")
print(f"--- Processing Test Case {i+1}/{total_fixtures}: ID = {test_id} ---")
turns = fx.get("turns") or []
api_chat_history = [{"role": t.get("role"), "content": t.get("text")} for t in turns]
query = next((t["content"] for t in reversed(api_chat_history) if (t.get("role") or "user").lower() == "user"), "")
if not query: continue
print(f'Query: "{query}"')
ground_truth = fx.get("ground_truth", {})
expected_route = _norm(ground_truth.get("expected_route", "caregiving_scenario"))
expected_tags = ground_truth.get("expected_tags", {})
actual_route = _norm(route_query_type(query))
route_correct = (actual_route == expected_route)
actual_tags: Dict[str, Any] = {}
if "caregiving_scenario" in actual_route:
actual_tags = detect_tags_from_query(
query, nlu_vectorstore=nlu_vectorstore,
behavior_options=config["behavior_tags"], emotion_options=config["emotion_tags"],
topic_options=config["topic_tags"], context_options=config["context_tags"],
)
behavior_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_behaviors")
emotion_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_emotion")
topic_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_topics")
context_metrics = evaluate_nlu_tags(expected_tags, actual_tags, "detected_contexts")
final_tags = {}
if "caregiving_scenario" in actual_route:
final_tags = {
"scenario_tag": (actual_tags.get("detected_behaviors") or [None])[0],
"emotion_tag": actual_tags.get("detected_emotion"),
"topic_tag": (actual_tags.get("detected_topics") or [None])[0],
"context_tags": actual_tags.get("detected_contexts", [])
}
current_test_role = fx.get("test_role", "patient")
rag_chain = make_rag_chain(
vs_general,
vs_personal,
role=current_test_role,
for_evaluation=True
)
t0 = time.time()
response = answer_query(rag_chain, query, query_type=actual_route, chat_history=api_chat_history, **final_tags)
latency_ms = round((time.time() - t0) * 1000.0, 1)
answer_text = response.get("answer", "ERROR")
ground_truth_answer = ground_truth.get("ground_truth_answer")
category = _categorize_test(test_id)
error_class = _classify_error(ground_truth_answer, answer_text)
expected_sources_set = set(map(str, ground_truth.get("expected_sources", [])))
raw_sources = response.get("sources", [])
actual_sources_set = set(map(str, raw_sources if isinstance(raw_sources, (list, tuple)) else [raw_sources]))
print("\n" + "-"*20 + " SOURCE EVALUATION " + "-"*20)
print(f" - Expected: {sorted(list(expected_sources_set))}")
print(f" - Actual: {sorted(list(actual_sources_set))}")
true_positives = expected_sources_set.intersection(actual_sources_set)
false_positives = actual_sources_set - expected_sources_set
false_negatives = expected_sources_set - actual_sources_set
if not false_positives and not false_negatives:
print(" - Result: βœ… Perfect Match!")
else:
if false_positives:
print(f" - πŸ”» False Positives (hurts precision): {sorted(list(false_positives))}")
if false_negatives:
print(f" - πŸ”» False Negatives (hurts recall): {sorted(list(false_negatives))}")
print("-"*59 + "\n")
context_precision, context_recall = 0.0, 0.0
if expected_sources_set or actual_sources_set:
tp = len(expected_sources_set.intersection(actual_sources_set))
if len(actual_sources_set) > 0: context_precision = tp / len(actual_sources_set)
if len(expected_sources_set) > 0: context_recall = tp / len(expected_sources_set)
elif not expected_sources_set and not actual_sources_set:
context_precision, context_recall = 1.0, 1.0
# TURN DEBUG on Answer Correctness
# print("\n" + "-"*20 + " ANSWER & CORRECTNESS EVALUATION " + "-"*20)
# print(f" - Ground Truth Answer: {ground_truth_answer}")
# print(f" - Generated Answer: {answer_text}")
# print("-" * 59)
answer_correctness_score = None
if ground_truth_answer and "ERROR" not in answer_text:
try:
judge_msg = ANSWER_CORRECTNESS_JUDGE_PROMPT.format(ground_truth_answer=ground_truth_answer, generated_answer=answer_text)
print(f" - Judge Prompt Sent:\n{judge_msg}")
raw_correctness = call_llm([{"role": "user", "content": judge_msg}], temperature=0.0)
print(f" - Judge Raw Response: {raw_correctness}")
correctness_data = _parse_judge_json(raw_correctness)
if correctness_data and "correctness_score" in correctness_data:
answer_correctness_score = float(correctness_data["correctness_score"])
print(f" - Final Score: {answer_correctness_score}")
except Exception as e:
print(f"ERROR during answer correctness judging: {e}")
faithfulness = None
hallucination_rate = None
source_docs = response.get("source_documents", [])
if source_docs and "ERROR" not in answer_text:
context_blob = "\n---\n".join([doc.page_content for doc in source_docs])
judge_msg = FAITHFULNESS_JUDGE_PROMPT.format(query=query, answer=answer_text, sources=context_blob)
try:
if context_blob.strip():
raw = call_llm([{"role": "user", "content": judge_msg}], temperature=0.0)
data = _parse_judge_json(raw)
if data:
denom = data.get("supported", 0) + data.get("contradicted", 0) + data.get("not_enough_info", 0)
if denom > 0:
faithfulness = round(data.get("supported", 0) / denom, 3)
hallucination_rate = 1.0 - faithfulness
elif data.get("ignored", 0) > 0:
faithfulness = 1.0
hallucination_rate = 0.0
except Exception as e:
print(f"ERROR during faithfulness judging: {e}")
sources_pretty = ", ".join(sorted(s)) if (s:=actual_sources_set) else ""
results.append({
"test_id": fx.get("test_id", "N/A"), "title": fx.get("title", "N/A"),
"route_correct": "βœ…" if route_correct else "❌", "expected_route": expected_route, "actual_route": actual_route,
"behavior_f1": f"{behavior_metrics['f1_score']:.2f}", "emotion_f1": f"{emotion_metrics['f1_score']:.2f}",
"topic_f1": f"{topic_metrics['f1_score']:.2f}", "context_f1": f"{context_metrics['f1_score']:.2f}",
"generated_answer": answer_text, "sources": sources_pretty, "source_count": len(actual_sources_set),
"context_precision": context_precision, "context_recall": context_recall,
"faithfulness": faithfulness, "hallucination_rate": hallucination_rate,
"answer_correctness": answer_correctness_score,
"category": category, "error_class": error_class,
"latency_ms": latency_ms
})
df = pd.DataFrame(results)
summary_text, table_rows, headers = "No valid test fixtures found to evaluate.", [], []
if not df.empty:
# Add "hallucination_rate" to this list of columns to ensure it is not dropped.
cols = [
"test_id", "title", "route_correct", "expected_route", "actual_route",
"behavior_f1", "emotion_f1", "topic_f1", "context_f1",
"generated_answer", "sources", "source_count",
"context_precision", "context_recall",
"faithfulness", "hallucination_rate",
"answer_correctness",
"category", "error_class", "latency_ms",
]
df = df[[c for c in cols if c in df.columns]]
# --- START OF MODIFICATION ---
pct = df["route_correct"].value_counts(normalize=True).get("βœ…", 0) * 100
to_f = lambda s: pd.to_numeric(s, errors="coerce")
# Calculate the mean for the NLU F1 scores
bf1_mean = to_f(df["behavior_f1"]).mean() * 100
ef1_mean = to_f(df["emotion_f1"]).mean() * 100
tf1_mean = to_f(df["topic_f1"]).mean() * 100
cf1_mean = to_f(df["context_f1"]).mean() * 100
# Calculate the mean for Faithfulness
faith_mean = to_f(df["faithfulness"]).mean() * 100
# --- CHANGE 6: Calculate the mean for the new metric ---
halluc_mean = to_f(df["hallucination_rate"]).mean() * 100
rag_with_sources_pct = (df["source_count"] > 0).mean() * 100 if "source_count" in df else 0
# Add the NLU metrics to the summary f-string
# Choose to use Hallucination - **RAG: Faithfulness**: {faith_mean:.1f}%
summary_text = f"""## Evaluation Summary
- **Routing Accuracy**: {pct:.2f}%
- **Behaviour F1 (avg)**: {bf1_mean:.2f}%
- **Emotion F1 (avg)**: {ef1_mean:.2f}%
- **Topic F1 (avg)**: {tf1_mean:.2f}%
- **Context F1 (avg)**: {cf1_mean:.2f}%
- **RAG: Context Precision**: {(to_f(df["context_precision"]).mean() * 100):.1f}%
- **RAG: Context Recall**: {(to_f(df["context_recall"]).mean() * 100):.1f}%
- **RAG Answers w/ Sources**: {rag_with_sources_pct:.1f}%
- **RAG: Hallucination Rate**: {halluc_mean:.1f}% (Lower is better)
- **RAG: Answer Correctness (LLM-judge)**: {(to_f(df["answer_correctness"]).mean() * 100):.1f}%
- **RAG: Avg Latency (ms)**: {to_f(df["latency_ms"]).mean():.1f}
"""
# --- END OF MODIFICATION ---
print(summary_text)
df_display = df.rename(columns={"context_precision": "Ctx. Precision", "context_recall": "Ctx. Recall"})
table_rows = df_display.values.tolist()
headers = df_display.columns.tolist()
# --- NEW: per-category averages ---
try:
cat_means = df.groupby("category")["answer_correctness"].mean().reset_index()
print("\nπŸ“Š Correctness by Category:")
print(cat_means.to_string(index=False))
except Exception as e:
print(f"WARNING: Could not compute category breakdown: {e}")
# --- NEW: confusion-style matrix ---
try:
confusion = pd.crosstab(df.get("category", []), df.get("error_class", []),
rownames=["Category"], colnames=["Error Class"], dropna=False)
print("\nπŸ“Š Error Class Distribution by Category:")
print(confusion.to_string())
except Exception as e:
print(f"WARNING: Could not build confusion matrix: {e}")
# END
else:
summary_text = "No valid test fixtures found to evaluate."
table_rows, headers = [], []
return summary_text, table_rows, headers
# return summary_text, table_rows
## END