import re
from typing import Any, Dict, List, Optional

import pandas as pd
import phoenix as px
from phoenix.trace import SpanEvaluations

from comparison import AnswerComparator


class GAIAPhoenixEvaluator:
    """Phoenix evaluator for GAIA dataset ground truth comparison."""

    def __init__(self, metadata_path: str = "data/metadata.jsonl"):
        self.comparator = AnswerComparator(metadata_path)
        self.eval_name = "gaia_ground_truth"

    def evaluate_spans(self, spans_df: pd.DataFrame) -> List[SpanEvaluations]:
        """Evaluate spans and return Phoenix SpanEvaluations."""
        evaluations = []

        for _, span in spans_df.iterrows():
            task_id = self._extract_task_id(span)
            predicted_answer = self._extract_predicted_answer(span)
            span_id = span.get("context.span_id")

            if task_id and predicted_answer is not None and span_id:
                evaluation = self.comparator.evaluate_answer(task_id, predicted_answer)

                eval_record = {
                    "span_id": span_id,
                    "score": 1.0 if evaluation["exact_match"] else evaluation["similarity_score"],
                    "label": "correct" if evaluation["exact_match"] else "incorrect",
                    "explanation": self._create_explanation(evaluation),
                    "task_id": task_id,
                    "predicted_answer": evaluation["predicted_answer"],
                    "ground_truth": evaluation["actual_answer"],
                    "exact_match": evaluation["exact_match"],
                    "similarity_score": evaluation["similarity_score"],
                    "contains_answer": evaluation["contains_answer"],
                }

                evaluations.append(eval_record)

        if evaluations:
            eval_df = pd.DataFrame(evaluations)
            return [SpanEvaluations(eval_name=self.eval_name, dataframe=eval_df)]

        return []

    def _extract_task_id(self, span) -> Optional[str]:
        """Extract task_id from span data."""
        # GAIA task ids are UUIDs; if no explicit task_id field is present,
        # fall back to scanning free-form span fields for one.
        uuid_pattern = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"

        attributes = span.get("attributes", {})
        if isinstance(attributes, dict) and "task_id" in attributes:
            return attributes["task_id"]

        input_data = span.get("input", {})
        if isinstance(input_data, dict) and "task_id" in input_data:
            return input_data["task_id"]

        input_value = span.get("input.value", "")
        if isinstance(input_value, str):
            match = re.search(uuid_pattern, input_value)
            if match:
                return match.group(0)

        span_name = span.get("name", "")
        if isinstance(span_name, str):
            match = re.search(uuid_pattern, span_name)
            if match:
                return match.group(0)

        return None

    def _extract_predicted_answer(self, span) -> Optional[str]:
        """Extract predicted answer from span output."""
        output_fields = ["output.value", "output", "response", "result"]

        for field in output_fields:
            value = span.get(field)
            if value is not None:
                return str(value)

        return None

    def _create_explanation(self, evaluation: Dict[str, Any]) -> str:
        """Create human-readable explanation of the evaluation."""
        predicted = evaluation["predicted_answer"]
        actual = evaluation["actual_answer"]
        exact_match = evaluation["exact_match"]
        similarity = evaluation["similarity_score"]
        contains = evaluation["contains_answer"]

        if actual is None:
            return "❓ No ground truth available for comparison"

        explanation = f"Predicted: '{predicted}' | Ground Truth: '{actual}' | "

        if exact_match:
            explanation += "✅ Exact match"
        elif contains:
            explanation += f"⚠️ Contains correct answer (similarity: {similarity:.3f})"
        else:
            explanation += f"❌ Incorrect (similarity: {similarity:.3f})"

        return explanation


def add_gaia_evaluations_to_phoenix(spans_df: pd.DataFrame, metadata_path: str = "data/metadata.jsonl") -> List[SpanEvaluations]:
    """Add GAIA evaluation results to Phoenix spans."""
    evaluator = GAIAPhoenixEvaluator(metadata_path)
    return evaluator.evaluate_spans(spans_df)


def log_evaluations_to_phoenix(evaluations_df: pd.DataFrame, session_id: Optional[str] = None) -> Optional[pd.DataFrame]:
    """Log evaluation results directly to Phoenix."""
    # NOTE: session_id is currently unused.
    try:
        client = px.Client()

        spans_df = client.get_spans_dataframe()

        if spans_df is None or spans_df.empty:
            print("No spans found to attach evaluations to")
            return None

        evaluation_records = []

        for _, eval_row in evaluations_df.iterrows():
            task_id = eval_row["task_id"]

            # Match the evaluation to a span by task_id: first via the span input,
            # then via the span name.
            matching_spans = spans_df[
                spans_df["input.value"].astype(str).str.contains(task_id, na=False, case=False)
            ]

            if len(matching_spans) == 0:
                matching_spans = spans_df[
                    spans_df["name"].astype(str).str.contains(task_id, na=False, case=False)
                ]

            if len(matching_spans) > 0:
                span_id = matching_spans.iloc[0]["context.span_id"]

                evaluation_record = {
                    "span_id": span_id,
                    "name": "gaia_ground_truth",
                    "score": eval_row["similarity_score"],
                    "label": "correct" if bool(eval_row["exact_match"]) else "incorrect",
                    "explanation": (
                        f"Predicted: '{eval_row['predicted_answer']}' | "
                        f"Ground Truth: '{eval_row['actual_answer']}' | "
                        f"Similarity: {eval_row['similarity_score']:.3f} | "
                        f"Exact Match: {eval_row['exact_match']}"
                    ),
                    "annotator_kind": "HUMAN",
                    "metadata": {
                        "task_id": task_id,
                        "exact_match": eval_row["exact_match"],
                        "similarity_score": eval_row["similarity_score"],
                        "contains_answer": eval_row["contains_answer"],
                        "predicted_answer": eval_row["predicted_answer"],
                        "ground_truth": eval_row["actual_answer"],
                    },
                }

                evaluation_records.append(evaluation_record)

        if evaluation_records:
            eval_df = pd.DataFrame(evaluation_records)

            span_evaluations = SpanEvaluations(
                eval_name="gaia_ground_truth",
                dataframe=eval_df,
            )

            try:
                # Prefer the module-level helper when this Phoenix version provides it.
                px.log_evaluations(span_evaluations)
                print(f"✅ Successfully logged {len(evaluation_records)} evaluations to Phoenix")
            except AttributeError:
                # Otherwise fall back to logging through the client.
                client.log_evaluations(span_evaluations)
                print(f"✅ Successfully logged {len(evaluation_records)} evaluations to Phoenix (fallback)")

            return eval_df
        else:
            print("⚠️ No matching spans found for evaluations")
            if spans_df is not None:
                print(f"Available spans: {len(spans_df)}")
                if len(spans_df) > 0:
                    print("Sample span names:", spans_df["name"].head(3).tolist())
            return None

    except Exception as e:
        print(f"❌ Could not log evaluations to Phoenix: {e}")
        import traceback
        traceback.print_exc()
        return None
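

# --- Usage sketch (illustrative, not part of the module's public API) ---
# A minimal example of how these helpers might be wired together, assuming a
# Phoenix server is already running, traced spans carry GAIA task ids, and the
# default data/metadata.jsonl path is valid for AnswerComparator.
if __name__ == "__main__":
    client = px.Client()
    spans = client.get_spans_dataframe()
    if spans is not None and not spans.empty:
        # Compare each span's answer against ground truth, then attach the
        # resulting SpanEvaluations to the traces in Phoenix.
        for span_evals in add_gaia_evaluations_to_phoenix(spans):
            client.log_evaluations(span_evals)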