Spaces:
Sleeping
Sleeping
| import matplotlib.pyplot as plt | |
| import matplotlib | |
| import pandas as pd | |
| import gradio as gr | |
| import threading | |
| from data import CIResults | |
| from utils import logger, generate_underlined_line | |
| from summary_page import create_summary_page | |
| # Configure matplotlib to prevent memory warnings and set dark background | |
| matplotlib.rcParams['figure.facecolor'] = '#000000' | |
| matplotlib.rcParams['axes.facecolor'] = '#000000' | |
| matplotlib.rcParams['savefig.facecolor'] = '#000000' | |
| plt.ioff() # Turn off interactive mode to prevent figure accumulation | |
| # Load data once at startup | |
| Ci_results = CIResults() | |
| Ci_results.load_data() | |
| # Start the auto-reload scheduler | |
| Ci_results.schedule_data_reload() | |
| def plot_model_stats(model_name: str) -> tuple[plt.Figure, str, str]: | |
| """Draws a pie chart of model's passed, failed, skipped, and error stats.""" | |
| if Ci_results.df.empty or model_name not in Ci_results.df.index: | |
| # Handle case where model data is not available | |
| fig, ax = plt.subplots(figsize=(10, 8), facecolor='#000000') | |
| ax.set_facecolor('#000000') | |
| ax.text(0.5, 0.5, f'No data available for {model_name}', | |
| horizontalalignment='center', verticalalignment='center', | |
| transform=ax.transAxes, fontsize=16, color='#888888', | |
| fontfamily='monospace', weight='normal') | |
| ax.set_xlim(0, 1) | |
| ax.set_ylim(0, 1) | |
| ax.axis('off') | |
| return fig, "No data available", "No data available" | |
| row = Ci_results.df.loc[model_name] | |
| # Handle missing values and get counts directly from dataframe | |
| success_amd = int(row.get('success_amd', 0)) if pd.notna(row.get('success_amd', 0)) else 0 | |
| success_nvidia = int(row.get('success_nvidia', 0)) if pd.notna(row.get('success_nvidia', 0)) else 0 | |
| failed_multi_amd = int(row.get('failed_multi_no_amd', 0)) if pd.notna(row.get('failed_multi_no_amd', 0)) else 0 | |
| failed_multi_nvidia = int(row.get('failed_multi_no_nvidia', 0)) if pd.notna(row.get('failed_multi_no_nvidia', 0)) else 0 | |
| failed_single_amd = int(row.get('failed_single_no_amd', 0)) if pd.notna(row.get('failed_single_no_amd', 0)) else 0 | |
| failed_single_nvidia = int(row.get('failed_single_no_nvidia', 0)) if pd.notna(row.get('failed_single_no_nvidia', 0)) else 0 | |
| # Calculate total failures | |
| total_failed_amd = failed_multi_amd + failed_single_amd | |
| total_failed_nvidia = failed_multi_nvidia + failed_single_nvidia | |
| # Softer color palette - less pastel, more vibrant | |
| colors = { | |
| 'passed': '#4CAF50', # Medium green | |
| 'failed': '#E53E3E', # More red | |
| 'skipped': '#FFD54F', # Medium yellow | |
| 'error': '#8B0000' # Dark red | |
| } | |
| # Create stats dictionaries directly from dataframe values | |
| amd_stats = { | |
| 'passed': success_amd, | |
| 'failed': total_failed_amd, | |
| 'skipped': 0, # Not available in this dataset | |
| 'error': 0 # Not available in this dataset | |
| } | |
| nvidia_stats = { | |
| 'passed': success_nvidia, | |
| 'failed': total_failed_nvidia, | |
| 'skipped': 0, # Not available in this dataset | |
| 'error': 0 # Not available in this dataset | |
| } | |
| # Filter out categories with 0 values for cleaner visualization | |
| amd_filtered = {k: v for k, v in amd_stats.items() if v > 0} | |
| nvidia_filtered = {k: v for k, v in nvidia_stats.items() if v > 0} | |
| if not amd_filtered and not nvidia_filtered: | |
| # Handle case where all values are 0 - minimal empty state | |
| fig, ax = plt.subplots(figsize=(10, 8), facecolor='#000000') | |
| ax.set_facecolor('#000000') | |
| ax.text(0.5, 0.5, 'No test results available', | |
| horizontalalignment='center', verticalalignment='center', | |
| transform=ax.transAxes, fontsize=16, color='#888888', | |
| fontfamily='monospace', weight='normal') | |
| ax.set_xlim(0, 1) | |
| ax.set_ylim(0, 1) | |
| ax.axis('off') | |
| return fig, "", "" | |
| # Create figure with two subplots side by side with padding | |
| fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 9), facecolor='#000000') | |
| ax1.set_facecolor('#000000') | |
| ax2.set_facecolor('#000000') | |
| def create_pie_chart(ax, device_label, filtered_stats): | |
| if not filtered_stats: | |
| ax.text(0.5, 0.5, 'No test results', | |
| horizontalalignment='center', verticalalignment='center', | |
| transform=ax.transAxes, fontsize=14, color='#888888', | |
| fontfamily='monospace', weight='normal') | |
| ax.set_title(device_label, | |
| fontsize=28, weight='bold', pad=2, color='#FFFFFF', | |
| fontfamily='monospace') | |
| ax.axis('off') | |
| return | |
| chart_colors = [colors[category] for category in filtered_stats.keys()] | |
| # Create minimal pie chart - full pie, no donut effect | |
| wedges, texts, autotexts = ax.pie( | |
| filtered_stats.values(), | |
| labels=[label.lower() for label in filtered_stats.keys()], # Lowercase for minimal look | |
| colors=chart_colors, | |
| autopct=lambda pct: f'{int(pct/100*sum(filtered_stats.values()))}', | |
| startangle=90, | |
| explode=None, # No separation | |
| shadow=False, | |
| wedgeprops=dict(edgecolor='#1a1a1a', linewidth=0.5), # Minimal borders | |
| textprops={'fontsize': 12, 'weight': 'normal', 'color': '#CCCCCC', 'fontfamily': 'monospace'} | |
| ) | |
| # Enhanced percentage text styling for better readability | |
| for autotext in autotexts: | |
| autotext.set_color('#000000') # Black text for better contrast | |
| autotext.set_weight('bold') | |
| autotext.set_fontsize(14) | |
| autotext.set_fontfamily('monospace') | |
| # Minimal category labels | |
| for text in texts: | |
| text.set_color('#AAAAAA') | |
| text.set_weight('normal') | |
| text.set_fontsize(13) | |
| text.set_fontfamily('monospace') | |
| # Device label closer to chart and bigger | |
| ax.set_title(device_label, | |
| fontsize=28, weight='normal', pad=2, color='#FFFFFF', | |
| fontfamily='monospace') | |
| # Create both pie charts with device labels | |
| create_pie_chart(ax1, "amd", amd_filtered) | |
| create_pie_chart(ax2, "nvidia", nvidia_filtered) | |
| # Add subtle separation line between charts - stops at device labels level | |
| line_x = 0.5 | |
| fig.add_artist(plt.Line2D([line_x, line_x], [0.0, 0.85], | |
| color='#333333', linewidth=1, alpha=0.5, | |
| transform=fig.transFigure)) | |
| # Add central shared title for model name | |
| fig.suptitle(f'{model_name.lower()}', | |
| fontsize=32, weight='bold', color='#CCCCCC', | |
| fontfamily='monospace', y=1) | |
| # Clean layout with padding and space for central title | |
| plt.tight_layout() | |
| plt.subplots_adjust(top=0.85, wspace=0.4) # Added wspace for padding between charts | |
| # Generate failure info directly from dataframe | |
| failures_amd = row.get('failures_amd', {}) | |
| failures_nvidia = row.get('failures_nvidia', {}) | |
| amd_failed_info = extract_failure_info(failures_amd, 'AMD', failed_multi_amd, failed_single_amd) | |
| nvidia_failed_info = extract_failure_info(failures_nvidia, 'NVIDIA', failed_multi_nvidia, failed_single_nvidia) | |
| return fig, amd_failed_info, nvidia_failed_info | |
| def extract_failure_info(failures_obj, device: str, multi_count: int, single_count: int) -> str: | |
| """Extract failure information from failures object.""" | |
| if (not failures_obj or pd.isna(failures_obj)) and multi_count == 0 and single_count == 0: | |
| return f"No failures on {device}" | |
| info_lines = [] | |
| # Add counts summary | |
| if multi_count > 0 or single_count > 0: | |
| info_lines.append(generate_underlined_line(f"Failure Summary for {device}:")) | |
| if multi_count > 0: | |
| info_lines.append(f"Multi GPU failures: {multi_count}") | |
| if single_count > 0: | |
| info_lines.append(f"Single GPU failures: {single_count}") | |
| info_lines.append("") | |
| # Try to extract detailed failure information | |
| try: | |
| if isinstance(failures_obj, dict): | |
| # Check for multi and single failure categories | |
| if 'multi' in failures_obj and failures_obj['multi']: | |
| info_lines.append(generate_underlined_line(f"Multi GPU failure details:")) | |
| if isinstance(failures_obj['multi'], list): | |
| # Handle list of failures (could be strings or dicts) | |
| for i, failure in enumerate(failures_obj['multi'][:10]): # Limit to first 10 | |
| if isinstance(failure, dict): | |
| # Extract meaningful info from dict (e.g., test name, line, etc.) | |
| failure_str = failure.get('line', failure.get('test', failure.get('name', str(failure)))) | |
| info_lines.append(f" {i+1}. {failure_str}") | |
| else: | |
| info_lines.append(f" {i+1}. {str(failure)}") | |
| if len(failures_obj['multi']) > 10: | |
| info_lines.append(f"... and {len(failures_obj['multi']) - 10} more") | |
| else: | |
| info_lines.append(str(failures_obj['multi'])) | |
| info_lines.append("") | |
| if 'single' in failures_obj and failures_obj['single']: | |
| info_lines.append(generate_underlined_line(f"Single GPU failure details:")) | |
| if isinstance(failures_obj['single'], list): | |
| # Handle list of failures (could be strings or dicts) | |
| for i, failure in enumerate(failures_obj['single'][:10]): # Limit to first 10 | |
| if isinstance(failure, dict): | |
| # Extract meaningful info from dict (e.g., test name, line, etc.) | |
| failure_str = failure.get('line', failure.get('test', failure.get('name', str(failure)))) | |
| info_lines.append(f" {i+1}. {failure_str}") | |
| else: | |
| info_lines.append(f" {i+1}. {str(failure)}") | |
| if len(failures_obj['single']) > 10: | |
| info_lines.append(f"... and {len(failures_obj['single']) - 10} more") | |
| else: | |
| info_lines.append(str(failures_obj['single'])) | |
| return "\n".join(info_lines) if info_lines else f"No detailed failure info for {device}" | |
| except Exception as e: | |
| if multi_count > 0 or single_count > 0: | |
| return f"Failures detected on {device} (Multi: {multi_count}, Single: {single_count})\nDetails unavailable: {str(e)}" | |
| return f"Error processing failure info for {device}: {str(e)}" | |
| # Load CSS from external file | |
| def load_css(): | |
| try: | |
| with open("styles.css", "r") as f: | |
| return f.read() | |
| except FileNotFoundError: | |
| logger.warning("styles.css not found, using minimal default styles") | |
| return "body { background: #000; color: #fff; }" | |
| # Create the Gradio interface with sidebar and dark theme | |
| with gr.Blocks(title="Model Test Results Dashboard", css=load_css()) as demo: | |
| with gr.Row(): | |
| # Sidebar for model selection | |
| with gr.Column(scale=1, elem_classes=["sidebar"]): | |
| gr.Markdown("# π€ TCID", elem_classes=["sidebar-title"]) | |
| # Description with integrated last update time | |
| if Ci_results.last_update_time: | |
| description_text = f"**Transformer CI Dashboard**\n\n*Result overview by model and hardware (last updated: {Ci_results.last_update_time})*\n" | |
| else: | |
| description_text = f"**Transformer CI Dashboard**\n\n*Result overview by model and hardware (loading...)*\n" | |
| description_display = gr.Markdown(description_text, elem_classes=["sidebar-description"]) | |
| # Summary button at the top | |
| summary_button = gr.Button( | |
| "summary\nπ", | |
| variant="primary", | |
| size="lg", | |
| elem_classes=["summary-button"] | |
| ) | |
| # Model selection header | |
| gr.Markdown(f"**Select model ({len(Ci_results.available_models)}):**", elem_classes=["model-header"]) | |
| # Scrollable container for model buttons | |
| with gr.Column(scale=1, elem_classes=["model-container"]): | |
| # Create individual buttons for each model | |
| model_buttons = [] | |
| model_choices = [model.lower() for model in Ci_results.available_models] if Ci_results.available_models else ["auto", "bert", "clip", "llama"] | |
| for model_name in model_choices: | |
| btn = gr.Button( | |
| model_name, | |
| variant="secondary", | |
| size="sm", | |
| elem_classes=["model-button"] | |
| ) | |
| model_buttons.append(btn) | |
| # CI job links at bottom of sidebar | |
| ci_links_display = gr.Markdown("π **CI Jobs:** *Loading...*", elem_classes=["sidebar-links"]) | |
| # Main content area | |
| with gr.Column(scale=4, elem_classes=["main-content"]): | |
| # Summary display (default view) | |
| summary_display = gr.Plot( | |
| value=create_summary_page(Ci_results.df, Ci_results.available_models), | |
| label="", | |
| format="png", | |
| elem_classes=["plot-container"], | |
| visible=True | |
| ) | |
| # Detailed view components (hidden by default) | |
| with gr.Column(visible=False, elem_classes=["detail-view"]) as detail_view: | |
| # Create the plot output | |
| plot_output = gr.Plot( | |
| label="", | |
| format="png", | |
| elem_classes=["plot-container"] | |
| ) | |
| # Create two separate failed tests displays in a row layout | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| amd_failed_tests_output = gr.Textbox( | |
| value="", | |
| lines=8, | |
| max_lines=8, | |
| interactive=False, | |
| container=False, | |
| elem_classes=["failed-tests"] | |
| ) | |
| with gr.Column(scale=1): | |
| nvidia_failed_tests_output = gr.Textbox( | |
| value="", | |
| lines=8, | |
| max_lines=8, | |
| interactive=False, | |
| container=False, | |
| elem_classes=["failed-tests"] | |
| ) | |
| # Set up click handlers for model buttons | |
| for i, btn in enumerate(model_buttons): | |
| model_name = model_choices[i] | |
| btn.click( | |
| fn=lambda selected_model=model_name: plot_model_stats(selected_model), | |
| outputs=[plot_output, amd_failed_tests_output, nvidia_failed_tests_output] | |
| ).then( | |
| fn=lambda: [gr.update(visible=False), gr.update(visible=True)], | |
| outputs=[summary_display, detail_view] | |
| ) | |
| # Summary button click handler | |
| def show_summary_and_update_links(): | |
| """Show summary page and update CI links.""" | |
| return create_summary_page(Ci_results.df, Ci_results.available_models), get_description_text(), get_ci_links() | |
| summary_button.click( | |
| fn=show_summary_and_update_links, | |
| outputs=[summary_display, description_display, ci_links_display] | |
| ).then( | |
| fn=lambda: [gr.update(visible=True), gr.update(visible=False)], | |
| outputs=[summary_display, detail_view] | |
| ) | |
| # Function to get current description text | |
| def get_description_text(): | |
| """Get description text with integrated last update time.""" | |
| if Ci_results.last_update_time: | |
| return f"**Transformer CI Dashboard**\n\n*Result overview by model and hardware (last updated: {Ci_results.last_update_time})*\n" | |
| else: | |
| return f"**Transformer CI Dashboard**\n\n*Result overview by model and hardware (loading...)*\n" | |
| # Function to get CI job links | |
| def get_ci_links(): | |
| """Get CI job links from the most recent data.""" | |
| try: | |
| # Check if df exists and is not empty | |
| if Ci_results.df is None or Ci_results.df.empty: | |
| return "π **CI Jobs:** *Loading...*" | |
| # Get links from any available model (they should be the same for all models in a run) | |
| amd_multi_link = None | |
| amd_single_link = None | |
| nvidia_multi_link = None | |
| nvidia_single_link = None | |
| for model_name in Ci_results.df.index: | |
| row = Ci_results.df.loc[model_name] | |
| # Extract AMD links | |
| if pd.notna(row.get('job_link_amd')) and (not amd_multi_link or not amd_single_link): | |
| amd_link_raw = row.get('job_link_amd') | |
| if isinstance(amd_link_raw, dict): | |
| if 'multi' in amd_link_raw and not amd_multi_link: | |
| amd_multi_link = amd_link_raw['multi'] | |
| if 'single' in amd_link_raw and not amd_single_link: | |
| amd_single_link = amd_link_raw['single'] | |
| # Extract NVIDIA links | |
| if pd.notna(row.get('job_link_nvidia')) and (not nvidia_multi_link or not nvidia_single_link): | |
| nvidia_link_raw = row.get('job_link_nvidia') | |
| if isinstance(nvidia_link_raw, dict): | |
| if 'multi' in nvidia_link_raw and not nvidia_multi_link: | |
| nvidia_multi_link = nvidia_link_raw['multi'] | |
| if 'single' in nvidia_link_raw and not nvidia_single_link: | |
| nvidia_single_link = nvidia_link_raw['single'] | |
| # Break if we have all links | |
| if amd_multi_link and amd_single_link and nvidia_multi_link and nvidia_single_link: | |
| break | |
| links_md = "π **CI Jobs:**\n\n" | |
| # AMD links | |
| if amd_multi_link or amd_single_link: | |
| links_md += "**AMD:**\n" | |
| if amd_multi_link: | |
| links_md += f"β’ [Multi GPU]({amd_multi_link})\n" | |
| if amd_single_link: | |
| links_md += f"β’ [Single GPU]({amd_single_link})\n" | |
| links_md += "\n" | |
| # NVIDIA links | |
| if nvidia_multi_link or nvidia_single_link: | |
| links_md += "**NVIDIA:**\n" | |
| if nvidia_multi_link: | |
| links_md += f"β’ [Multi GPU]({nvidia_multi_link})\n" | |
| if nvidia_single_link: | |
| links_md += f"β’ [Single GPU]({nvidia_single_link})\n" | |
| if not (amd_multi_link or amd_single_link or nvidia_multi_link or nvidia_single_link): | |
| links_md += "*No links available*" | |
| return links_md | |
| except Exception as e: | |
| logger.error(f"getting CI links: {e}") | |
| return "π **CI Jobs:** *Error loading links*" | |
| # Auto-update CI links when the interface loads | |
| demo.load( | |
| fn=get_ci_links, | |
| outputs=[ci_links_display] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |