"""
Benchmark Data Reader for LLM Inference Performance Dashboard

This module provides functionality to read benchmark result files and convert them
into a flattened Polars DataFrame for analysis and visualization.
"""

import json
import logging
from pathlib import Path
from typing import Any, Dict, List

import polars as pl

logger = logging.getLogger(__name__)


class BenchmarkDataReader:
    """Reader for benchmark result JSON files that flattens data into a Polars DataFrame."""

    def __init__(self, benchmark_dir: str = "benchmark_results"):
        """
        Initialize the benchmark data reader.

        Args:
            benchmark_dir: Directory containing benchmark result files
        """
        self.benchmark_dir = Path(benchmark_dir)

    def read_benchmark_files(self) -> pl.DataFrame:
        """
        Read all benchmark files and return a flattened Polars DataFrame.

        Returns:
            Polars DataFrame where each row represents a benchmark scenario with all metrics
        """
        all_records = []

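        # Discover benchmark result JSON files recursively, skipping summary files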
        benchmark_files = list(self.benchmark_dir.rglob("*_benchmark_*.json"))
        benchmark_files = [f for f in benchmark_files if "summary" not in f.name]

        logger.info(f"Found {len(benchmark_files)} benchmark files")

        for file_path in benchmark_files:
            try:
                records = self._process_benchmark_file(file_path)
                all_records.extend(records)
                logger.debug(f"Processed {len(records)} scenarios from {file_path}")
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                continue

        if not all_records:
            logger.warning("No benchmark data found")
            return pl.DataFrame()

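        # Assemble one flat DataFrame from the collected scenario records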
        df = pl.DataFrame(all_records)
        logger.info(f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns")

        return df

    def _process_benchmark_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Process a single benchmark file and extract all scenarios.

        Args:
            file_path: Path to the benchmark JSON file

        Returns:
            List of flattened records, one per benchmark scenario
        """
        with open(file_path, 'r') as f:
            data = json.load(f)

        records = []
        model_name = data.get("model_name", "unknown")

        for scenario in data.get("benchmark_scenarios", []):
            record = self._flatten_scenario(scenario, model_name, file_path)
            records.append(record)

        return records

    def _flatten_scenario(self, scenario: Dict[str, Any], model_name: str, file_path: Path) -> Dict[str, Any]:
        """
        Flatten a single benchmark scenario into a flat record.

        Args:
            scenario: Scenario data from benchmark file
            model_name: Name of the model being benchmarked
            file_path: Path to the original file

        Returns:
            Flattened dictionary with all metrics and metadata
        """
        record = {
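            # Provenance: which file, model, and scenario this row describes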
"file_path": str(file_path), |
|
|
"model_name": model_name, |
|
|
|
|
|
|
|
|
"scenario_name": scenario.get("scenario_name", "unknown"), |
|
|
} |
|
|
|
|
|
|
|
|
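        # Run metadata: when the benchmark ran and at which commit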
        metadata = scenario.get("metadata", {})
        record.update({
            "timestamp": metadata.get("timestamp"),
            "commit_id": metadata.get("commit_id"),
        })

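        # Hardware and software environment the benchmark ran on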
        hw_info = metadata.get("hardware_info", {})
        record.update({
            "gpu_name": hw_info.get("gpu_name"),
            "gpu_memory_total_mb": hw_info.get("gpu_memory_total_mb"),
            "cpu_count": hw_info.get("cpu_count"),
            "memory_total_mb": hw_info.get("memory_total_mb"),
            "python_version": hw_info.get("python_version"),
            "torch_version": hw_info.get("torch_version"),
            "cuda_version": hw_info.get("cuda_version"),
        })

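        # Benchmark configuration (model variant, generation settings, attention backend, ...)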
        config = metadata.get("config", {})
        record.update({
            "config_name": config.get("name"),
            "model_id": config.get("model_id"),
            "variant": config.get("variant"),
            "warmup_iterations": config.get("warmup_iterations"),
            "measurement_iterations": config.get("measurement_iterations"),
            "num_tokens_to_generate": config.get("num_tokens_to_generate"),
            "device": config.get("device"),
            "torch_dtype": config.get("torch_dtype"),
            "compile_mode": config.get("compile_mode"),
            "use_cache": config.get("use_cache"),
            "batch_size": config.get("batch_size"),
            "sequence_length": config.get("sequence_length"),
            "attn_implementation": config.get("attn_implementation"),
            "sdpa_backend": config.get("sdpa_backend"),
        })

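        # Flatten measurement statistics into columns such as latency_seconds_mean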
        measurements = scenario.get("measurements", {})
        for metric_name, metric_data in measurements.items():
            if isinstance(metric_data, dict):
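                # Keep aggregate statistics; skip the raw per-iteration "measurements" list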
                for stat_name, stat_value in metric_data.items():
                    if stat_name != "measurements":
                        record[f"{metric_name}_{stat_name}"] = stat_value

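        # Per-scenario GPU metrics, stored with a "gpu_" column prefix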
        gpu_metrics = scenario.get("gpu_metrics", {})
        for gpu_metric, value in gpu_metrics.items():
            record[f"gpu_{gpu_metric}"] = value

        return record

    def get_summary_statistics(self, df: pl.DataFrame) -> Dict[str, Any]:
        """
        Generate summary statistics from the benchmark DataFrame.

        Args:
            df: Benchmark DataFrame

        Returns:
            Dictionary with summary statistics
        """
        if df.is_empty():
            return {}

        return {
            "total_scenarios": len(df),
            "unique_models": df["model_name"].n_unique(),
            "unique_scenarios": df["scenario_name"].n_unique(),
            "unique_hardware": df["gpu_name"].n_unique(),
            "date_range": {
                "earliest": df["timestamp"].min(),
                "latest": df["timestamp"].max(),
            },
"performance_metrics": { |
|
|
"avg_latency_seconds": df.select(pl.col("latency_seconds_mean").mean()).item(), |
|
|
"avg_tokens_per_second": df.select(pl.col("tokens_per_second_mean").mean()).item(), |
|
|
"avg_time_to_first_token": df.select(pl.col("time_to_first_token_seconds_mean").mean()).item(), |
|
|
} if "latency_seconds_mean" in df.columns else None |
|
|
} |


def main():
    """Example usage of the BenchmarkDataReader."""
    logging.basicConfig(level=logging.INFO)

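    # Load every benchmark result file into a single flat DataFrame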
    reader = BenchmarkDataReader()
    df = reader.read_benchmark_files()

    if df.is_empty():
        print("No benchmark data found!")
        return

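    # Quick overview of the flattened table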
print(f"\nLoaded benchmark data: {len(df)} scenarios") |
|
|
print(f"Columns: {len(df.columns)}") |
|
|
print("\nColumn names:") |
|
|
for col in sorted(df.columns): |
|
|
print(f" - {col}") |
|
|
|
|
|
|
|
|
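    # Aggregate summary across all scenarios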
    summary = reader.get_summary_statistics(df)
    print("\nSummary Statistics:")
    for key, value in summary.items():
        print(f" {key}: {value}")

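    # Preview a few flattened rows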
print(f"\nSample data (first 3 rows):") |
|
|
print(df.head(3)) |
|
|
|
|
|
return df |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
df = main() |