#!/usr/bin/env python3
import argparse
import os
import sys
from pathlib import Path

# Make sibling modules (src/...) importable when the script is run directly
sys.path.append(str(Path(__file__).parent))

import fev
import pandas as pd

from src.utils import format_leaderboard

# Constants from the main app
BASELINE_MODEL = "Seasonal Naive"
LEAKAGE_IMPUTATION_MODEL = "Chronos-Bolt"
SORT_COL = "win_rate"
N_RESAMPLES_FOR_CI = 1000
TOP_K_MODELS_TO_PLOT = 15
AVAILABLE_METRICS = ["SQL", "MASE", "WQL", "WAPE"]


def load_summaries(path="."):
    """Load and concatenate every CSV summary file found in `path`."""
    csv_files = list(Path(path).glob("*.csv"))
    if not csv_files:
        raise FileNotFoundError(f"No CSV files found in {path}")
    dfs = [pd.read_csv(file) for file in csv_files]
    return pd.concat(dfs, ignore_index=True)
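
# The loader itself is schema-agnostic, but the functions below assume
# fev-style summary columns: "model_name", "task_name", the metric columns
# listed in AVAILABLE_METRICS, and "trained_on_this_dataset". A minimal sanity
# check (illustrative, not part of the original script) could look like:
#
#   summaries = load_summaries("summaries/")
#   assert {"model_name", "task_name"}.issubset(summaries.columns)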


def compute_leaderboard(summaries: pd.DataFrame, metric_name: str) -> pd.DataFrame:
    """Build the aggregate leaderboard for one metric."""
    lb = fev.analysis.leaderboard(
        summaries=summaries,
        metric_column=metric_name,
        missing_strategy="impute",
        baseline_model=BASELINE_MODEL,
        leakage_imputation_model=LEAKAGE_IMPUTATION_MODEL,
    )
    lb = lb.astype("float64").reset_index()
    # Rescale ratios to percentages for display
    lb["skill_score"] = lb["skill_score"] * 100
    lb["win_rate"] = lb["win_rate"] * 100
    # Express failures as a percentage of the total number of tasks
    lb["num_failures"] = lb["num_failures"] / summaries["task_name"].nunique() * 100
    return lb
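
# For reference, after the rescaling above a leaderboard row reads roughly as
# follows (values hypothetical): model_name="Chronos-Bolt", skill_score=42.1,
# win_rate=87.5, num_failures=0.0. That is, skill_score and win_rate are in
# percent, and num_failures is the percentage of tasks on which the model failed.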


def compute_pairwise(summaries: pd.DataFrame, metric_name: str, included_models: list[str]) -> pd.DataFrame:
    # Always include the baseline so every comparison has a common reference
    if BASELINE_MODEL not in included_models:
        included_models = included_models + [BASELINE_MODEL]
    return (
        fev.analysis.pairwise_comparison(
            summaries,
            included_models=included_models,
            metric_column=metric_name,
            baseline_model=BASELINE_MODEL,
            missing_strategy="impute",
            n_resamples=N_RESAMPLES_FOR_CI,
            leakage_imputation_model=LEAKAGE_IMPUTATION_MODEL,
        )
        .round(3)
        .reset_index()
    )
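
# Note: judging by the parameter and constant names, n_resamples drives the
# bootstrap behind the confidence intervals reported in the pairwise table;
# raising N_RESAMPLES_FOR_CI should give more stable CI estimates at the cost
# of runtime. This reading is an assumption from the names, not from fev docs.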


def compute_pivot_table(summaries: pd.DataFrame, metric_name: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    errors = fev.pivot_table(summaries=summaries, metric_column=metric_name, task_columns=["task_name"])
    # Boolean mask of (task, model) cells where the model saw the task's data during training
    train_overlap = (
        fev.pivot_table(summaries=summaries, metric_column="trained_on_this_dataset", task_columns=["task_name"])
        .fillna(False)
        .astype(bool)
    )
    is_imputed_baseline = errors.isna()
    is_leakage_imputed = train_overlap
    # Handle imputations: leakage-affected cells take the imputation model's score,
    # then any remaining missing cells fall back to the baseline's score
    errors = errors.mask(train_overlap, errors[LEAKAGE_IMPUTATION_MODEL], axis=0)
    for col in errors.columns:
        if col != BASELINE_MODEL:
            errors[col] = errors[col].fillna(errors[BASELINE_MODEL])
    # Order model columns by mean rank across tasks, best first
    errors = errors[errors.rank(axis=1).mean().sort_values().index]
    is_imputed_baseline = is_imputed_baseline[errors.columns]
    is_leakage_imputed = is_leakage_imputed[errors.columns]
    errors.index.rename("Task name", inplace=True)
    is_imputed_baseline.index.rename("Task name", inplace=True)
    is_leakage_imputed.index.rename("Task name", inplace=True)
    return errors.reset_index(), is_imputed_baseline.reset_index(), is_leakage_imputed.reset_index()
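
# Worked example of the imputation rules (model and task names hypothetical):
# if model "A" has no score for task "t1", it inherits the Seasonal Naive score
# for "t1" (tracked in is_imputed_baseline); if model "B" was trained on the
# dataset behind "t2", its "t2" score is replaced by the Chronos-Bolt score
# (tracked in is_leakage_imputed). The baseline column is never baseline-imputed.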


def main():
    parser = argparse.ArgumentParser(description="Generate leaderboard tables from CSV summaries")
    parser.add_argument("-s", "--summaries-path", default=".", help="Path to directory containing CSV files")
    args = parser.parse_args()

    # Create tables directory
    tables_dir = Path("tables")
    tables_dir.mkdir(exist_ok=True)

    print("Loading summaries...")
    summaries = load_summaries(args.summaries_path)

    for metric in AVAILABLE_METRICS:
        print(f"Processing {metric}...")

        # Compute leaderboard
        leaderboard_df = compute_leaderboard(summaries, metric)
        leaderboard_df.to_csv(tables_dir / f"leaderboard_{metric}.csv", index=False)

        # Get top models for pairwise comparison
        top_k_models = (
            leaderboard_df.sort_values(by=SORT_COL, ascending=False).head(TOP_K_MODELS_TO_PLOT)["model_name"].tolist()
        )

        # Compute pairwise comparison
        pairwise_df = compute_pairwise(summaries, metric, top_k_models)
        pairwise_df.to_csv(tables_dir / f"pairwise_{metric}.csv", index=False)

        # Compute pivot table
        pivot_df, baseline_imputed, leakage_imputed = compute_pivot_table(summaries, metric)
        pivot_df.to_csv(tables_dir / f"pivot_{metric}.csv", index=False)
        baseline_imputed.to_csv(tables_dir / f"pivot_{metric}_baseline_imputed.csv", index=False)
        leakage_imputed.to_csv(tables_dir / f"pivot_{metric}_leakage_imputed.csv", index=False)

        print(f"  Saved: leaderboard_{metric}.csv, pairwise_{metric}.csv, pivot_{metric}.csv")

    print(f"All tables saved to {tables_dir}/")


if __name__ == "__main__":
    main()
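
# Example invocation (the file name generate_tables.py is illustrative, not
# given in the original):
#
#   python generate_tables.py --summaries-path path/to/summaries
#
# For each metric in AVAILABLE_METRICS this writes leaderboard_<metric>.csv,
# pairwise_<metric>.csv, pivot_<metric>.csv and the two imputation-mask CSVs
# into ./tables.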