#!/usr/bin/env python3 """ Benchmark Data Reader for LLM Inference Performance Dashboard This module provides functionality to read benchmark result files and convert them into a flattened Polars DataFrame for analysis and visualization. """ import json import polars as pl from pathlib import Path from typing import List, Dict, Any, Optional import logging logger = logging.getLogger(__name__) class BenchmarkDataReader: """Reader for benchmark result JSON files that flattens data into a Polars DataFrame.""" def __init__(self, benchmark_dir: str = "benchmark_results"): """ Initialize the benchmark data reader. Args: benchmark_dir: Directory containing benchmark result files """ self.benchmark_dir = Path(benchmark_dir) def read_benchmark_files(self) -> pl.DataFrame: """ Read all benchmark files and return a flattened Polars DataFrame. Returns: Polars DataFrame where each row represents a benchmark scenario with all metrics """ all_records = [] # Find all individual model benchmark files (exclude summary files) benchmark_files = list(self.benchmark_dir.rglob("*_benchmark_*.json")) benchmark_files = [f for f in benchmark_files if "summary" not in f.name] logger.info(f"Found {len(benchmark_files)} benchmark files") for file_path in benchmark_files: try: records = self._process_benchmark_file(file_path) all_records.extend(records) logger.debug(f"Processed {len(records)} scenarios from {file_path}") except Exception as e: logger.error(f"Error processing {file_path}: {e}") continue if not all_records: logger.warning("No benchmark data found") return pl.DataFrame() # Create DataFrame from all records df = pl.DataFrame(all_records) logger.info(f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns") return df def _process_benchmark_file(self, file_path: Path) -> List[Dict[str, Any]]: """ Process a single benchmark file and extract all scenarios. Args: file_path: Path to the benchmark JSON file Returns: List of flattened records, one per benchmark scenario """ with open(file_path, 'r') as f: data = json.load(f) records = [] model_name = data.get("model_name", "unknown") for scenario in data.get("benchmark_scenarios", []): record = self._flatten_scenario(scenario, model_name, file_path) records.append(record) return records def _flatten_scenario(self, scenario: Dict[str, Any], model_name: str, file_path: Path) -> Dict[str, Any]: """ Flatten a single benchmark scenario into a flat record. 
        Args:
            scenario: Scenario data from benchmark file
            model_name: Name of the model being benchmarked
            file_path: Path to the original file

        Returns:
            Flattened dictionary with all metrics and metadata
        """
        record = {
            # File metadata
            "file_path": str(file_path),
            "model_name": model_name,
            # Scenario metadata
            "scenario_name": scenario.get("scenario_name", "unknown"),
        }

        # Add metadata fields
        metadata = scenario.get("metadata", {})
        record.update({
            "timestamp": metadata.get("timestamp"),
            "commit_id": metadata.get("commit_id"),
        })

        # Add hardware info
        hw_info = metadata.get("hardware_info", {})
        record.update({
            "gpu_name": hw_info.get("gpu_name"),
            "gpu_memory_total_mb": hw_info.get("gpu_memory_total_mb"),
            "cpu_count": hw_info.get("cpu_count"),
            "memory_total_mb": hw_info.get("memory_total_mb"),
            "python_version": hw_info.get("python_version"),
            "torch_version": hw_info.get("torch_version"),
            "cuda_version": hw_info.get("cuda_version"),
        })

        # Add config info
        config = metadata.get("config", {})
        record.update({
            "config_name": config.get("name"),
            "model_id": config.get("model_id"),
            "variant": config.get("variant"),
            "warmup_iterations": config.get("warmup_iterations"),
            "measurement_iterations": config.get("measurement_iterations"),
            "num_tokens_to_generate": config.get("num_tokens_to_generate"),
            "device": config.get("device"),
            "torch_dtype": config.get("torch_dtype"),
            "compile_mode": config.get("compile_mode"),
            "use_cache": config.get("use_cache"),
            "batch_size": config.get("batch_size"),
            "sequence_length": config.get("sequence_length"),
            "attn_implementation": config.get("attn_implementation"),
            "sdpa_backend": config.get("sdpa_backend"),
        })

        # Add measurement statistics for each metric
        measurements = scenario.get("measurements", {})
        for metric_name, metric_data in measurements.items():
            if isinstance(metric_data, dict):
                # Add statistics for this metric
                for stat_name, stat_value in metric_data.items():
                    if stat_name != "measurements":  # Skip raw measurements array
                        record[f"{metric_name}_{stat_name}"] = stat_value

        # Add GPU metrics
        gpu_metrics = scenario.get("gpu_metrics", {})
        for gpu_metric, value in gpu_metrics.items():
            record[f"gpu_{gpu_metric}"] = value

        return record

    def get_summary_statistics(self, df: pl.DataFrame) -> Dict[str, Any]:
        """
        Generate summary statistics from the benchmark DataFrame.
        Args:
            df: Benchmark DataFrame

        Returns:
            Dictionary with summary statistics
        """
        if df.is_empty():
            return {}

        return {
            "total_scenarios": len(df),
            "unique_models": df["model_name"].n_unique(),
            "unique_scenarios": df["scenario_name"].n_unique(),
            "unique_hardware": df["gpu_name"].n_unique(),
            "date_range": {
                "earliest": df["timestamp"].min(),
                "latest": df["timestamp"].max(),
            },
            # Guard on every column read below, not just latency, so a run that is
            # missing one of these metrics does not raise ColumnNotFoundError.
            "performance_metrics": {
                "avg_latency_seconds": df.select(pl.col("latency_seconds_mean").mean()).item(),
                "avg_tokens_per_second": df.select(pl.col("tokens_per_second_mean").mean()).item(),
                "avg_time_to_first_token": df.select(
                    pl.col("time_to_first_token_seconds_mean").mean()
                ).item(),
            } if {
                "latency_seconds_mean",
                "tokens_per_second_mean",
                "time_to_first_token_seconds_mean",
            }.issubset(df.columns) else None,
        }


def main():
    """Example usage of the BenchmarkDataReader."""
    logging.basicConfig(level=logging.INFO)

    # Create reader and load data
    reader = BenchmarkDataReader()
    df = reader.read_benchmark_files()

    if df.is_empty():
        print("No benchmark data found!")
        return

    # Display basic info
    print(f"\nLoaded benchmark data: {len(df)} scenarios")
    print(f"Columns: {len(df.columns)}")
    print("\nColumn names:")
    for col in sorted(df.columns):
        print(f"  - {col}")

    # Show summary statistics
    summary = reader.get_summary_statistics(df)
    print("\nSummary Statistics:")
    for key, value in summary.items():
        print(f"  {key}: {value}")

    # Show sample data
    print("\nSample data (first 3 rows):")
    print(df.head(3))

    return df


if __name__ == "__main__":
    df = main()
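
# ---------------------------------------------------------------------------
# Illustrative sketch of the input layout this reader assumes, inferred from
# the keys accessed in _process_benchmark_file and _flatten_scenario. The key
# names below come from that code; the example values and the specific
# measurement/GPU metric names (e.g. "std", "utilization_percent") are
# hypothetical, and real files may carry additional keys.
#
# {
#   "model_name": "example-model",
#   "benchmark_scenarios": [
#     {
#       "scenario_name": "example-scenario",
#       "metadata": {
#         "timestamp": "...",
#         "commit_id": "...",
#         "hardware_info": {"gpu_name": "...", "cuda_version": "...", ...},
#         "config": {"batch_size": 1, "num_tokens_to_generate": 128, ...}
#       },
#       "measurements": {
#         "latency_seconds": {"mean": 0.0, "std": 0.0, "measurements": [...]}
#       },
#       "gpu_metrics": {"utilization_percent": 0.0}
#     }
#   ]
# }
#
# Each statistic under "measurements" becomes a "<metric>_<stat>" column
# (the raw "measurements" array is skipped), and each entry under
# "gpu_metrics" becomes a "gpu_<name>" column.
# ---------------------------------------------------------------------------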