""" Weave Test Result Aggregation Utility This module handles the aggregation and logging of test results to Weave/W&B for the MCP Server test suite. It's designed to work with pytest-xdist by ensuring that Weave evaluation logging happens only once across multiple worker processes. The main entry point is `aggregate_and_log_test_results()` which should be called from the master pytest process after all tests have completed. """ import json import logging import os from collections import defaultdict from pathlib import Path from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) # Try to import Weave dependencies try: import weave from weave import EvaluationLogger from weave.trace.context.weave_client_context import WeaveInitError WEAVE_AVAILABLE = True except ImportError: weave = None EvaluationLogger = None WeaveInitError = Exception WEAVE_AVAILABLE = False logger.warning("Weave SDK not available. Weave evaluation logging will be skipped.") class WeaveTestAggregator: """Handles aggregation and logging of test results to Weave.""" def __init__(self, entity: str, project: str, results_dir_name: str = "weave_eval_results_json"): self.entity = entity self.project = project self.results_dir_name = results_dir_name self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") def _initialize_weave(self, invocation_id: str) -> bool: """Initialize Weave connection. Returns True if successful.""" if not WEAVE_AVAILABLE: self.logger.warning(f"(ID: {invocation_id}) Weave SDK not available.") return False if not self.entity or not self.project: self.logger.warning(f"(ID: {invocation_id}) Entity or project not set.") return False try: self.logger.info(f"(ID: {invocation_id}) Initializing Weave: {self.entity}/{self.project}") weave.init(f"{self.entity}/{self.project}") self.logger.info(f"(ID: {invocation_id}) Weave initialized successfully.") return True except WeaveInitError as e: self.logger.error(f"(ID: {invocation_id}) WeaveInitError: {e}", exc_info=True) return False except Exception as e: self.logger.error(f"(ID: {invocation_id}) Error initializing Weave: {e}", exc_info=True) return False def _discover_json_files(self, base_tmp_dir: Path, invocation_id: str) -> List[Path]: """Discover JSON result files in the temporary directory structure.""" json_files = [] try: self.logger.info(f"(ID: {invocation_id}) Searching base directory: {base_tmp_dir}") for item in base_tmp_dir.iterdir(): if item.is_dir(): # Check for results directory within subdirectories (xdist workers) target_results_dir = item / self.results_dir_name if target_results_dir.is_dir(): self.logger.info(f"(ID: {invocation_id}) Found results dir: {target_results_dir}") json_files.extend(list(target_results_dir.glob("*.json"))) # Check if the item itself is the results directory (non-xdist) elif item.name == self.results_dir_name and item.is_dir(): self.logger.info(f"(ID: {invocation_id}) Found non-xdist results dir: {item}") json_files.extend(list(item.glob("*.json"))) # Deduplicate and sort json_files = sorted(list(set(json_files))) self.logger.info(f"(ID: {invocation_id}) Found {len(json_files)} JSON files total.") except Exception as e: self.logger.error(f"(ID: {invocation_id}) Error discovering JSON files: {e}", exc_info=True) return json_files def _load_test_data(self, json_files: List[Path], invocation_id: str) -> List[Dict]: """Load and parse test data from JSON files.""" all_test_data = [] for json_file in json_files: try: with open(json_file, "r") as f: data = json.load(f) all_test_data.append(data) except Exception as e: self.logger.error(f"(ID: {invocation_id}) Error reading {json_file}: {e}", exc_info=True) return all_test_data def _group_test_data_by_source(self, test_data: List[Dict]) -> Dict[str, List[Dict]]: """Group test data by source test file name.""" grouped_data = defaultdict(list) for item in test_data: source_file = item.get("metadata", {}).get("source_test_file_name", "unknown_source_file") grouped_data[source_file].append(item) return dict(grouped_data) def _create_eval_names(self, source_file: str, metadata: Dict) -> Tuple[str, str]: """Create evaluation and dataset names from source file and metadata.""" git_commit = metadata.get("git_commit_id", "unknown_git_commit") sanitized_source = source_file.replace("_", "-") eval_name = f"mcp-eval_{sanitized_source}_{git_commit}" dataset_name = f"{sanitized_source}_tests" return eval_name, dataset_name def _log_test_group_to_weave(self, source_file: str, test_data: List[Dict], invocation_id: str) -> bool: """Log a group of tests from the same source file to Weave.""" if not test_data: self.logger.warning(f"(ID: {invocation_id}) No test data for '{source_file}'") return False # Create evaluation logger first_metadata = test_data[0].get("metadata", {}) eval_name, dataset_name = self._create_eval_names(source_file, first_metadata) git_commit = first_metadata.get("git_commit_id", "unknown_git_commit") self.logger.info(f"(ID: {invocation_id}) Logging {len(test_data)} tests from '{source_file}' as '{eval_name}'") try: eval_logger = EvaluationLogger( name=eval_name, model=git_commit, dataset=dataset_name, ) except Exception as e: self.logger.error(f"(ID: {invocation_id}) Failed to create EvaluationLogger for '{source_file}': {e}", exc_info=True) return False # Log individual test predictions total_logged = 0 passed_logged = 0 all_latencies = [] for test_item in test_data: try: metadata = test_item.get("metadata", {}) inputs = dict(test_item.get("inputs", {})) output = test_item.get("output", {}) score_value = test_item.get("score", False) metrics = test_item.get("metrics", {}) # Enrich inputs with metadata if "test_case_index" in metadata: inputs["_test_case_index"] = metadata["test_case_index"] if "sample_name" in metadata: inputs["_sample_name"] = metadata["sample_name"] inputs["_source_test_file_name"] = metadata.get("source_test_file_name", source_file) inputs["_original_test_query_text"] = metadata.get("test_query_text", "N/A") # Log prediction score_logger = eval_logger.log_prediction(inputs=inputs, output=output) score_logger.log_score(scorer="test_passed", score=bool(score_value)) # Log execution latency if available execution_latency = metrics.get("execution_latency_seconds") if execution_latency is not None: score_logger.log_score(scorer="execution_latency_seconds", score=float(execution_latency)) all_latencies.append(float(execution_latency)) score_logger.finish() total_logged += 1 if score_value: passed_logged += 1 except Exception as e: test_id = metadata.get("test_case_index", metadata.get("sample_name", "unknown")) self.logger.error(f"(ID: {invocation_id}) Error logging test '{test_id}': {e}", exc_info=True) # Log summary metrics if total_logged > 0: summary_metrics = { "count_tests_logged": total_logged, "pass_rate": passed_logged / total_logged if total_logged else 0, } if all_latencies: summary_metrics.update({ "avg_execution_latency_s": sum(all_latencies) / len(all_latencies), "min_execution_latency_s": min(all_latencies), "max_execution_latency_s": max(all_latencies), "total_execution_latency_s": sum(all_latencies), }) try: eval_logger.log_summary(summary_metrics) self.logger.info(f"(ID: {invocation_id}) Successfully logged summary for '{eval_name}': {summary_metrics}") return True except Exception as e: self.logger.error(f"(ID: {invocation_id}) Failed to log summary for '{eval_name}': {e}", exc_info=True) else: self.logger.info(f"(ID: {invocation_id}) No tests logged for '{eval_name}'") return False def aggregate_and_log_results(self, base_tmp_dir: Path, invocation_id: str, session_config: Optional[object] = None) -> bool: """ Main entry point for aggregating and logging test results to Weave. Args: base_tmp_dir: Base temporary directory containing test result files invocation_id: Unique identifier for this aggregation run session_config: Optional pytest session config for additional metadata Returns: True if aggregation was successful, False otherwise """ self.logger.info(f"(ID: {invocation_id}) Starting Weave test result aggregation") # Initialize Weave if not self._initialize_weave(invocation_id): return False # Discover JSON result files json_files = self._discover_json_files(base_tmp_dir, invocation_id) if not json_files: self.logger.info(f"(ID: {invocation_id}) No JSON result files found") return False # Load test data all_test_data = self._load_test_data(json_files, invocation_id) if not all_test_data: self.logger.info(f"(ID: {invocation_id}) No valid test data loaded") return False # Group by source file and log to Weave grouped_data = self._group_test_data_by_source(all_test_data) self.logger.info(f"(ID: {invocation_id}) Processing {len(grouped_data)} test file groups") success_count = 0 for source_file, file_test_data in grouped_data.items(): if self._log_test_group_to_weave(source_file, file_test_data, invocation_id): success_count += 1 self.logger.info(f"(ID: {invocation_id}) Successfully logged {success_count}/{len(grouped_data)} test groups") return success_count > 0 def aggregate_and_log_test_results(entity: str, project: str, base_tmp_dir: Path, invocation_id: str, session_config: Optional[object] = None, results_dir_name: str = "weave_eval_results_json") -> bool: """ Convenience function for aggregating and logging test results to Weave. This is the main entry point that should be called from pytest hooks. """ aggregator = WeaveTestAggregator(entity, project, results_dir_name) return aggregator.aggregate_and_log_results(base_tmp_dir, invocation_id, session_config)