#!/usr/bin/env python3
"""
LLM Inference Performance Dashboard

A Gradio-based dashboard for visualizing and analyzing LLM inference benchmark results.
Provides filtering, comparison, and historical analysis capabilities.
"""

import logging
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import polars as pl
from plotly.subplots import make_subplots

from benchmark_data_reader import BenchmarkDataReader

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Metrics offered in the "Primary Metric" dropdown and summarised in the
# statistics table.
METRIC_OPTIONS = [
    "tokens_per_second_mean",
    "latency_seconds_mean",
    "time_to_first_token_seconds_mean",
    "time_per_output_token_seconds_mean",
]

# Background colours shared by every chart.
_PLOT_BGCOLOR = 'rgba(235, 242, 250, 1.0)'
_PAPER_BGCOLOR = 'rgba(245, 248, 252, 0.7)'

# Prefix of run labels built from a commit id (see get_filter_options).
_COMMIT_RUN_PREFIX = "Commit "


def _metric_label(metric: str) -> str:
    """Turn a metric column name into a human-readable title."""
    return metric.replace("_", " ").title()


class BenchmarkDashboard:
    """Main dashboard class for LLM inference performance visualization."""

    def __init__(self):
        """Initialize the dashboard and load data."""
        self.reader = BenchmarkDataReader()
        self.df = None
        # Always defined, so the other methods can test `.empty` even if
        # loading fails part-way.
        self.df_pandas = pd.DataFrame()
        self.load_data()

    def load_data(self) -> None:
        """Load benchmark data from files.

        Populates ``self.df`` with the reader's result and ``self.df_pandas``
        with its pandas conversion (``timestamp`` parsed to datetime). On any
        failure, or when the reader returns no rows, ``self.df_pandas`` is an
        empty DataFrame.
        """
        try:
            self.df = self.reader.read_benchmark_files()
            if not self.df.is_empty():
                # Convert to pandas for easier plotting with plotly
                self.df_pandas = self.df.to_pandas()
                self.df_pandas['timestamp'] = pd.to_datetime(self.df_pandas['timestamp'])
                logger.info(f"Loaded {len(self.df_pandas)} benchmark scenarios")
            else:
                logger.warning("No benchmark data loaded")
                self.df_pandas = pd.DataFrame()
        except Exception as e:
            # Top-level boundary: the dashboard should still start with no
            # data, but keep the traceback in the log.
            logger.exception(f"Error loading data: {e}")
            self.df_pandas = pd.DataFrame()

    def get_filter_options(self) -> Tuple[List[str], List[str], List[str], List[str], str, str]:
        """Get unique values for filter dropdowns and date range.

        Returns:
            ``(models, scenarios, gpus, benchmark_runs, min_date, max_date)``.
            The first three are sorted unique non-null column values.
            ``benchmark_runs`` is a sorted list of display labels, one per
            commit id when any commit id is present, otherwise one per
            calendar date. The dates are ``YYYY-MM-DD`` strings. Everything is
            empty when no data is loaded.
        """
        if self.df_pandas.empty:
            return [], [], [], [], "", ""

        models = sorted(self.df_pandas['model_name'].dropna().unique().tolist())
        scenarios = sorted(self.df_pandas['scenario_name'].dropna().unique().tolist())
        gpus = sorted(self.df_pandas['gpu_name'].dropna().unique().tolist())

        benchmark_runs = []
        if self.df_pandas['commit_id'].notna().any():
            # Group by commit_id
            for commit_id in self.df_pandas['commit_id'].dropna().unique():
                commit_data = self.df_pandas[self.df_pandas['commit_id'] == commit_id]
                date_str = commit_data['timestamp'].min().strftime('%Y-%m-%d')
                models_count = len(commit_data['model_name'].unique())
                scenarios_count = len(commit_data['scenario_name'].unique())
                # Only the first 8 characters of the commit id are shown;
                # filter_data matches on that prefix.
                run_id = (
                    f"Commit {commit_id[:8]} ({date_str}) - "
                    f"{models_count} models, {scenarios_count} scenarios"
                )
                benchmark_runs.append(run_id)
        else:
            # Group by date since commit_id is not available.
            # NOTE: this adds a 'date' column to the shared frame; filter_data
            # reuses it when present.
            self.df_pandas['date'] = self.df_pandas['timestamp'].dt.date
            for date in sorted(self.df_pandas['date'].unique()):
                date_data = self.df_pandas[self.df_pandas['date'] == date]
                models_count = len(date_data['model_name'].unique())
                scenarios_count = len(date_data['scenario_name'].unique())

                unique_commits = date_data['commit_id'].dropna().unique()
                if len(unique_commits) > 0:
                    commit_display = f"Commit {unique_commits[0][:8]}"
                else:
                    commit_display = "No commit ID"

                run_id = (
                    f"{date} - {commit_display} - "
                    f"{models_count} models, {scenarios_count} scenarios"
                )
                benchmark_runs.append(run_id)

        benchmark_runs = sorted(benchmark_runs)

        # Get date range
        min_date = self.df_pandas['timestamp'].min().strftime('%Y-%m-%d')
        max_date = self.df_pandas['timestamp'].max().strftime('%Y-%m-%d')

        return models, scenarios, gpus, benchmark_runs, min_date, max_date

    def filter_data(self, selected_models: List[str], selected_scenarios: List[str],
                    selected_gpus: List[str], selected_run: Optional[str] = None,
                    start_date: Optional[str] = None,
                    end_date: Optional[str] = None) -> pd.DataFrame:
        """Filter data based on user selections.

        Args:
            selected_models: Model names to keep; an empty list keeps all.
            selected_scenarios: Scenario names to keep; an empty list keeps all.
            selected_gpus: GPU names to keep; an empty list keeps all.
            selected_run: A run label as produced by ``get_filter_options``
                (``"Commit <prefix> (...) ..."`` or ``"<date> - ..."``).
            start_date: Inclusive lower bound; applied only together with
                ``end_date``.
            end_date: Inclusive upper bound (the whole day is included).

        Returns:
            A filtered copy of the loaded data. The result is empty when no
            data is loaded, or when a date or run label cannot be parsed.
        """
        if self.df_pandas.empty:
            return pd.DataFrame()

        filtered_df = self.df_pandas.copy()

        if selected_models:
            filtered_df = filtered_df[filtered_df['model_name'].isin(selected_models)]
        if selected_scenarios:
            filtered_df = filtered_df[filtered_df['scenario_name'].isin(selected_scenarios)]
        if selected_gpus:
            filtered_df = filtered_df[filtered_df['gpu_name'].isin(selected_gpus)]

        # Filter by date range
        if start_date and end_date:
            try:
                start_datetime = pd.to_datetime(start_date)
                end_datetime = pd.to_datetime(end_date) + pd.Timedelta(days=1)  # Include end date
            except (ValueError, TypeError, OverflowError) as e:
                # The dates come from free-text boxes that fire on every
                # change, so half-typed values are expected here.
                logger.warning(
                    f"Failed to parse date range: {start_date!r} to {end_date!r}, error: {e}"
                )
                return filtered_df.iloc[0:0]
            filtered_df = filtered_df[
                (filtered_df['timestamp'] >= start_datetime)
                & (filtered_df['timestamp'] < end_datetime)
            ]

        # Filter by specific benchmark run (commit or date-based grouping)
        if selected_run:
            if selected_run.startswith(_COMMIT_RUN_PREFIX):
                # Label format: "Commit 12345678 (2025-09-16) - N models, ..."
                commit_prefix = selected_run[len(_COMMIT_RUN_PREFIX):].split(' ', 1)[0]
                if commit_prefix:
                    # The label only carries the first 8 characters of the
                    # commit id, so match by prefix rather than equality.
                    commit_ids = filtered_df['commit_id'].astype(str)
                    filtered_df = filtered_df[commit_ids.str.startswith(commit_prefix)]
                else:
                    logger.warning(f"Failed to parse commit from: {selected_run}")
                    filtered_df = filtered_df.iloc[0:0]
            else:
                # Date-based label format: "2025-09-16 - ... - X models, Y scenarios"
                try:
                    date_str = selected_run.split(' - ')[0]
                    selected_date = pd.to_datetime(date_str).date()

                    # Add date column if not exists
                    if 'date' not in filtered_df.columns:
                        filtered_df = filtered_df.copy()
                        filtered_df['date'] = filtered_df['timestamp'].dt.date

                    filtered_df = filtered_df[filtered_df['date'] == selected_date]
                except (IndexError, ValueError) as e:
                    logger.warning(f"Failed to parse date from: {selected_run}, error: {e}")
                    # Return empty dataframe if parsing fails
                    filtered_df = filtered_df.iloc[0:0]

        return filtered_df

    @staticmethod
    def _empty_figure(message: str = "No data available for selected filters") -> go.Figure:
        """Return a blank figure showing ``message`` centred in the plot area."""
        fig = go.Figure()
        fig.add_annotation(text=message, xref="paper", yref="paper",
                           x=0.5, y=0.5, showarrow=False)
        return fig

    def create_performance_comparison_chart(self, filtered_df: pd.DataFrame,
                                            metric: str = "tokens_per_second_mean") -> go.Figure:
        """Create performance comparison chart.

        Draws a bar chart of ``metric`` per scenario, coloured by model, with
        GPU name and timestamp in the hover data.
        """
        if filtered_df.empty:
            return self._empty_figure()

        label = _metric_label(metric)
        fig = px.bar(
            filtered_df,
            x='scenario_name',
            y=metric,
            color='model_name',
            title=f'Performance Comparison: {label}',
            labels={
                metric: label,
                'scenario_name': 'Benchmark Scenario',
                'model_name': 'Model',
            },
            hover_data=['gpu_name', 'timestamp'],
        )

        fig.update_layout(
            xaxis_tickangle=-45,
            height=500,
            showlegend=True,
            plot_bgcolor=_PLOT_BGCOLOR,
            paper_bgcolor=_PAPER_BGCOLOR,
        )

        return fig

    def create_historical_trend_chart(self, filtered_df: pd.DataFrame,
                                      metric: str = "tokens_per_second_mean") -> go.Figure:
        """Create historical trend chart across benchmark runs.

        One line is drawn per (model, scenario) pair that has more than one
        data point, ordered by timestamp. If no pair qualifies, the figure
        carries an explanatory message instead.
        """
        if filtered_df.empty:
            return self._empty_figure()

        label = _metric_label(metric)
        fig = go.Figure()

        for model in filtered_df['model_name'].unique():
            model_data = filtered_df[filtered_df['model_name'] == model]

            for scenario in model_data['scenario_name'].unique():
                scenario_data = model_data[model_data['scenario_name'] == scenario]
                # Sort by timestamp to show chronological progression
                scenario_data = scenario_data.sort_values('timestamp')

                # A single point is not a trend.
                if len(scenario_data) <= 1:
                    continue

                fig.add_trace(go.Scatter(
                    x=scenario_data['timestamp'],
                    y=scenario_data[metric],
                    mode='lines+markers',
                    name=f'{model} - {scenario}',
                    line=dict(width=2),
                    marker=dict(size=6),
                    # Plotly hover text uses <br> for line breaks.
                    # <extra></extra> hides the secondary box, which would
                    # only repeat the trace name (model and scenario are
                    # already in the template).
                    hovertemplate=(
                        f'{model}<br>'
                        f'Scenario: {scenario}<br>'
                        'Time: %{x}<br>'
                        f'{label}: %{{y}}'
                        '<extra></extra>'
                    ),
                ))

        # If no trends found (all scenarios have only single runs), show a message
        if len(fig.data) == 0:
            fig.add_annotation(
                text=(
                    "No historical trends available.<br>"
                    "Each scenario only has one benchmark run.<br>"
                    "Historical trends require multiple runs of the same scenario over time."
                ),
                xref="paper", yref="paper",
                x=0.5, y=0.5, showarrow=False,
                font=dict(size=14),
            )

        fig.update_layout(
            title=f'Historical Trends Across Benchmark Runs: {label}',
            xaxis_title='Timestamp',
            yaxis_title=label,
            height=500,
            hovermode='closest',
            showlegend=True,
            plot_bgcolor=_PLOT_BGCOLOR,
            paper_bgcolor=_PAPER_BGCOLOR,
        )

        return fig

    def create_gpu_comparison_chart(self, filtered_df: pd.DataFrame) -> go.Figure:
        """Create GPU utilization and memory usage comparison.

        Builds a 2x2 grid: mean utilization and mean memory per (model, GPU)
        as bars on the top row, and utilization / memory against
        ``tokens_per_second_mean`` as scatter plots on the bottom row.
        """
        if filtered_df.empty:
            return self._empty_figure()

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=('GPU Utilization Mean (%)', 'GPU Memory Used (MB)',
                            'GPU Utilization vs Performance', 'Memory Usage vs Performance'),
            specs=[[{"secondary_y": False}, {"secondary_y": False}],
                   [{"secondary_y": False}, {"secondary_y": False}]],
        )

        # GPU Utilization bar chart
        gpu_util_data = (
            filtered_df.groupby(['model_name', 'gpu_name'])['gpu_gpu_utilization_mean']
            .mean().reset_index()
        )
        for model in gpu_util_data['model_name'].unique():
            model_data = gpu_util_data[gpu_util_data['model_name'] == model]
            fig.add_trace(
                go.Bar(x=model_data['gpu_name'], y=model_data['gpu_gpu_utilization_mean'],
                       name=f'{model} - Utilization', showlegend=True),
                row=1, col=1,
            )

        # GPU Memory Usage bar chart
        gpu_mem_data = (
            filtered_df.groupby(['model_name', 'gpu_name'])['gpu_gpu_memory_used_mean']
            .mean().reset_index()
        )
        for model in gpu_mem_data['model_name'].unique():
            model_data = gpu_mem_data[gpu_mem_data['model_name'] == model]
            fig.add_trace(
                go.Bar(x=model_data['gpu_name'], y=model_data['gpu_gpu_memory_used_mean'],
                       name=f'{model} - Memory', showlegend=True),
                row=1, col=2,
            )

        # GPU Utilization vs Performance scatter
        fig.add_trace(
            go.Scatter(x=filtered_df['gpu_gpu_utilization_mean'],
                       y=filtered_df['tokens_per_second_mean'],
                       mode='markers',
                       text=filtered_df['model_name'],
                       name='Util vs Performance',
                       showlegend=True),
            row=2, col=1,
        )

        # Memory Usage vs Performance scatter
        fig.add_trace(
            go.Scatter(x=filtered_df['gpu_gpu_memory_used_mean'],
                       y=filtered_df['tokens_per_second_mean'],
                       mode='markers',
                       text=filtered_df['model_name'],
                       name='Memory vs Performance',
                       showlegend=True),
            row=2, col=2,
        )

        fig.update_layout(
            height=800,
            title_text="GPU Performance Analysis",
            plot_bgcolor=_PLOT_BGCOLOR,
            paper_bgcolor=_PAPER_BGCOLOR,
        )

        return fig

    def create_metrics_summary_table(self, filtered_df: pd.DataFrame) -> pd.DataFrame:
        """Create summary statistics table.

        Returns one row per model with its row count and, for each metric in
        ``METRIC_OPTIONS`` present in the data, the average and best value
        formatted to two decimals. "Best" is the minimum for latency/time
        metrics and the maximum otherwise.
        """
        if filtered_df.empty:
            return pd.DataFrame({'Message': ['No data available for selected filters']})

        summary_data = []
        for model in filtered_df['model_name'].unique():
            model_data = filtered_df[filtered_df['model_name'] == model]

            row = {'Model': model, 'Scenarios': len(model_data)}
            for metric in METRIC_OPTIONS:
                if metric not in model_data.columns:
                    continue
                label = _metric_label(metric)
                values = model_data[metric]
                # Durations are better when smaller; throughput when larger.
                lower_is_better = 'latency' in metric or 'time' in metric
                best = values.min() if lower_is_better else values.max()
                row[f'{label} (Avg)'] = f"{values.mean():.2f}"
                row[f'{label} (Best)'] = f"{best:.2f}"

            summary_data.append(row)

        return pd.DataFrame(summary_data)

    def update_dashboard(self, selected_models: List[str], selected_scenarios: List[str],
                         selected_gpus: List[str], selected_run: str, metric: str):
        """Update all dashboard components based on current filters.

        Returns:
            ``(perf_chart, gpu_chart, summary_table, summary_text)`` for the
            performance plot, GPU plot, statistics table and Markdown summary.
        """
        filtered_df = self.filter_data(
            selected_models, selected_scenarios, selected_gpus, selected_run
        )

        # Create charts
        perf_chart = self.create_performance_comparison_chart(filtered_df, metric)
        gpu_chart = self.create_gpu_comparison_chart(filtered_df)
        summary_table = self.create_metrics_summary_table(filtered_df)

        # Summary stats
        if not filtered_df.empty:
            model_names = ', '.join(filtered_df['model_name'].dropna().astype(str).unique())
            first_day = filtered_df['timestamp'].min().strftime('%Y-%m-%d')
            last_day = filtered_df['timestamp'].max().strftime('%Y-%m-%d')
            run_count = filtered_df.groupby(['timestamp', 'file_path']).ngroups
            # Built line by line so the Markdown list never picks up source
            # indentation.
            summary_text = "\n".join([
                "**Data Summary:**",
                f"- Total Scenarios: {len(filtered_df)}",
                f"- Models: {model_names}",
                f"- Date Range: {first_day} to {last_day}",
                f"- Benchmark Runs: {run_count}",
            ])
        else:
            summary_text = "No data available for current selection."

        return perf_chart, gpu_chart, summary_table, summary_text

    def update_historical_trends(self, selected_models: List[str], selected_scenarios: List[str],
                                 selected_gpus: List[str], start_date: str, end_date: str,
                                 metric: str):
        """Update historical trends chart with date filtering."""
        filtered_df = self.filter_data(
            selected_models, selected_scenarios, selected_gpus,
            start_date=start_date, end_date=end_date
        )
        trend_chart = self.create_historical_trend_chart(filtered_df, metric)
        return trend_chart


def create_gradio_interface() -> gr.Blocks:
    """Create the Gradio interface.

    Builds the filter column, the tabbed chart area and the summary row, wires
    every filter to the update callbacks, and returns the ``gr.Blocks`` app
    (not yet launched).
    """
    dashboard = BenchmarkDashboard()
    models, scenarios, gpus, benchmark_runs, min_date, max_date = dashboard.get_filter_options()
    default_run = benchmark_runs[0] if benchmark_runs else None

    with gr.Blocks(title="LLM Inference Performance Dashboard", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🚀 LLM Inference Performance Dashboard")
        gr.Markdown("Analyze and compare LLM inference performance across models, scenarios, and hardware configurations.")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## Filters")
                model_filter = gr.CheckboxGroup(
                    choices=models,
                    value=models,
                    label="Select Models",
                    interactive=True,
                )
                scenario_filter = gr.CheckboxGroup(
                    choices=scenarios,
                    value=scenarios[:5],  # Limit initial selection
                    label="Select Scenarios",
                    interactive=True,
                )
                gpu_filter = gr.CheckboxGroup(
                    choices=gpus,
                    value=gpus,
                    label="Select GPUs",
                    interactive=True,
                )
                metric_selector = gr.Dropdown(
                    choices=METRIC_OPTIONS,
                    value="tokens_per_second_mean",
                    label="Primary Metric",
                    interactive=True,
                )

                gr.Markdown("### Benchmark Run Selection")

                # Search field for filtering benchmark runs
                run_search = gr.Textbox(
                    value="",
                    label="Search Benchmark Runs",
                    placeholder="Search by date, commit ID, etc.",
                    interactive=True,
                )

                # Filtered benchmark run selector
                benchmark_run_selector = gr.Dropdown(
                    choices=benchmark_runs,
                    value=default_run,
                    label="Select Benchmark Run",
                    info="Choose specific daily run (all models from same commit/date)",
                    interactive=True,
                    allow_custom_value=False,
                )

            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.TabItem("Performance Comparison"):
                        perf_plot = gr.Plot(label="Performance Comparison")

                    with gr.TabItem("Historical Trends"):
                        with gr.Row():
                            with gr.Column(scale=1):
                                gr.Markdown("### Date Range for Historical Analysis")
                                start_date = gr.Textbox(
                                    value=min_date,
                                    label="Start Date (YYYY-MM-DD)",
                                    placeholder="2025-01-01",
                                    interactive=True,
                                )
                                end_date = gr.Textbox(
                                    value=max_date,
                                    label="End Date (YYYY-MM-DD)",
                                    placeholder="2025-12-31",
                                    interactive=True,
                                )
                            with gr.Column(scale=3):
                                trend_plot = gr.Plot(label="Historical Trends")

                    with gr.TabItem("GPU Analysis"):
                        gpu_plot = gr.Plot(label="GPU Performance Analysis")

                    with gr.TabItem("Summary Statistics"):
                        summary_table = gr.Dataframe(label="Performance Summary")

        with gr.Row():
            summary_text = gr.Markdown("", label="Summary")

        # Function to filter benchmark runs based on search
        def filter_benchmark_runs(search_text):
            if not search_text:
                return gr.Dropdown(choices=benchmark_runs, value=default_run)

            # Filter runs that contain the search text (case insensitive)
            needle = search_text.lower()
            filtered_runs = [run for run in benchmark_runs if needle in run.lower()]
            return gr.Dropdown(choices=filtered_runs,
                               value=filtered_runs[0] if filtered_runs else None)

        # Update function for main dashboard (excluding historical trends)
        def update_main(models_selected, scenarios_selected, gpus_selected, run_selected, metric):
            return dashboard.update_dashboard(
                models_selected, scenarios_selected, gpus_selected, run_selected, metric
            )

        # Update function for historical trends
        def update_trends(models_selected, scenarios_selected, gpus_selected,
                          start_dt, end_dt, metric):
            return dashboard.update_historical_trends(
                models_selected, scenarios_selected, gpus_selected, start_dt, end_dt, metric
            )

        # Set up interactivity for main dashboard
        main_inputs = [model_filter, scenario_filter, gpu_filter,
                       benchmark_run_selector, metric_selector]
        main_outputs = [perf_plot, gpu_plot, summary_table, summary_text]

        # Set up interactivity for historical trends
        trends_inputs = [model_filter, scenario_filter, gpu_filter,
                         start_date, end_date, metric_selector]
        trends_outputs = [trend_plot]

        # Update main dashboard on filter changes
        for input_component in main_inputs:
            input_component.change(fn=update_main, inputs=main_inputs, outputs=main_outputs)

        # Update historical trends on filter changes
        for input_component in trends_inputs:
            input_component.change(fn=update_trends, inputs=trends_inputs, outputs=trends_outputs)

        # Connect search field to filter benchmark runs
        run_search.change(fn=filter_benchmark_runs, inputs=[run_search],
                          outputs=[benchmark_run_selector])

        # Initial load
        demo.load(fn=update_main, inputs=main_inputs, outputs=main_outputs)
        demo.load(fn=update_trends, inputs=trends_inputs, outputs=trends_outputs)

    return demo


def main():
    """Launch the dashboard."""
    logger.info("Starting LLM Inference Performance Dashboard")

    try:
        demo = create_gradio_interface()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            show_error=True,
        )
    except Exception as e:
        logger.exception(f"Error launching dashboard: {e}")
        raise


if __name__ == "__main__":
    main()