import re
import traceback
from typing import Any, Dict, List, Optional

import pandas as pd
import phoenix as px
from phoenix.trace import SpanEvaluations

from comparison import AnswerComparator

# GAIA task ids are UUIDs; this pattern recovers them from free-text fields.
UUID_PATTERN = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
)


class GAIAPhoenixEvaluator:
    """Phoenix evaluator for GAIA dataset ground-truth comparison."""

    def __init__(self, metadata_path: str = "data/metadata.jsonl"):
        self.comparator = AnswerComparator(metadata_path)
        self.eval_name = "gaia_ground_truth"

    def evaluate_spans(self, spans_df: pd.DataFrame) -> List[SpanEvaluations]:
        """Evaluate spans and return Phoenix SpanEvaluations."""
        evaluations = []
        for _, span in spans_df.iterrows():
            # Extract the task_id and predicted answer from the span.
            task_id = self._extract_task_id(span)
            predicted_answer = self._extract_predicted_answer(span)
            span_id = span.get("context.span_id")

            if task_id and predicted_answer is not None and span_id:
                evaluation = self.comparator.evaluate_answer(task_id, predicted_answer)

                # Build the evaluation record for Phoenix.
                eval_record = {
                    "span_id": span_id,
                    "score": 1.0 if evaluation["exact_match"] else evaluation["similarity_score"],
                    "label": "correct" if evaluation["exact_match"] else "incorrect",
                    "explanation": self._create_explanation(evaluation),
                    "task_id": task_id,
                    "predicted_answer": evaluation["predicted_answer"],
                    "ground_truth": evaluation["actual_answer"],
                    "exact_match": evaluation["exact_match"],
                    "similarity_score": evaluation["similarity_score"],
                    "contains_answer": evaluation["contains_answer"],
                }
                evaluations.append(eval_record)

        if evaluations:
            # Phoenix matches evaluations to spans via the dataframe index.
            eval_df = pd.DataFrame(evaluations).set_index("span_id")
            return [SpanEvaluations(eval_name=self.eval_name, dataframe=eval_df)]
        return []

    def _extract_task_id(self, span) -> Optional[str]:
        """Extract the GAIA task_id from span data."""
        # Try span attributes first.
        attributes = span.get("attributes", {})
        if isinstance(attributes, dict) and "task_id" in attributes:
            return attributes["task_id"]

        # Then structured input data.
        input_data = span.get("input", {})
        if isinstance(input_data, dict) and "task_id" in input_data:
            return input_data["task_id"]

        # Fall back to scanning free-text fields for a UUID-shaped task id.
        for field in ("input.value", "name"):
            value = span.get(field, "")
            if isinstance(value, str):
                match = UUID_PATTERN.search(value)
                if match:
                    return match.group(0)

        return None

    def _extract_predicted_answer(self, span) -> Optional[str]:
        """Extract the predicted answer from the span output."""
        # Try the candidate output fields in order of preference.
        for field in ("output.value", "output", "response", "result"):
            value = span.get(field)
            if value is not None:
                return str(value)
        return None

    def _create_explanation(self, evaluation: Dict[str, Any]) -> str:
        """Create a human-readable explanation of the evaluation."""
        predicted = evaluation["predicted_answer"]
        actual = evaluation["actual_answer"]
        similarity = evaluation["similarity_score"]

        if actual is None:
            return "❓ No ground truth available for comparison"

        explanation = f"Predicted: '{predicted}' | Ground Truth: '{actual}' | "
        if evaluation["exact_match"]:
            explanation += "✅ Exact match"
        elif evaluation["contains_answer"]:
            explanation += f"⚠️ Contains correct answer (similarity: {similarity:.3f})"
        else:
            explanation += f"❌ Incorrect (similarity: {similarity:.3f})"
        return explanation
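# For reference, a sketch of the flattened span row the extractors above look
# at. These keys mirror the lookups in _extract_task_id and
# _extract_predicted_answer; the exact columns returned by Phoenix's
# get_spans_dataframe() vary by version, and every value below is hypothetical:
#
#   {
#       "context.span_id": "f0a1b2c3d4e5f6a7",
#       "name": "agent_run",
#       "attributes": {"task_id": "123e4567-e89b-12d3-a456-426614174000"},
#       "input.value": "Task 123e4567-e89b-12d3-a456-426614174000: ...",
#       "output.value": "FINAL ANSWER: 42",
#   }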
def add_gaia_evaluations_to_phoenix(
    spans_df: pd.DataFrame, metadata_path: str = "data/metadata.jsonl"
) -> List[SpanEvaluations]:
    """Add GAIA evaluation results to Phoenix spans."""
    evaluator = GAIAPhoenixEvaluator(metadata_path)
    return evaluator.evaluate_spans(spans_df)


def log_evaluations_to_phoenix(
    evaluations_df: pd.DataFrame, session_id: Optional[str] = None
) -> Optional[pd.DataFrame]:
    """Log evaluation results directly to Phoenix.

    `session_id` is accepted for API compatibility but currently unused.
    """
    try:
        client = px.Client()

        # Fetch the current spans so evaluations can be matched to span_ids.
        spans_df = client.get_spans_dataframe()
        if spans_df is None or spans_df.empty:
            print("No spans found to attach evaluations to")
            return None

        evaluation_records = []
        for _, eval_row in evaluations_df.iterrows():
            task_id = eval_row["task_id"]

            # Find the matching span by searching for the task_id in the span input.
            matching_spans = spans_df[
                spans_df["input.value"].astype(str).str.contains(task_id, na=False, case=False)
            ]
            if len(matching_spans) == 0:
                # Fall back to searching the span name.
                matching_spans = spans_df[
                    spans_df["name"].astype(str).str.contains(task_id, na=False, case=False)
                ]

            if len(matching_spans) > 0:
                span_id = matching_spans.iloc[0]["context.span_id"]

                # Build the evaluation record in Phoenix format.
                evaluation_records.append({
                    "span_id": span_id,
                    "name": "gaia_ground_truth",
                    "score": eval_row["similarity_score"],
                    "label": "correct" if bool(eval_row["exact_match"]) else "incorrect",
                    "explanation": (
                        f"Predicted: '{eval_row['predicted_answer']}' | "
                        f"Ground Truth: '{eval_row['actual_answer']}' | "
                        f"Similarity: {eval_row['similarity_score']:.3f} | "
                        f"Exact Match: {eval_row['exact_match']}"
                    ),
                    "annotator_kind": "HUMAN",
                    "metadata": {
                        "task_id": task_id,
                        "exact_match": eval_row["exact_match"],
                        "similarity_score": eval_row["similarity_score"],
                        "contains_answer": eval_row["contains_answer"],
                        "predicted_answer": eval_row["predicted_answer"],
                        "ground_truth": eval_row["actual_answer"],
                    },
                })

        if not evaluation_records:
            print("⚠️ No matching spans found for evaluations")
            print(f"Available spans: {len(spans_df)}")
            if len(spans_df) > 0:
                print("Sample span names:", spans_df["name"].head(3).tolist())
            return None

        # Phoenix matches evaluations to spans via the dataframe index.
        eval_df = pd.DataFrame(evaluation_records).set_index("span_id")
        span_evaluations = SpanEvaluations(
            eval_name="gaia_ground_truth",
            dataframe=eval_df,
        )

        try:
            # Try the newer module-level Phoenix API first.
            px.log_evaluations(span_evaluations)
            print(f"✅ Successfully logged {len(evaluation_records)} evaluations to Phoenix")
        except AttributeError:
            # Fall back to the client method on older Phoenix versions.
            client.log_evaluations(span_evaluations)
            print(f"✅ Successfully logged {len(evaluation_records)} evaluations to Phoenix (fallback)")
        return eval_df

    except Exception as e:
        print(f"❌ Could not log evaluations to Phoenix: {e}")
        traceback.print_exc()
        return None
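if __name__ == "__main__":
    # Minimal smoke-test sketch, not a definitive entry point: it assumes a
    # Phoenix server is already running locally with recorded agent spans and
    # that the GAIA metadata file exists at the default path used above.
    client = px.Client()
    spans = client.get_spans_dataframe()
    if spans is None or spans.empty:
        print("No spans available; run the agent first to generate traces")
    else:
        eval_sets = add_gaia_evaluations_to_phoenix(spans)
        for span_evals in eval_sets:
            # Mirrors the client-based fallback path used above.
            client.log_evaluations(span_evals)
        print(f"Logged {len(eval_sets)} evaluation set(s) to Phoenix")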