#!/usr/bin/env python3
"""
Benchmark Data Reader for LLM Inference Performance Dashboard
This module provides functionality to read benchmark result files and convert them
into a flattened Polars DataFrame for analysis and visualization.
"""
import json
import polars as pl
from pathlib import Path
from typing import List, Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)


class BenchmarkDataReader:
    """Reader for benchmark result JSON files that flattens data into a Polars DataFrame."""

    def __init__(self, benchmark_dir: str = "benchmark_results"):
        """
        Initialize the benchmark data reader.

        Args:
            benchmark_dir: Directory containing benchmark result files
        """
        self.benchmark_dir = Path(benchmark_dir)
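
    # Expected on-disk layout (illustrative sketch; only the "*_benchmark_*.json" naming
    # and the exclusion of files with "summary" in their name are enforced by
    # read_benchmark_files below, the directory nesting and file names are examples):
    #
    #   benchmark_results/
    #       llama_benchmark_2024-01-01.json
    #       llama_benchmark_summary.json      <- skipped (summary file)
    #       some_subdir/
    #           mistral_benchmark_2024-01-02.json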

    def read_benchmark_files(self) -> pl.DataFrame:
        """
        Read all benchmark files and return a flattened Polars DataFrame.

        Returns:
            Polars DataFrame where each row represents a benchmark scenario with all metrics
        """
        all_records = []

        # Find all individual model benchmark files (exclude summary files)
        benchmark_files = list(self.benchmark_dir.rglob("*_benchmark_*.json"))
        benchmark_files = [f for f in benchmark_files if "summary" not in f.name]
        logger.info(f"Found {len(benchmark_files)} benchmark files")

        for file_path in benchmark_files:
            try:
                records = self._process_benchmark_file(file_path)
                all_records.extend(records)
                logger.debug(f"Processed {len(records)} scenarios from {file_path}")
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                continue

        if not all_records:
            logger.warning("No benchmark data found")
            return pl.DataFrame()

        # Create DataFrame from all records
        df = pl.DataFrame(all_records)
        logger.info(f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns")
        return df
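
    # Sketch of downstream use of the flattened frame (assumes a recent Polars release
    # with DataFrame.group_by, and that the benchmarks reported a "tokens_per_second"
    # metric so that a "tokens_per_second_mean" column exists):
    #
    #   df = BenchmarkDataReader("benchmark_results").read_benchmark_files()
    #   per_model = (
    #       df.group_by("model_name")
    #         .agg(pl.col("tokens_per_second_mean").mean().alias("avg_tokens_per_second"))
    #         .sort("avg_tokens_per_second", descending=True)
    #   )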

    def _process_benchmark_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Process a single benchmark file and extract all scenarios.

        Args:
            file_path: Path to the benchmark JSON file

        Returns:
            List of flattened records, one per benchmark scenario
        """
        with open(file_path, 'r') as f:
            data = json.load(f)

        records = []
        model_name = data.get("model_name", "unknown")

        for scenario in data.get("benchmark_scenarios", []):
            record = self._flatten_scenario(scenario, model_name, file_path)
            records.append(record)

        return records
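
    # Illustrative shape of a single benchmark file, inferred from the accessors in this
    # class (real files may carry additional keys; metric names such as "latency_seconds"
    # and "utilization_percent" are examples, not a fixed schema):
    #
    #   {
    #       "model_name": "...",
    #       "benchmark_scenarios": [
    #           {
    #               "scenario_name": "...",
    #               "metadata": {
    #                   "timestamp": "...",
    #                   "commit_id": "...",
    #                   "hardware_info": {"gpu_name": "...", ...},
    #                   "config": {"batch_size": 1, ...}
    #               },
    #               "measurements": {
    #                   "latency_seconds": {"mean": 0.5, "measurements": [...]}
    #               },
    #               "gpu_metrics": {"utilization_percent": 80.0}
    #           }
    #       ]
    #   }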

    def _flatten_scenario(self, scenario: Dict[str, Any], model_name: str, file_path: Path) -> Dict[str, Any]:
        """
        Flatten a single benchmark scenario into a flat record.

        Args:
            scenario: Scenario data from benchmark file
            model_name: Name of the model being benchmarked
            file_path: Path to the original file

        Returns:
            Flattened dictionary with all metrics and metadata
        """
        record = {
            # File metadata
            "file_path": str(file_path),
            "model_name": model_name,
            # Scenario metadata
            "scenario_name": scenario.get("scenario_name", "unknown"),
        }

        # Add metadata fields
        metadata = scenario.get("metadata", {})
        record.update({
            "timestamp": metadata.get("timestamp"),
            "commit_id": metadata.get("commit_id"),
        })

        # Add hardware info
        hw_info = metadata.get("hardware_info", {})
        record.update({
            "gpu_name": hw_info.get("gpu_name"),
            "gpu_memory_total_mb": hw_info.get("gpu_memory_total_mb"),
            "cpu_count": hw_info.get("cpu_count"),
            "memory_total_mb": hw_info.get("memory_total_mb"),
            "python_version": hw_info.get("python_version"),
            "torch_version": hw_info.get("torch_version"),
            "cuda_version": hw_info.get("cuda_version"),
        })

        # Add config info
        config = metadata.get("config", {})
        record.update({
            "config_name": config.get("name"),
            "model_id": config.get("model_id"),
            "variant": config.get("variant"),
            "warmup_iterations": config.get("warmup_iterations"),
            "measurement_iterations": config.get("measurement_iterations"),
            "num_tokens_to_generate": config.get("num_tokens_to_generate"),
            "device": config.get("device"),
            "torch_dtype": config.get("torch_dtype"),
            "compile_mode": config.get("compile_mode"),
            "use_cache": config.get("use_cache"),
            "batch_size": config.get("batch_size"),
            "sequence_length": config.get("sequence_length"),
            "attn_implementation": config.get("attn_implementation"),
            "sdpa_backend": config.get("sdpa_backend"),
        })

        # Add measurement statistics for each metric
        measurements = scenario.get("measurements", {})
        for metric_name, metric_data in measurements.items():
            if isinstance(metric_data, dict):
                # Add statistics for this metric
                for stat_name, stat_value in metric_data.items():
                    if stat_name != "measurements":  # Skip raw measurements array
                        record[f"{metric_name}_{stat_name}"] = stat_value

        # Add GPU metrics
        gpu_metrics = scenario.get("gpu_metrics", {})
        for gpu_metric, value in gpu_metrics.items():
            record[f"gpu_{gpu_metric}"] = value

        return record
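
    # Example of the flattening above (hypothetical values): a scenario whose
    # measurements contain {"latency_seconds": {"mean": 0.52, "p50": 0.50}} and whose
    # gpu_metrics contain {"utilization_percent": 81.0} yields the flat columns
    #   latency_seconds_mean = 0.52, latency_seconds_p50 = 0.50, gpu_utilization_percent = 81.0
    # alongside the file, scenario, hardware, and config fields added above.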

    def get_summary_statistics(self, df: pl.DataFrame) -> Dict[str, Any]:
        """
        Generate summary statistics from the benchmark DataFrame.

        Args:
            df: Benchmark DataFrame

        Returns:
            Dictionary with summary statistics
        """
        if df.is_empty():
            return {}

        return {
            "total_scenarios": len(df),
            "unique_models": df["model_name"].n_unique(),
            "unique_scenarios": df["scenario_name"].n_unique(),
            "unique_hardware": df["gpu_name"].n_unique(),
            "date_range": {
                "earliest": df["timestamp"].min(),
                "latest": df["timestamp"].max(),
            },
            "performance_metrics": {
                "avg_latency_seconds": df.select(pl.col("latency_seconds_mean").mean()).item(),
                "avg_tokens_per_second": df.select(pl.col("tokens_per_second_mean").mean()).item(),
                "avg_time_to_first_token": df.select(pl.col("time_to_first_token_seconds_mean").mean()).item(),
            } if "latency_seconds_mean" in df.columns else None,
        }
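
    # Illustrative return value (keys fixed by the method above, values hypothetical):
    #
    #   {
    #       "total_scenarios": 12,
    #       "unique_models": 3,
    #       "unique_scenarios": 4,
    #       "unique_hardware": 1,
    #       "date_range": {"earliest": "...", "latest": "..."},
    #       "performance_metrics": {
    #           "avg_latency_seconds": 0.61,
    #           "avg_tokens_per_second": 94.2,
    #           "avg_time_to_first_token": 0.08,
    #       },  # or None when no "latency_seconds_mean" column is present
    #   }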


def main():
    """Example usage of the BenchmarkDataReader."""
    logging.basicConfig(level=logging.INFO)

    # Create reader and load data
    reader = BenchmarkDataReader()
    df = reader.read_benchmark_files()

    if df.is_empty():
        print("No benchmark data found!")
        return

    # Display basic info
    print(f"\nLoaded benchmark data: {len(df)} scenarios")
    print(f"Columns: {len(df.columns)}")
    print("\nColumn names:")
    for col in sorted(df.columns):
        print(f" - {col}")

    # Show summary statistics
    summary = reader.get_summary_statistics(df)
    print("\nSummary Statistics:")
    for key, value in summary.items():
        print(f" {key}: {value}")

    # Show sample data
    print("\nSample data (first 3 rows):")
    print(df.head(3))

    return df


if __name__ == "__main__":
    df = main()