import re

from typing import Dict, Any, List, Optional

import pandas as pd
import phoenix as px
from phoenix.trace import SpanEvaluations

from comparison import AnswerComparator

# UUID-shaped GAIA task ids (8-4-4-4-12 hex groups)
UUID_PATTERN = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"


class GAIAPhoenixEvaluator:
    """Phoenix evaluator for GAIA dataset ground truth comparison."""

    def __init__(self, metadata_path: str = "data/metadata.jsonl"):
        self.comparator = AnswerComparator(metadata_path)
        self.eval_name = "gaia_ground_truth"

    def evaluate_spans(self, spans_df: pd.DataFrame) -> List[SpanEvaluations]:
        """Evaluate spans and return Phoenix SpanEvaluations."""
        evaluations = []

        for _, span in spans_df.iterrows():
            # Extract task_id and answer from the span
            task_id = self._extract_task_id(span)
            predicted_answer = self._extract_predicted_answer(span)
            span_id = span.get("context.span_id")

            if task_id and predicted_answer is not None and span_id:
                evaluation = self.comparator.evaluate_answer(task_id, predicted_answer)

                # Create evaluation record for Phoenix
                eval_record = {
                    "span_id": span_id,
                    "score": 1.0 if evaluation["exact_match"] else evaluation["similarity_score"],
                    "label": "correct" if evaluation["exact_match"] else "incorrect",
                    "explanation": self._create_explanation(evaluation),
                    "task_id": task_id,
                    "predicted_answer": evaluation["predicted_answer"],
                    "ground_truth": evaluation["actual_answer"],
                    "exact_match": evaluation["exact_match"],
                    "similarity_score": evaluation["similarity_score"],
                    "contains_answer": evaluation["contains_answer"],
                }
                evaluations.append(eval_record)

        if evaluations:
            # Create SpanEvaluations object
            eval_df = pd.DataFrame(evaluations)
            return [SpanEvaluations(eval_name=self.eval_name, dataframe=eval_df)]

        return []

    def _extract_task_id(self, span) -> Optional[str]:
        """Extract task_id from span data."""
        # Try span attributes first
        attributes = span.get("attributes", {})
        if isinstance(attributes, dict) and "task_id" in attributes:
            return attributes["task_id"]

        # Try input data
        input_data = span.get("input", {})
        if isinstance(input_data, dict) and "task_id" in input_data:
            return input_data["task_id"]

        # Look for a UUID pattern in the raw input value
        input_value = span.get("input.value", "")
        if isinstance(input_value, str):
            match = re.search(UUID_PATTERN, input_value)
            if match:
                return match.group(0)

        # Try the span name
        span_name = span.get("name", "")
        if isinstance(span_name, str):
            match = re.search(UUID_PATTERN, span_name)
            if match:
                return match.group(0)

        return None

    def _extract_predicted_answer(self, span) -> Optional[str]:
        """Extract predicted answer from span output."""
        # Try different output fields
        output_fields = ["output.value", "output", "response", "result"]
        for field in output_fields:
            value = span.get(field)
            if value is not None:
                return str(value)
        return None

    def _create_explanation(self, evaluation: Dict[str, Any]) -> str:
        """Create a human-readable explanation of the evaluation."""
        predicted = evaluation["predicted_answer"]
        actual = evaluation["actual_answer"]
        exact_match = evaluation["exact_match"]
        similarity = evaluation["similarity_score"]
        contains = evaluation["contains_answer"]

        if actual is None:
            return "❓ No ground truth available for comparison"

        explanation = f"Predicted: '{predicted}' | Ground Truth: '{actual}' | "

        if exact_match:
            explanation += "✅ Exact match"
        elif contains:
            explanation += f"⚠️ Contains correct answer (similarity: {similarity:.3f})"
        else:
            explanation += f"❌ Incorrect (similarity: {similarity:.3f})"
        return explanation


def add_gaia_evaluations_to_phoenix(
    spans_df: pd.DataFrame, metadata_path: str = "data/metadata.jsonl"
) -> List[SpanEvaluations]:
    """Add GAIA evaluation results to Phoenix spans."""
    evaluator = GAIAPhoenixEvaluator(metadata_path)
    return evaluator.evaluate_spans(spans_df)


def log_evaluations_to_phoenix(
    evaluations_df: pd.DataFrame, session_id: Optional[str] = None
) -> Optional[pd.DataFrame]:
    """Log evaluation results directly to Phoenix."""
    try:
        client = px.Client()

        # Get current spans to match evaluations to span_ids
        spans_df = client.get_spans_dataframe()
        if spans_df is None or spans_df.empty:
            print("No spans found to attach evaluations to")
            return None

        # Debug: show available columns
        print(f"📊 Available span columns: {list(spans_df.columns)}")

        # Collect candidate input/output/name column names
        input_columns = [col for col in spans_df.columns if "input" in col.lower()]
        output_columns = [col for col in spans_df.columns if "output" in col.lower()]
        name_columns = [col for col in spans_df.columns if "name" in col.lower()]

        print(f"📊 Input columns found: {input_columns}")
        print(f"📊 Output columns found: {output_columns}")
        print(f"📊 Name columns found: {name_columns}")

        # Create evaluation records for Phoenix
        evaluation_records = []
        spans_with_evals = []

        for _, eval_row in evaluations_df.iterrows():
            task_id = eval_row["task_id"]
            matching_spans = pd.DataFrame()

            # Strategy 1: search every string-like column for the task_id
            for col in spans_df.columns:
                if spans_df[col].dtype == "object":  # String-like columns
                    try:
                        matches = spans_df[
                            spans_df[col].astype(str).str.contains(task_id, na=False, case=False)
                        ]
                        if len(matches) > 0:
                            matching_spans = matches
                            print(f"✅ Found match for {task_id} in column '{col}'")
                            break
                    except Exception:
                        continue

            # Strategy 2: if no matches were found, search the input columns specifically
            if len(matching_spans) == 0 and input_columns:
                for input_col in input_columns:
                    try:
                        matches = spans_df[
                            spans_df[input_col]
                            .astype(str)
                            .str.contains(task_id, na=False, case=False)
                        ]
                        if len(matches) > 0:
                            matching_spans = matches
                            print(f"✅ Found match for {task_id} in input column '{input_col}'")
                            break
                    except Exception:
                        continue

            # Strategy 3: if still no matches, try a partial task_id (last 8 characters)
            if len(matching_spans) == 0:
                short_task_id = task_id[-8:] if len(task_id) > 8 else task_id
                for col in spans_df.columns:
                    if spans_df[col].dtype == "object":
                        try:
                            matches = spans_df[
                                spans_df[col]
                                .astype(str)
                                .str.contains(short_task_id, na=False, case=False)
                            ]
                            if len(matches) > 0:
                                matching_spans = matches
                                print(f"✅ Found match for {task_id} using short ID in column '{col}'")
                                break
                        except Exception:
                            continue

            if len(matching_spans) > 0:
                span_id = (
                    matching_spans.iloc[0].get("context.span_id")
                    or matching_spans.iloc[0].get("span_id")
                )
                if span_id:
                    # Create evaluation record in Phoenix format
                    evaluation_record = {
                        "span_id": span_id,
                        "name": "gaia_ground_truth",
                        "score": eval_row["similarity_score"],
                        "label": "correct" if bool(eval_row["exact_match"]) else "incorrect",
                        "explanation": (
                            f"Predicted: '{eval_row['predicted_answer']}' | "
                            f"Ground Truth: '{eval_row['actual_answer']}' | "
                            f"Similarity: {eval_row['similarity_score']:.3f} | "
                            f"Exact Match: {eval_row['exact_match']}"
                        ),
                        "annotator_kind": "HUMAN",
                        "metadata": {
                            "task_id": task_id,
                            "exact_match": bool(eval_row["exact_match"]),
                            "similarity_score": float(eval_row["similarity_score"]),
                            "contains_answer": bool(eval_row["contains_answer"]),
                            "predicted_answer": str(eval_row["predicted_answer"]),
                            "ground_truth": str(eval_row["actual_answer"]),
                        },
                    }
                    evaluation_records.append(evaluation_record)
                    spans_with_evals.append(span_id)
                else:
                    print(f"⚠️ No span_id found for matching span with task {task_id}")
            else:
                print(f"⚠️ No matching span found for task {task_id}")

        if evaluation_records:
            # Convert to DataFrame for Phoenix
            eval_df = pd.DataFrame(evaluation_records)

            # Create SpanEvaluations object
            span_evaluations = SpanEvaluations(
                eval_name="gaia_ground_truth",
                dataframe=eval_df,
            )

            # Log evaluations to Phoenix
            try:
                # Try the module-level helper first
                px.log_evaluations(span_evaluations)
                print(
                    f"✅ Successfully logged {len(evaluation_records)} evaluations "
                    "to Phoenix using px.log_evaluations"
                )
            except AttributeError:
                try:
                    # Fall back to the Client method
                    client.log_evaluations(span_evaluations)
                    print(
                        f"✅ Successfully logged {len(evaluation_records)} evaluations "
                        "to Phoenix using client.log_evaluations"
                    )
                except Exception as e:
                    print(f"⚠️ Could not log evaluations using either method: {e}")
                    # Still return the DataFrame so we know what would have been logged
                    print("Evaluation records created but not logged to Phoenix")

            return eval_df
        else:
            print("⚠️ No matching spans found for any evaluations")
            if spans_df is not None:
                print(f"Available spans: {len(spans_df)}")
                if len(spans_df) > 0:
                    available_cols = [
                        col for col in spans_df.columns if spans_df[col].dtype == "object"
                    ][:5]
                    print(f"Sample searchable columns: {available_cols}")
            return None

    except Exception as e:
        print(f"❌ Could not log evaluations to Phoenix: {e}")
        import traceback

        traceback.print_exc()
        return None
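

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's original API: it assumes a
    # Phoenix server is reachable from this process and that "data/metadata.jsonl"
    # holds the GAIA ground-truth metadata expected by AnswerComparator.
    client = px.Client()
    spans = client.get_spans_dataframe()
    if spans is not None and not spans.empty:
        results = add_gaia_evaluations_to_phoenix(spans)
        print(f"Computed {len(results)} SpanEvaluations object(s) from {len(spans)} spans")
    else:
        print("No spans found; run a traced GAIA session against Phoenix first")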