#!/usr/bin/env python3
"""
Benchmark Data Reader for LLM Inference Performance Dashboard
This module provides functionality to read benchmark result files and convert them
into a flattened Polars DataFrame for analysis and visualization.
"""
import json
import polars as pl
from pathlib import Path
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
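# Illustrative sketch of the input layout this reader expects. The top-level keys and
# the hardware_info/config field names mirror the keys read below; the concrete metric
# names and values shown here are hypothetical examples.
# {
#   "model_name": "...",
#   "benchmark_scenarios": [
#     {
#       "scenario_name": "...",
#       "metadata": {
#         "timestamp": "...", "commit_id": "...",
#         "hardware_info": {"gpu_name": "...", "torch_version": "...", ...},
#         "config": {"batch_size": ..., "attn_implementation": "...", ...}
#       },
#       "measurements": {"latency_seconds": {"mean": 0.42, "p95": 0.55, "measurements": [...]}},
#       "gpu_metrics": {"utilization_percent": 91.0}
#     }
#   ]
# }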
class BenchmarkDataReader:
"""Reader for benchmark result JSON files that flattens data into a Polars DataFrame."""
def __init__(self, benchmark_dir: str = "benchmark_results"):
"""
Initialize the benchmark data reader.
Args:
benchmark_dir: Directory containing benchmark result files
"""
self.benchmark_dir = Path(benchmark_dir)
def read_benchmark_files(self) -> pl.DataFrame:
"""
Read all benchmark files and return a flattened Polars DataFrame.
Returns:
Polars DataFrame where each row represents a benchmark scenario with all metrics
"""
all_records = []
# Find all individual model benchmark files (exclude summary files)
benchmark_files = list(self.benchmark_dir.rglob("*_benchmark_*.json"))
benchmark_files = [f for f in benchmark_files if "summary" not in f.name]
logger.info(f"Found {len(benchmark_files)} benchmark files")
for file_path in benchmark_files:
try:
records = self._process_benchmark_file(file_path)
all_records.extend(records)
logger.debug(f"Processed {len(records)} scenarios from {file_path}")
except Exception as e:
logger.error(f"Error processing {file_path}: {e}")
continue
if not all_records:
logger.warning("No benchmark data found")
return pl.DataFrame()
# Create DataFrame from all records
df = pl.DataFrame(all_records)
logger.info(f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns")
return df
def _process_benchmark_file(self, file_path: Path) -> List[Dict[str, Any]]:
"""
Process a single benchmark file and extract all scenarios.
Args:
file_path: Path to the benchmark JSON file
Returns:
List of flattened records, one per benchmark scenario
"""
        with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
records = []
model_name = data.get("model_name", "unknown")
for scenario in data.get("benchmark_scenarios", []):
record = self._flatten_scenario(scenario, model_name, file_path)
records.append(record)
return records
def _flatten_scenario(self, scenario: Dict[str, Any], model_name: str, file_path: Path) -> Dict[str, Any]:
"""
Flatten a single benchmark scenario into a flat record.
Args:
scenario: Scenario data from benchmark file
model_name: Name of the model being benchmarked
file_path: Path to the original file
Returns:
Flattened dictionary with all metrics and metadata
"""
record = {
# File metadata
"file_path": str(file_path),
"model_name": model_name,
# Scenario metadata
"scenario_name": scenario.get("scenario_name", "unknown"),
}
# Add metadata fields
metadata = scenario.get("metadata", {})
record.update({
"timestamp": metadata.get("timestamp"),
"commit_id": metadata.get("commit_id"),
})
# Add hardware info
hw_info = metadata.get("hardware_info", {})
record.update({
"gpu_name": hw_info.get("gpu_name"),
"gpu_memory_total_mb": hw_info.get("gpu_memory_total_mb"),
"cpu_count": hw_info.get("cpu_count"),
"memory_total_mb": hw_info.get("memory_total_mb"),
"python_version": hw_info.get("python_version"),
"torch_version": hw_info.get("torch_version"),
"cuda_version": hw_info.get("cuda_version"),
})
# Add config info
config = metadata.get("config", {})
record.update({
"config_name": config.get("name"),
"model_id": config.get("model_id"),
"variant": config.get("variant"),
"warmup_iterations": config.get("warmup_iterations"),
"measurement_iterations": config.get("measurement_iterations"),
"num_tokens_to_generate": config.get("num_tokens_to_generate"),
"device": config.get("device"),
"torch_dtype": config.get("torch_dtype"),
"compile_mode": config.get("compile_mode"),
"use_cache": config.get("use_cache"),
"batch_size": config.get("batch_size"),
"sequence_length": config.get("sequence_length"),
"attn_implementation": config.get("attn_implementation"),
"sdpa_backend": config.get("sdpa_backend"),
})
# Add measurement statistics for each metric
measurements = scenario.get("measurements", {})
for metric_name, metric_data in measurements.items():
if isinstance(metric_data, dict):
# Add statistics for this metric
for stat_name, stat_value in metric_data.items():
if stat_name != "measurements": # Skip raw measurements array
record[f"{metric_name}_{stat_name}"] = stat_value
# Add GPU metrics
gpu_metrics = scenario.get("gpu_metrics", {})
for gpu_metric, value in gpu_metrics.items():
record[f"gpu_{gpu_metric}"] = value
return record
def get_summary_statistics(self, df: pl.DataFrame) -> Dict[str, Any]:
"""
Generate summary statistics from the benchmark DataFrame.
Args:
df: Benchmark DataFrame
Returns:
Dictionary with summary statistics
"""
if df.is_empty():
return {}
        metric_columns = {
            "latency_seconds_mean",
            "tokens_per_second_mean",
            "time_to_first_token_seconds_mean",
        }
        return {
            "total_scenarios": len(df),
            "unique_models": df["model_name"].n_unique(),
            "unique_scenarios": df["scenario_name"].n_unique(),
            "unique_hardware": df["gpu_name"].n_unique(),
            "date_range": {
                "earliest": df["timestamp"].min(),
                "latest": df["timestamp"].max(),
            },
            # Only compute the aggregate metrics when all three metric columns are
            # present; selecting a missing column with pl.col() would raise.
            "performance_metrics": {
                "avg_latency_seconds": df.select(pl.col("latency_seconds_mean").mean()).item(),
                "avg_tokens_per_second": df.select(pl.col("tokens_per_second_mean").mean()).item(),
                "avg_time_to_first_token": df.select(pl.col("time_to_first_token_seconds_mean").mean()).item(),
            } if metric_columns.issubset(df.columns) else None,
        }
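# Illustrative helper, not part of the original reader: a minimal sketch of how the
# flattened DataFrame could be aggregated with Polars. It assumes the columns produced
# by _flatten_scenario above ("model_name", "attn_implementation",
# "tokens_per_second_mean") are present; the output column name is hypothetical.
def summarize_tokens_per_second(df: pl.DataFrame) -> pl.DataFrame:
    """Mean tokens/second per model and attention implementation (illustrative sketch)."""
    required = {"model_name", "attn_implementation", "tokens_per_second_mean"}
    if not required.issubset(df.columns):
        # Metric columns only exist when the benchmark files actually reported them.
        return pl.DataFrame()
    return (
        df.group_by(["model_name", "attn_implementation"])
        .agg(pl.col("tokens_per_second_mean").mean().alias("avg_tokens_per_second"))
        .sort("avg_tokens_per_second", descending=True)
    )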
def main():
"""Example usage of the BenchmarkDataReader."""
logging.basicConfig(level=logging.INFO)
# Create reader and load data
reader = BenchmarkDataReader()
df = reader.read_benchmark_files()
if df.is_empty():
print("No benchmark data found!")
return
# Display basic info
print(f"\nLoaded benchmark data: {len(df)} scenarios")
print(f"Columns: {len(df.columns)}")
print("\nColumn names:")
for col in sorted(df.columns):
print(f" - {col}")
# Show summary statistics
summary = reader.get_summary_statistics(df)
print(f"\nSummary Statistics:")
for key, value in summary.items():
print(f" {key}: {value}")
# Show sample data
print(f"\nSample data (first 3 rows):")
print(df.head(3))
return df
if __name__ == "__main__":
df = main()
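# To run the example above (assuming benchmark JSON files exist under the default
# ./benchmark_results directory):
#   python benchmark_data_reader.py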