#!/usr/bin/env python3 """ LLM Inference Performance Dashboard A Gradio-based dashboard for visualizing and analyzing LLM inference benchmark results. Provides filtering, comparison, and historical analysis capabilities. """ import gradio as gr import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots import pandas as pd import polars as pl from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import logging import json from benchmark_data_reader import BenchmarkDataReader logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class BenchmarkDashboard: """Main dashboard class for LLM inference performance visualization.""" def __init__(self): """Initialize the dashboard and load data.""" self.reader = BenchmarkDataReader() self.df = None self.scenario_mappings = self.load_scenario_mappings() self.metric_mappings = self.get_metric_mappings() self.load_data() def load_data(self) -> None: """Load benchmark data from files.""" try: self.df = self.reader.read_benchmark_files() if not self.df.is_empty(): # Convert to pandas for easier plotting with plotly self.df_pandas = self.df.to_pandas() # Convert timestamp to datetime self.df_pandas['timestamp'] = pd.to_datetime(self.df_pandas['timestamp']) logger.info(f"Loaded {len(self.df_pandas)} benchmark scenarios") else: logger.warning("No benchmark data loaded") self.df_pandas = pd.DataFrame() except Exception as e: logger.error(f"Error loading data: {e}") self.df_pandas = pd.DataFrame() def load_scenario_mappings(self) -> Dict[str, str]: """Load scenario name mappings from JSON file.""" try: with open('scenario_mappings.json', 'r') as f: return json.load(f) except Exception as e: logger.warning(f"Could not load scenario mappings: {e}") return {} def get_readable_scenario_name(self, scenario_name: str) -> str: """Get human-readable scenario name or return original if not mapped.""" return self.scenario_mappings.get(scenario_name, scenario_name) def get_raw_scenario_name(self, readable_name: str) -> str: """Convert human-readable scenario name back to raw scenario name.""" # Find the raw name that maps to this readable name for raw_name, mapped_name in self.scenario_mappings.items(): if mapped_name == readable_name: return raw_name # If not found in mappings, assume it's already a raw name return readable_name def get_metric_mappings(self) -> Dict[str, str]: """Get metric name mappings from technical to human-readable names.""" return { "tokens_per_second_mean": "Tokens per Second", "latency_seconds_mean": "Latency (seconds)", "time_to_first_token_seconds_mean": "Time to First Token (seconds)", "time_per_output_token_seconds_mean": "Time per Output Token (seconds)" } def get_readable_metric_name(self, metric_name: str) -> str: """Get human-readable metric name or return original if not mapped.""" return self.metric_mappings.get(metric_name, metric_name) def get_raw_metric_name(self, readable_name: str) -> str: """Convert human-readable metric name back to raw metric name.""" for raw_name, mapped_name in self.metric_mappings.items(): if mapped_name == readable_name: return raw_name return readable_name def get_best_scenario_for_model(self, model_name: str, metric: str = "tokens_per_second_mean") -> str: """Get the best performing scenario for a given model.""" if self.df_pandas.empty: return "" # Filter data for this model model_data = self.df_pandas[self.df_pandas['model_name'] == model_name] if model_data.empty: return "" # Define priority order for scenarios (preference for kernelized/compiled) priority_order = [ "eager_sdpa_flash_attention", "eager_sdpa_efficient_attention", "compiled_compile_max-autotune_sdpa_efficient_attention", "compiled_compile_max-autotune_sdpa_default", "compiled_compile_max-autotune_sdpa_math", "compiled_compile_max-autotune_eager_attn", "eager_sdpa_default", "eager_sdpa_math", "eager_eager_attn" ] # Check if metric exists if metric not in model_data.columns: # Fallback to first available scenario in priority order for scenario in priority_order: if scenario in model_data['scenario_name'].values: return self.get_readable_scenario_name(scenario) return self.get_readable_scenario_name(model_data['scenario_name'].iloc[0]) # Find best performing scenario (highest value for throughput metrics, lowest for latency) is_latency_metric = 'latency' in metric.lower() or 'time' in metric.lower() if is_latency_metric: best_row = model_data.loc[model_data[metric].idxmin()] else: best_row = model_data.loc[model_data[metric].idxmax()] return self.get_readable_scenario_name(best_row['scenario_name']) def get_organized_scenarios(self, available_raw_scenarios: List[str]) -> Tuple[List[str], List[str]]: """Organize scenarios into priority groups with separators.""" # Define priority scenarios (main recommended scenarios) priority_raw_scenarios = [ "eager_sdpa_flash_attention", "compiled_compile_max-autotune_sdpa_default" ] # Define expert/advanced scenarios (including efficient attention) expert_raw_scenarios = [ "eager_sdpa_efficient_attention", "compiled_compile_max-autotune_sdpa_efficient_attention", "compiled_compile_max-autotune_eager_attn", "compiled_compile_max-autotune_sdpa_math", "eager_sdpa_default", "eager_eager_attn", "eager_sdpa_math" ] # Get available scenarios in priority order priority_scenarios = [] expert_scenarios = [] # Add priority scenarios that are available for raw_scenario in priority_raw_scenarios: if raw_scenario in available_raw_scenarios: readable_name = self.get_readable_scenario_name(raw_scenario) priority_scenarios.append(readable_name) # Add expert scenarios that are available for raw_scenario in expert_raw_scenarios: if raw_scenario in available_raw_scenarios: readable_name = self.get_readable_scenario_name(raw_scenario) expert_scenarios.append(readable_name) # Combine with separator all_scenarios = priority_scenarios.copy() if expert_scenarios: all_scenarios.append("─── Advanced/Developer Options ───") all_scenarios.extend(expert_scenarios) # Return all scenarios (no default selections for multi-select anymore) return all_scenarios, [] def get_filter_options(self) -> Tuple[List[str], List[str], List[str], List[str], List[str], str, str]: """Get unique values for filter dropdowns and date range.""" if self.df_pandas.empty: return [], [], [], [], [], "", "" models = sorted(self.df_pandas['model_name'].dropna().unique().tolist()) # Get organized scenarios with priority ordering and default selections raw_scenarios = sorted(self.df_pandas['scenario_name'].dropna().unique().tolist()) scenarios, default_scenarios = self.get_organized_scenarios(raw_scenarios) gpus = sorted(self.df_pandas['gpu_name'].dropna().unique().tolist()) # Get benchmark runs grouped by date (or commit_id if available) benchmark_runs = [] # Group by commit_id if available, otherwise group by date if self.df_pandas['commit_id'].notna().any(): # Group by commit_id for commit_id in self.df_pandas['commit_id'].dropna().unique(): commit_data = self.df_pandas[self.df_pandas['commit_id'] == commit_id] date_str = commit_data['timestamp'].min().strftime('%Y-%m-%d') models_count = len(commit_data['model_name'].unique()) scenarios_count = len(commit_data['scenario_name'].unique()) run_id = f"Commit {commit_id[:8]} ({date_str}) - {models_count} models, {scenarios_count} scenarios" benchmark_runs.append(run_id) else: # Group by date since commit_id is not available self.df_pandas['date'] = self.df_pandas['timestamp'].dt.date for date in sorted(self.df_pandas['date'].unique()): date_data = self.df_pandas[self.df_pandas['date'] == date] models_count = len(date_data['model_name'].unique()) scenarios_count = len(date_data['scenario_name'].unique()) # Check if any commit_id exists for this date (even if null) unique_commits = date_data['commit_id'].dropna().unique() if len(unique_commits) > 0: commit_display = f"Commit {unique_commits[0][:8]}" else: commit_display = "No commit ID" run_id = f"{date} - {commit_display} - {models_count} models, {scenarios_count} scenarios" benchmark_runs.append(run_id) benchmark_runs = sorted(benchmark_runs) # Get date range min_date = self.df_pandas['timestamp'].min().strftime('%Y-%m-%d') max_date = self.df_pandas['timestamp'].max().strftime('%Y-%m-%d') return models, scenarios, gpus, benchmark_runs, default_scenarios, min_date, max_date def filter_data(self, selected_model: str, selected_scenarios: List[str], selected_gpus: List[str], selected_run: str = None, start_date: str = None, end_date: str = None) -> pd.DataFrame: """Filter data based on user selections.""" if self.df_pandas.empty: return pd.DataFrame() filtered_df = self.df_pandas.copy() if selected_model: filtered_df = filtered_df[filtered_df['model_name'] == selected_model] if selected_scenarios: # Filter out separator lines and convert human-readable scenario names back to raw names for filtering valid_scenarios = [scenario for scenario in selected_scenarios if not scenario.startswith("───")] raw_scenarios = [self.get_raw_scenario_name(scenario) for scenario in valid_scenarios] filtered_df = filtered_df[filtered_df['scenario_name'].isin(raw_scenarios)] if selected_gpus: filtered_df = filtered_df[filtered_df['gpu_name'].isin(selected_gpus)] # Filter by date range if start_date and end_date: start_datetime = pd.to_datetime(start_date) end_datetime = pd.to_datetime(end_date) + pd.Timedelta(days=1) # Include end date filtered_df = filtered_df[ (filtered_df['timestamp'] >= start_datetime) & (filtered_df['timestamp'] < end_datetime) ] # Filter by specific benchmark run (commit or date-based grouping) if selected_run: if selected_run.startswith("Commit "): # Extract commit_id from the run_id format: "Commit 12345678 (2025-09-16) - models" try: commit_id_part = selected_run.split('Commit ')[1].split(' ')[0] # Get commit hash # Find all data with this commit_id filtered_df = filtered_df[filtered_df['commit_id'] == commit_id_part] except (IndexError, ValueError): # Fallback if parsing fails logger.warning(f"Failed to parse commit from: {selected_run}") else: # Date-based grouping format: "2025-09-16 - X models, Y scenarios" try: date_str = selected_run.split(' - ')[0] selected_date = pd.to_datetime(date_str).date() # Add date column if not exists if 'date' not in filtered_df.columns: filtered_df = filtered_df.copy() filtered_df['date'] = filtered_df['timestamp'].dt.date # Filter by date filtered_df = filtered_df[filtered_df['date'] == selected_date] except (IndexError, ValueError) as e: logger.warning(f"Failed to parse date from: {selected_run}, error: {e}") # Return empty dataframe if parsing fails filtered_df = filtered_df.iloc[0:0] return filtered_df def create_performance_comparison_chart(self, filtered_df: pd.DataFrame, metric: str = "tokens_per_second_mean") -> go.Figure: """Create performance comparison chart.""" if filtered_df.empty: fig = go.Figure() fig.add_annotation(text="No data available for selected filters", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig # Add human-readable scenario names for display plot_df = filtered_df.copy() plot_df['scenario_display'] = plot_df['scenario_name'].apply(self.get_readable_scenario_name) # Create bar chart comparing performance across models and scenarios fig = px.bar( plot_df, x='scenario_display', y=metric, color='model_name', title=f'Performance Comparison: {self.get_readable_metric_name(metric)}', labels={ metric: self.get_readable_metric_name(metric), 'scenario_display': 'Benchmark Scenario', 'model_name': 'Model' }, hover_data=['gpu_name', 'timestamp'] ) fig.update_layout( xaxis_tickangle=-45, height=500, showlegend=True, plot_bgcolor='rgba(235, 242, 250, 1.0)', paper_bgcolor='rgba(245, 248, 252, 0.7)' ) return fig def create_historical_trend_chart(self, filtered_df: pd.DataFrame, metric: str = "tokens_per_second_mean") -> go.Figure: """Create historical trend chart showing performance across different benchmark runs for the same scenarios.""" if filtered_df.empty: fig = go.Figure() fig.add_annotation(text="No data available for selected filters", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig fig = go.Figure() # Group by model and scenario combination to show trends across benchmark runs for model in filtered_df['model_name'].unique(): model_data = filtered_df[filtered_df['model_name'] == model] for scenario in model_data['scenario_name'].unique(): scenario_data = model_data[model_data['scenario_name'] == scenario] # Sort by timestamp to show chronological progression scenario_data = scenario_data.sort_values('timestamp') # Only show trends if we have multiple data points for this model-scenario combination if len(scenario_data) > 1: # Use human-readable scenario name for display readable_scenario = self.get_readable_scenario_name(scenario) fig.add_trace(go.Scatter( x=scenario_data['timestamp'], y=scenario_data[metric], mode='lines+markers', name=f'{model} - {readable_scenario}', line=dict(width=2), marker=dict(size=6), hovertemplate=f'{model}
' + f'Scenario: {readable_scenario}
' + 'Time: %{x}
' + f'{self.get_readable_metric_name(metric)}: %{{y}}
' + '' )) # If no trends found (all scenarios have only single runs), show a message if len(fig.data) == 0: fig.add_annotation( text="No historical trends available.
Each scenario only has one benchmark run.
Historical trends require multiple runs of the same scenario over time.", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=14) ) fig.update_layout( title=f'Historical Trends Across Benchmark Runs: {self.get_readable_metric_name(metric)}', xaxis_title='Timestamp', yaxis_title=self.get_readable_metric_name(metric), height=500, hovermode='closest', showlegend=True, plot_bgcolor='rgba(235, 242, 250, 1.0)', paper_bgcolor='rgba(245, 248, 252, 0.7)' ) return fig def create_gpu_comparison_chart(self, filtered_df: pd.DataFrame) -> go.Figure: """Create GPU utilization and memory usage comparison.""" if filtered_df.empty: fig = go.Figure() fig.add_annotation(text="No data available for selected filters", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig # Create subplots for GPU metrics fig = make_subplots( rows=1, cols=2, subplot_titles=('GPU Utilization Mean (%)', 'GPU Memory Used (MB)'), specs=[[{"secondary_y": False}, {"secondary_y": False}]] ) # GPU Utilization bar chart gpu_util_data = filtered_df.groupby(['model_name', 'gpu_name'])['gpu_gpu_utilization_mean'].mean().reset_index() for model in gpu_util_data['model_name'].unique(): model_data = gpu_util_data[gpu_util_data['model_name'] == model] fig.add_trace( go.Bar(x=model_data['gpu_name'], y=model_data['gpu_gpu_utilization_mean'], name=f'{model} - Utilization', showlegend=True), row=1, col=1 ) # GPU Memory Usage bar chart gpu_mem_data = filtered_df.groupby(['model_name', 'gpu_name'])['gpu_gpu_memory_used_mean'].mean().reset_index() for model in gpu_mem_data['model_name'].unique(): model_data = gpu_mem_data[gpu_mem_data['model_name'] == model] fig.add_trace( go.Bar(x=model_data['gpu_name'], y=model_data['gpu_gpu_memory_used_mean'], name=f'{model} - Memory', showlegend=True), row=1, col=2 ) fig.update_layout( height=500, title_text="GPU Performance Analysis", plot_bgcolor='rgba(235, 242, 250, 1.0)', paper_bgcolor='rgba(245, 248, 252, 0.7)' ) return fig def create_metrics_summary_table(self, filtered_df: pd.DataFrame) -> pd.DataFrame: """Create summary statistics table with each scenario as a separate row.""" if filtered_df.empty: return pd.DataFrame({'Message': ['No data available for selected filters']}) # Key performance metrics metrics_cols = [ 'tokens_per_second_mean', 'latency_seconds_mean', 'time_to_first_token_seconds_mean', 'time_per_output_token_seconds_mean' ] summary_data = [] # Group by scenario instead of model (since we're now single-model focused) for scenario in filtered_df['scenario_name'].unique(): scenario_data = filtered_df[filtered_df['scenario_name'] == scenario] # Get human-readable scenario name readable_scenario = self.get_readable_scenario_name(scenario) row = {'Scenario': readable_scenario} # Add metrics for this scenario for metric in metrics_cols: if metric in scenario_data.columns and not scenario_data[metric].isna().all(): readable_metric = self.get_readable_metric_name(metric) # For scenarios, show the mean value (since each scenario should have one value per run) mean_value = scenario_data[metric].mean() row[readable_metric] = f"{mean_value:.2f}" summary_data.append(row) return pd.DataFrame(summary_data) def update_dashboard(self, selected_model: str, selected_scenarios: List[str], selected_gpus: List[str], selected_run: str, metric: str): """Update all dashboard components based on current filters.""" filtered_df = self.filter_data( selected_model, selected_scenarios, selected_gpus, selected_run ) # Create charts perf_chart = self.create_performance_comparison_chart(filtered_df, metric) gpu_chart = self.create_gpu_comparison_chart(filtered_df) summary_table = self.create_metrics_summary_table(filtered_df) # Summary stats if not filtered_df.empty: model_name = filtered_df['model_name'].iloc[0] # Get list of scenario names (raw) and convert to readable names raw_scenario_names = sorted(filtered_df['scenario_name'].unique()) readable_scenario_names = [self.get_readable_scenario_name(scenario) for scenario in raw_scenario_names] scenarios_list = ", ".join(readable_scenario_names) date_range = f"{filtered_df['timestamp'].min().strftime('%Y-%m-%d')} to {filtered_df['timestamp'].max().strftime('%Y-%m-%d')}" benchmark_runs = len(filtered_df.groupby(['timestamp', 'file_path'])) summary_text = f""" **Analysis Summary for {model_name}:** - Date Range: {date_range} - Benchmark Runs: {benchmark_runs} - Total Data Points: {len(filtered_df)} **Selected Scenarios:** {scenarios_list} """ else: summary_text = "No data available for current selection." return perf_chart, gpu_chart, summary_table, summary_text def update_historical_trends(self, selected_model: str, selected_scenarios: List[str], selected_gpus: List[str], start_date: str, end_date: str, metric: str): """Update historical trends chart with date filtering.""" filtered_df = self.filter_data( selected_model, selected_scenarios, selected_gpus, start_date=start_date, end_date=end_date ) trend_chart = self.create_historical_trend_chart(filtered_df, metric) return trend_chart def create_gradio_interface() -> gr.Interface: """Create the Gradio interface.""" dashboard = BenchmarkDashboard() models, scenarios, gpus, benchmark_runs, default_scenarios, min_date, max_date = dashboard.get_filter_options() # Performance metrics options (human-readable) raw_metric_options = [ "tokens_per_second_mean", "latency_seconds_mean", "time_to_first_token_seconds_mean", "time_per_output_token_seconds_mean" ] metric_options = [dashboard.get_readable_metric_name(metric) for metric in raw_metric_options] with gr.Blocks(title="LLM Inference Performance Dashboard", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🚀 LLM Inference Performance Dashboard") gr.Markdown("Analyze and compare LLM inference performance across models, scenarios, and hardware configurations.") gr.Markdown("*💡 **Smart Defaults**: The best performing scenario is automatically selected for each model based on throughput analysis.*") with gr.Row(): with gr.Column(scale=1): gr.Markdown("## Filters") model_filter = gr.Dropdown( choices=models, value=models[0] if models else None, label="Select Model", interactive=True ) scenario_filter = gr.Dropdown( choices=scenarios, value=[dashboard.get_best_scenario_for_model(models[0], "tokens_per_second_mean")] if models else [], label="Select Scenarios", info="💡 The best performing scenario is automatically selected when you change models", multiselect=True, interactive=True ) gpu_filter = gr.CheckboxGroup( choices=gpus, value=gpus, label="Select GPUs", interactive=True ) metric_selector = gr.Dropdown( choices=metric_options, value=dashboard.get_readable_metric_name("tokens_per_second_mean"), label="Primary Metric", interactive=True ) gr.Markdown("### Benchmark Run Selection") # Search field for filtering benchmark runs run_search = gr.Textbox( value="", label="Search Benchmark Runs", placeholder="Search by date, commit ID, etc.", interactive=True ) # Filtered benchmark run selector benchmark_run_selector = gr.Dropdown( choices=benchmark_runs, value=benchmark_runs[0] if benchmark_runs else None, label="Select Benchmark Run", info="Choose specific daily run (all models from same commit/date)", interactive=True, allow_custom_value=False ) with gr.Column(scale=3): with gr.Tabs(): with gr.TabItem("Performance Comparison"): perf_plot = gr.Plot(label="Performance Comparison") with gr.TabItem("Historical Trends"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Date Range for Historical Analysis") start_date = gr.Textbox( value=min_date, label="Start Date (YYYY-MM-DD)", placeholder="2025-01-01", interactive=True ) end_date = gr.Textbox( value=max_date, label="End Date (YYYY-MM-DD)", placeholder="2025-12-31", interactive=True ) with gr.Column(scale=3): trend_plot = gr.Plot(label="Historical Trends") with gr.TabItem("GPU Analysis"): gpu_plot = gr.Plot(label="GPU Performance Analysis") with gr.TabItem("Summary Statistics"): summary_table = gr.Dataframe(label="Performance Summary") with gr.Row(): summary_text = gr.Markdown("", label="Summary") # Function to filter benchmark runs based on search def filter_benchmark_runs(search_text): if not search_text: return gr.Dropdown(choices=benchmark_runs, value=benchmark_runs[0] if benchmark_runs else None) # Filter runs that contain the search text (case insensitive) filtered_runs = [run for run in benchmark_runs if search_text.lower() in run.lower()] return gr.Dropdown(choices=filtered_runs, value=filtered_runs[0] if filtered_runs else None) # Function to update scenarios when model changes def update_scenarios_for_model(selected_model, current_metric): if not selected_model: return [] # Convert readable metric name back to raw name raw_metric = dashboard.get_raw_metric_name(current_metric) best_scenario = dashboard.get_best_scenario_for_model(selected_model, raw_metric) return [best_scenario] if best_scenario else [] # Update function for main dashboard (excluding historical trends) def update_main(model_selected, scenarios_selected, gpus_selected, run_selected, metric): # Convert readable metric name back to raw name raw_metric = dashboard.get_raw_metric_name(metric) return dashboard.update_dashboard( model_selected, scenarios_selected, gpus_selected, run_selected, raw_metric ) # Update function for historical trends def update_trends(model_selected, scenarios_selected, gpus_selected, start_dt, end_dt, metric): # Convert readable metric name back to raw name raw_metric = dashboard.get_raw_metric_name(metric) return dashboard.update_historical_trends( model_selected, scenarios_selected, gpus_selected, start_dt, end_dt, raw_metric ) # Set up interactivity for main dashboard main_inputs = [model_filter, scenario_filter, gpu_filter, benchmark_run_selector, metric_selector] main_outputs = [perf_plot, gpu_plot, summary_table, summary_text] # Set up interactivity for historical trends trends_inputs = [model_filter, scenario_filter, gpu_filter, start_date, end_date, metric_selector] trends_outputs = [trend_plot] # Update main dashboard on filter changes for input_component in main_inputs: input_component.change(fn=update_main, inputs=main_inputs, outputs=main_outputs) # Update historical trends on filter changes for input_component in trends_inputs: input_component.change(fn=update_trends, inputs=trends_inputs, outputs=trends_outputs) # Connect search field to filter benchmark runs run_search.change(fn=filter_benchmark_runs, inputs=[run_search], outputs=[benchmark_run_selector]) # Auto-update scenarios when model changes model_filter.change( fn=update_scenarios_for_model, inputs=[model_filter, metric_selector], outputs=[scenario_filter] ) # Initial load demo.load(fn=update_main, inputs=main_inputs, outputs=main_outputs) demo.load(fn=update_trends, inputs=trends_inputs, outputs=trends_outputs) return demo def main(): """Launch the dashboard.""" logger.info("Starting LLM Inference Performance Dashboard") try: demo = create_gradio_interface() demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True ) except Exception as e: logger.error(f"Error launching dashboard: {e}") raise if __name__ == "__main__": main()