|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Reusable benchmarking utilities for performance testing.""" |
|
|
import time |
|
|
import numpy as np |
|
|
from contextlib import contextmanager |
|
|
from typing import Callable, Dict, Tuple, Any, Optional |
|
|
import torch |
|
|
|
|
|
def to_dtype(dtype_str: str): |
|
|
"""Convert string to torch dtype.""" |
|
|
if dtype_str == "float16": |
|
|
return torch.float16 |
|
|
if dtype_str == "bfloat16": |
|
|
return torch.bfloat16 |
|
|
return torch.float32 |
|
|
|
|
|
def _sync(device: str): |
|
|
"""Synchronize device if CUDA.""" |
|
|
if device == "cuda": |
|
|
torch.cuda.synchronize() |
|
|
|
|
|
def _compute_stats(times_s, tokens: Optional[int] = None) -> Dict[str, float]: |
|
|
"""Compute comprehensive latency and throughput statistics.""" |
|
|
lat_ms = np.array([t * 1000.0 for t in times_s]) |
|
|
lat_ms_sorted = np.sort(lat_ms) |
|
|
n = len(lat_ms) |
|
|
|
|
|
stats = { |
|
|
"avg_ms": np.mean(lat_ms), |
|
|
"min_ms": np.min(lat_ms), |
|
|
"max_ms": np.max(lat_ms), |
|
|
"std_ms": np.std(lat_ms), |
|
|
"p50_ms": np.percentile(lat_ms, 50), |
|
|
"p95_ms": np.percentile(lat_ms, 95), |
|
|
"p99_ms": np.percentile(lat_ms, 99), |
|
|
"num_iters": n |
|
|
} |
|
|
|
|
|
if tokens is not None and n > 0: |
|
|
avg_s = np.mean(times_s) |
|
|
stats["tokens_per_s"] = tokens / avg_s if avg_s > 0 else float("inf") |
|
|
stats["throughput_variance"] = np.std([tokens / t for t in times_s if t > 0]) |
|
|
|
|
|
return stats |
|
|
|
|
|
def _format_timing_stats(stats: Dict[str, float], tokens: Optional[int] = None) -> str: |
|
|
"""Format timing statistics for display.""" |
|
|
lines = [ |
|
|
"\n━━━━━━━━━━━━━━━━━━━━ Benchmark Results ━━━━━━━━━━━━━━━━━━━━", |
|
|
f"Iterations: {stats.get('num_iters', 0)}", |
|
|
"\nLatency Statistics:", |
|
|
f" Average: {stats['avg_ms']:.3f} ms", |
|
|
f" Min: {stats['min_ms']:.3f} ms", |
|
|
f" Max: {stats['max_ms']:.3f} ms", |
|
|
f" Std Dev: {stats['std_ms']:.3f} ms", |
|
|
"\nPercentiles:", |
|
|
f" P50 (median): {stats['p50_ms']:.3f} ms", |
|
|
f" P95: {stats['p95_ms']:.3f} ms", |
|
|
f" P99: {stats['p99_ms']:.3f} ms", |
|
|
] |
|
|
|
|
|
if tokens is not None and 'tokens_per_s' in stats: |
|
|
lines.extend([ |
|
|
"\nThroughput:", |
|
|
f" Tokens/sec: {stats['tokens_per_s']:.1f}", |
|
|
f" Std Dev: {stats.get('throughput_variance', 0):.1f}", |
|
|
]) |
|
|
|
|
|
lines.append("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━") |
|
|
return "\n".join(lines) |
|
|
|
|
|
def _bench_engine( |
|
|
call: Callable[[], Any], *, warmup: int, iters: int, device: str, dtype, input_gen: Callable[[], Any] = None |
|
|
) -> Tuple[Any, list]: |
|
|
"""Core benchmarking engine with warmup and timing.""" |
|
|
use_autocast = device == "cuda" and dtype in (torch.float16, torch.bfloat16) |
|
|
|
|
|
|
|
|
print(f"\nWarming up ({warmup} iterations)...") |
|
|
with torch.inference_mode(): |
|
|
for _ in range(max(0, warmup)): |
|
|
if use_autocast: |
|
|
with torch.autocast(device_type="cuda", dtype=dtype): |
|
|
if input_gen is not None: |
|
|
_ = call(input_gen()) |
|
|
else: |
|
|
_ = call() |
|
|
else: |
|
|
if input_gen is not None: |
|
|
_ = call(input_gen()) |
|
|
else: |
|
|
_ = call() |
|
|
_sync(device) |
|
|
|
|
|
|
|
|
print(f"Benchmarking ({iters} iterations)...") |
|
|
times_s = [] |
|
|
last = None |
|
|
with torch.inference_mode(): |
|
|
for i in range(max(1, iters)): |
|
|
start = time.perf_counter() |
|
|
if use_autocast: |
|
|
with torch.autocast(device_type="cuda", dtype=dtype): |
|
|
if input_gen is not None: |
|
|
last = call(input_gen()) |
|
|
else: |
|
|
last = call() |
|
|
else: |
|
|
if input_gen is not None: |
|
|
last = call(input_gen()) |
|
|
else: |
|
|
last = call() |
|
|
_sync(device) |
|
|
end = time.perf_counter() |
|
|
times_s.append(end - start) |
|
|
|
|
|
|
|
|
if i > 0 and i % max(1, iters // 5) == 0: |
|
|
pct = (i / iters) * 100 |
|
|
avg_so_far = np.mean(times_s[:i]) * 1000 |
|
|
print(f" Progress: {pct:.0f}% complete (avg: {avg_so_far:.3f} ms)") |
|
|
|
|
|
return last, times_s |
|
|
|
|
|
def tensor_stats(t: torch.Tensor) -> str: |
|
|
"""Generate comprehensive stats string for a tensor.""" |
|
|
return (f"shape={tuple(t.shape)}, " |
|
|
f"dtype={t.dtype}, " |
|
|
f"device={t.device}, " |
|
|
f"range=[{t.min().item():.6f}, {t.max().item():.6f}], " |
|
|
f"mean={t.mean().item():.6f}, " |
|
|
f"std={t.std().item():.6f}, " |
|
|
f"norm={t.norm().item():.6f}") |
|
|
|
|
|
@contextmanager |
|
|
def bench_context( |
|
|
*, warmup: int = 25, iters: int = 100, device: str = "cuda", dtype=torch.float32, tokens: Optional[int] = None, verbose: bool = True, save_json: Optional[str] = None, vary_inputs: bool = True |
|
|
): |
|
|
"""Context that yields a runner: runner(fn, *args, **kwargs) -> (result, stats). |
|
|
|
|
|
If vary_inputs=True, the first argument should be a base tensor that will be varied each iteration |
|
|
by adding a small deterministic increment to prevent caching artifacts. |
|
|
""" |
|
|
|
|
|
def runner(fn: Callable[..., Any], *args, **kwargs) -> Tuple[Any, Dict[str, float]]: |
|
|
|
|
|
if verbose: |
|
|
print(f"\n┌─ Benchmark Configuration ─────────────────────────────┐") |
|
|
|
|
|
print(f"│ Warmup: {warmup:<15} Iters: {iters} │") |
|
|
if tokens: |
|
|
print(f"│ Tokens: {tokens} │") |
|
|
if vary_inputs: |
|
|
print(f"│ Input Variation: Enabled (prevents caching artifacts) │") |
|
|
print(f"└────────────────────────────────────────────────────────┘") |
|
|
|
|
|
|
|
|
input_gen = None |
|
|
if vary_inputs and args and isinstance(args[0], torch.Tensor): |
|
|
base_input = args[0].clone() |
|
|
iteration_counter = [0] |
|
|
|
|
|
def generate_varied_input(): |
|
|
"""Generate input tensor varied by iteration to prevent caching.""" |
|
|
|
|
|
varied_input = base_input + (iteration_counter[0] * 0.001) |
|
|
iteration_counter[0] += 1 |
|
|
return varied_input |
|
|
|
|
|
input_gen = generate_varied_input |
|
|
call = lambda x: fn(x, *args[1:], **kwargs) |
|
|
|
|
|
|
|
|
if verbose: |
|
|
print(f"\nBase Input: {tensor_stats(base_input)}") |
|
|
print(f"Input Variation: +{0.001:.3f} * iteration (deterministic)") |
|
|
else: |
|
|
|
|
|
call = lambda: fn(*args, **kwargs) |
|
|
if verbose and args and isinstance(args[0], torch.Tensor): |
|
|
print(f"\nInput: {tensor_stats(args[0])}") |
|
|
|
|
|
result, times_s = _bench_engine(call, warmup=warmup, iters=iters, device=device, dtype=dtype, input_gen=input_gen) |
|
|
|
|
|
|
|
|
if verbose: |
|
|
print("\nOutput tensors:") |
|
|
if isinstance(result, torch.Tensor): |
|
|
print(f" Primary: {tensor_stats(result)}") |
|
|
elif isinstance(result, tuple) and len(result) > 0 and isinstance(result[0], torch.Tensor): |
|
|
print(f" Primary: {tensor_stats(result[0])}") |
|
|
if len(result) > 1: |
|
|
if isinstance(result[1], torch.Tensor): |
|
|
print(f" Auxiliary: {tensor_stats(result[1])}") |
|
|
else: |
|
|
print(f" Auxiliary: {type(result[1]).__name__}") |
|
|
|
|
|
|
|
|
stats = _compute_stats(times_s, tokens=tokens) |
|
|
if verbose: |
|
|
print(_format_timing_stats(stats, tokens)) |
|
|
|
|
|
|
|
|
if save_json: |
|
|
import json |
|
|
json_data = { |
|
|
"implementation": save_json.replace(".json", ""), |
|
|
"config": { |
|
|
"warmup": warmup, |
|
|
"iters": iters, |
|
|
"device": str(device), |
|
|
"dtype": str(dtype), |
|
|
"tokens": tokens, |
|
|
"vary_inputs": vary_inputs |
|
|
}, |
|
|
"stats": stats, |
|
|
"output_sum": float(result[0].sum().item()) if isinstance(result, tuple) and len(result) > 0 else float(result.sum().item()) if isinstance(result, torch.Tensor) else None |
|
|
} |
|
|
with open(save_json, 'w') as f: |
|
|
json.dump(json_data, f, indent=2) |
|
|
if verbose: |
|
|
print(f"\nSaved benchmark results to {save_json}") |
|
|
|
|
|
return result, stats |
|
|
|
|
|
yield runner |
|
|
|
|
|
def set_seed(seed: int): |
|
|
"""Set seeds for reproducibility.""" |
|
|
torch.manual_seed(seed) |
|
|
if torch.cuda.is_available(): |
|
|
torch.cuda.manual_seed(seed) |
|
|
torch.cuda.manual_seed_all(seed) |
|
|
torch.backends.cudnn.deterministic = True |
|
|
torch.backends.cudnn.benchmark = False |