File size: 7,790 Bytes
4046334
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#!/usr/bin/env python3
"""
Benchmark Data Reader for LLM Inference Performance Dashboard

This module provides functionality to read benchmark result files and convert them
into a flattened Polars DataFrame for analysis and visualization.
"""

import json
import polars as pl
from pathlib import Path
from typing import List, Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)


class BenchmarkDataReader:
    """Reader for benchmark result JSON files that flattens data into a Polars DataFrame."""

    def __init__(self, benchmark_dir: str = "benchmark_results"):
        """
        Initialize the benchmark data reader.

        Args:
            benchmark_dir: Directory containing benchmark result files
        """
        self.benchmark_dir = Path(benchmark_dir)

    def read_benchmark_files(self) -> pl.DataFrame:
        """
        Read all benchmark files and return a flattened Polars DataFrame.

        Returns:
            Polars DataFrame where each row represents a benchmark scenario with all metrics
        """
        all_records = []

        # Find all individual model benchmark files (exclude summary files)
        benchmark_files = list(self.benchmark_dir.rglob("*_benchmark_*.json"))
        benchmark_files = [f for f in benchmark_files if "summary" not in f.name]

        logger.info(f"Found {len(benchmark_files)} benchmark files")

        for file_path in benchmark_files:
            try:
                records = self._process_benchmark_file(file_path)
                all_records.extend(records)
                logger.debug(f"Processed {len(records)} scenarios from {file_path}")
            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                continue

        if not all_records:
            logger.warning("No benchmark data found")
            return pl.DataFrame()

        # Create DataFrame from all records
        df = pl.DataFrame(all_records)
        logger.info(f"Created DataFrame with {len(df)} rows and {len(df.columns)} columns")

        return df

    def _process_benchmark_file(self, file_path: Path) -> List[Dict[str, Any]]:
        """
        Process a single benchmark file and extract all scenarios.

        Args:
            file_path: Path to the benchmark JSON file

        Returns:
            List of flattened records, one per benchmark scenario
        """
        with open(file_path, 'r') as f:
            data = json.load(f)

        records = []
        model_name = data.get("model_name", "unknown")

        for scenario in data.get("benchmark_scenarios", []):
            record = self._flatten_scenario(scenario, model_name, file_path)
            records.append(record)

        return records

    def _flatten_scenario(self, scenario: Dict[str, Any], model_name: str, file_path: Path) -> Dict[str, Any]:
        """
        Flatten a single benchmark scenario into a flat record.

        Args:
            scenario: Scenario data from benchmark file
            model_name: Name of the model being benchmarked
            file_path: Path to the original file

        Returns:
            Flattened dictionary with all metrics and metadata
        """
        record = {
            # File metadata
            "file_path": str(file_path),
            "model_name": model_name,

            # Scenario metadata
            "scenario_name": scenario.get("scenario_name", "unknown"),
        }

        # Add metadata fields
        metadata = scenario.get("metadata", {})
        record.update({
            "timestamp": metadata.get("timestamp"),
            "commit_id": metadata.get("commit_id"),
        })

        # Add hardware info
        hw_info = metadata.get("hardware_info", {})
        record.update({
            "gpu_name": hw_info.get("gpu_name"),
            "gpu_memory_total_mb": hw_info.get("gpu_memory_total_mb"),
            "cpu_count": hw_info.get("cpu_count"),
            "memory_total_mb": hw_info.get("memory_total_mb"),
            "python_version": hw_info.get("python_version"),
            "torch_version": hw_info.get("torch_version"),
            "cuda_version": hw_info.get("cuda_version"),
        })

        # Add config info
        config = metadata.get("config", {})
        record.update({
            "config_name": config.get("name"),
            "model_id": config.get("model_id"),
            "variant": config.get("variant"),
            "warmup_iterations": config.get("warmup_iterations"),
            "measurement_iterations": config.get("measurement_iterations"),
            "num_tokens_to_generate": config.get("num_tokens_to_generate"),
            "device": config.get("device"),
            "torch_dtype": config.get("torch_dtype"),
            "compile_mode": config.get("compile_mode"),
            "use_cache": config.get("use_cache"),
            "batch_size": config.get("batch_size"),
            "sequence_length": config.get("sequence_length"),
            "attn_implementation": config.get("attn_implementation"),
            "sdpa_backend": config.get("sdpa_backend"),
        })

        # Add measurement statistics for each metric
        measurements = scenario.get("measurements", {})
        for metric_name, metric_data in measurements.items():
            if isinstance(metric_data, dict):
                # Add statistics for this metric
                for stat_name, stat_value in metric_data.items():
                    if stat_name != "measurements":  # Skip raw measurements array
                        record[f"{metric_name}_{stat_name}"] = stat_value

        # Add GPU metrics
        gpu_metrics = scenario.get("gpu_metrics", {})
        for gpu_metric, value in gpu_metrics.items():
            record[f"gpu_{gpu_metric}"] = value

        return record

    def get_summary_statistics(self, df: pl.DataFrame) -> Dict[str, Any]:
        """
        Generate summary statistics from the benchmark DataFrame.

        Args:
            df: Benchmark DataFrame

        Returns:
            Dictionary with summary statistics
        """
        if df.is_empty():
            return {}

        return {
            "total_scenarios": len(df),
            "unique_models": df["model_name"].n_unique(),
            "unique_scenarios": df["scenario_name"].n_unique(),
            "unique_hardware": df["gpu_name"].n_unique(),
            "date_range": {
                "earliest": df["timestamp"].min(),
                "latest": df["timestamp"].max(),
            },
            "performance_metrics": {
                "avg_latency_seconds": df.select(pl.col("latency_seconds_mean").mean()).item(),
                "avg_tokens_per_second": df.select(pl.col("tokens_per_second_mean").mean()).item(),
                "avg_time_to_first_token": df.select(pl.col("time_to_first_token_seconds_mean").mean()).item(),
            } if "latency_seconds_mean" in df.columns else None
        }


def main():
    """Example usage of the BenchmarkDataReader."""
    logging.basicConfig(level=logging.INFO)

    # Create reader and load data
    reader = BenchmarkDataReader()
    df = reader.read_benchmark_files()

    if df.is_empty():
        print("No benchmark data found!")
        return

    # Display basic info
    print(f"\nLoaded benchmark data: {len(df)} scenarios")
    print(f"Columns: {len(df.columns)}")
    print("\nColumn names:")
    for col in sorted(df.columns):
        print(f"  - {col}")

    # Show summary statistics
    summary = reader.get_summary_statistics(df)
    print(f"\nSummary Statistics:")
    for key, value in summary.items():
        print(f"  {key}: {value}")

    # Show sample data
    print(f"\nSample data (first 3 rows):")
    print(df.head(3))

    return df


if __name__ == "__main__":
    df = main()