import ast
import itertools
import os
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from typing import Any

import gradio as gr
import numpy as np
import pandas as pd
from datatrove.io import DataFolder

FALLBACK_TOKEN_NAME = "HF_TOKEN"


def is_array_like(x):
    return isinstance(x, (list, tuple, np.ndarray))


def get_task_type(df):
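    """Infer the task type ("generative", "multiple_choice" or "mixed") from the first row's predictions."""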
    # Compatibility with old lighteval
    # [["Pour calculer le bénéfice net de C"]] in new lighteval, "Pour calculer le bénéfice net de C" in old lighteval
    if all(isinstance(pred, str) or (is_array_like(pred) and all(isinstance(item, str) for item in pred)) for pred in df['predictions'].iloc[0]):
        return "generative"
    # [["1", "2"], ["3", "4"]] in new lighteval, ["1", "2"] in old lighteval
    if all(is_array_like(pred) and all(isinstance(item, float) for item in pred) for pred in df['predictions'].iloc[0]):
        return "multiple_choice"
    return "mixed"


def fix_df(df):
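    """Parse columns stored as strings back into Python objects and normalize the predictions format."""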
    # For some reason some metrics and predictions are stored as strings
    for col in ["predictions", "metrics", "choices", "gold", "gold_index"]:
        if col in df.columns:
            df[col] = [ast.literal_eval(x) if isinstance(x, str) else x for x in df[col].values]
            if col == "predictions":
                # For multiple choice
                df[col] = df[col].apply(lambda x: [[z[0] for z in x]] if is_array_like(x) and len(x[0]) == 2 else x)
                # For unwrapping of generative
                df[col] = df[col].apply(lambda x: x[0] if is_array_like(x) and len(x) == 1 else x)
    return df


def get_run_name_seed(run_name):
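    """Split a "{run_name}-seed-{seed}" string into (run_name, seed); defaults the seed to 5 when missing."""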
| if "-seed-" not in run_name: | |
| return run_name, 5 | |
| run_name, seed = run_name.split("-seed-") | |
| return run_name, int(seed) | |


def fetch_repo_structure(results_uri, split_checkpoints=False, oauth_token: gr.OAuthToken | None = None):
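    """List the available runs (and their checkpoints) under the "details" folder of the results repo."""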
    token = os.environ.get(FALLBACK_TOKEN_NAME)
    if oauth_token:
        token = oauth_token.token
    data_folder = DataFolder(results_uri, token=token)
    try:
        runs = [f.removeprefix("details/") for f in data_folder.list_files("details", recursive=False, include_directories=True) if f != "details"]
    except Exception as e:
        print(f"Error fetching repo structure: {e}")
        runs = []
    if not runs:
        return {}, gr.update(choices=[], value=None)

    def process_run(run):
        run_files = [f.removeprefix(f"details/{run}/") for f in data_folder.list_files(f"details/{run}", recursive=False, include_directories=True) if f != f"details/{run}"]
        return run, run_files

    with ThreadPoolExecutor() as executor:
        results = list(executor.map(process_run, runs))

    checkpoints_dict = dict(results)
    runs = list(checkpoints_dict.keys())
    if not split_checkpoints:
        runs = [f"{run}/{checkpoint}" for run, checkpoints in checkpoints_dict.items() for checkpoint in checkpoints]
    return checkpoints_dict, gr.update(choices=runs, value=[])


def update_checkpoints(selected_runs, checkpoints, split_checkpoints):
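    """Return the checkpoints common to all selected runs (empty when checkpoints are not split)."""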
    if not selected_runs or not split_checkpoints:
        return gr.update(choices=[], value=[])
    common_checkpoints = set(checkpoints[selected_runs[0]])
    for run in selected_runs[1:]:
        common_checkpoints.intersection_update(set(checkpoints[run]))
    common_checkpoints = sorted(common_checkpoints)
    return gr.update(choices=common_checkpoints, value=[common_checkpoints[0]] if common_checkpoints else [])


def select_runs_by_regex(runs, current_selected, regex_to_select):
    comp_re = re.compile(regex_to_select)
    return sorted(set((current_selected if current_selected else []) +
                      [run for run in runs if comp_re.fullmatch(run)]))


def select_runs_by_language(runs, current_selected, language):
    if language:
        return select_runs_by_regex(runs, current_selected, f".*-{language}-.*")
    return current_selected


def fetch_available_tasks(results_uri, selected_run_checkpoint: list[str]) -> dict[str, dict[str, str]]:
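    """Map each task to its latest details file per run/checkpoint, keeping only tasks present for every selected run."""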
    token = os.environ.get(FALLBACK_TOKEN_NAME)
    data_folder = DataFolder(results_uri, token=token)

    all_tasks = defaultdict(lambda: defaultdict(dict))
    for run_checkpoint in selected_run_checkpoint:
        try:
            details_folder = f"details/{run_checkpoint}"
            files = data_folder.list_files(details_folder, recursive=True)
            result_files = [f.removeprefix(details_folder + "/") for f in files if f.endswith('.parquet') or f.endswith('.json')]
            for full_filename in result_files:
                file_ext = '.parquet' if full_filename.endswith('.parquet') else '.json'
                # new lighteval uses date/task_name_date, old lighteval uses task_name_date
                filename = full_filename.replace(file_ext, '').split("/")[-1]
                task_name, date_str = filename.rsplit('_', 1)
                date = datetime.strptime(date_str, '%Y-%m-%dT%H-%M-%S.%f')
                if run_checkpoint not in all_tasks[task_name] or date > all_tasks[task_name][run_checkpoint]['date']:
                    all_tasks[task_name][run_checkpoint] = {'filename': full_filename, 'date': date}
        except FileNotFoundError:
            print(f"Checkpoint not found for run: {run_checkpoint}")

    # Keep only tasks that have data for all selected runs
    available_tasks = {
        task: {run_checkpoint: info['filename'] for run_checkpoint, info in runs_info.items()}
        for task, runs_info in all_tasks.items()
        if set(runs_info.keys()) == set(selected_run_checkpoint)
    }
    return available_tasks


def fetch_run_results(results_uri, selected_run_checkpoint: list[str],
                      oauth_token: gr.OAuthToken | None = None, progress=gr.Progress()):
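    """Refresh the task dropdown with the tasks available for all selected run/checkpoints."""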
    task_runs_dict = fetch_available_tasks(results_uri, selected_run_checkpoint)
    task_names = list(task_runs_dict.keys())
    return gr.update(choices=task_names, value=task_names[0] if task_names else None), task_runs_dict


def render_table(df: pd.DataFrame | None, selected_run_checkpoint: list[str],
                 metric_names: list[str], filter_different: bool = False,
                 n_samples: int = 100):
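    """Render the results table: keep only the selected metrics, optionally filter rows where runs disagree, and show the first n_samples rows."""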
    if df is None or not selected_run_checkpoint or not metric_names:
        return None, "0"

    kept_metrics = [f"metric_{metric_name}_{run_checkpoint}"
                    for run_checkpoint in selected_run_checkpoint
                    for metric_name in metric_names]
    other_metrics = [col for col in df.columns if col.startswith("metric_") and col not in kept_metrics]
    df = df.drop(columns=other_metrics)

    if filter_different:
        df = df[df.apply(lambda row: has_different_values(row, selected_run_checkpoint, metric_names), axis=1)]

    df = shorten_column_names(df, selected_run_checkpoint, metric_names)

    # Get total number of samples before limiting
    total_samples = len(df)
    # Take the first n_samples instead of random sampling
    df = df.head(n_samples)

    # Get column widths for better display
    column_widths = get_column_widths(df)
    return gr.Dataframe(
        value=df,
        column_widths=column_widths
    ), str(total_samples)


def update_selected_run_checkpoint(selected_runs: list[str] | None, selected_checkpoint: list[str] | None, split_checkpoints: bool):
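    """Combine the selected runs and checkpoints into "run/checkpoint" identifiers."""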
    if not selected_runs:
        return []
    # In this case we simply return the selected runs which already contain checkpoints
    if not split_checkpoints:
        return selected_runs
    # Otherwise combine runs with checkpoints
    return [f"{run}/{checkpoint}" for run in selected_runs for checkpoint in (selected_checkpoint if selected_checkpoint else [])]


def get_column_widths(df):
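    """Pick fixed column widths based on the column type (prompt, generations, choices/gold, metrics)."""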
    column_widths = []
    for col in df.columns:
        if col == "prompt":
            column_widths.append("300px")  # Fixed width with overflow
        elif col.startswith("generation_"):
            column_widths.append("200px")
        elif col in ["choices", "gold"]:
            column_widths.append("100px")
        else:
            # Metrics
            column_widths.append("50px")  # Default width for other columns
    return column_widths


def shorten_column_names(df, run_names: list[str], metric_names: list[str]):
    """
    Turns metric columns (metric_{metric}_{run_name}) into {metric}_i
    Turns generation_{run_name} into generation_i
    Also truncates prompt and generation columns to 100 chars with an expandable view
    """
    # Handle metric and generation columns
    columns_to_rename = {}
    for idx, run_name in enumerate(run_names):
        for metric_name in metric_names:
            original_metric_column = f"metric_{metric_name}_{run_name}"
            if original_metric_column in df.columns:
                columns_to_rename[original_metric_column] = f"{metric_name}_{idx}"
        original_generation_column = f"generation_{run_name}"
        if original_generation_column in df.columns:
            columns_to_rename[original_generation_column] = f"generation_{idx}"
    # Rename columns in a single operation
    df = df.rename(columns=columns_to_rename)

    # Add markdown formatting to prompt and generation columns for truncation with expansion
    def truncate_with_details(text: str | list[str]):
        if is_array_like(text) and all(isinstance(item, str) for item in text):
            return [truncate_with_details(item) for item in text]
        elif isinstance(text, str):
            text = text.replace('\n', ' ').strip()  # Replace newlines with spaces
            if len(text) <= 100:
                return text
            return f"""<details><summary>{text[:100]}...</summary>\n\n{text[100:]}</details>"""
        return text

    if 'prompt' in df.columns:
        df['prompt'] = df['prompt'].apply(truncate_with_details)
    # Apply the same truncation to all generation columns
    generation_columns = [col for col in df.columns if col.startswith('generation_')]
    for col in generation_columns:
        df[col] = df[col].apply(truncate_with_details)

    return df


def unwrap_selected_run_checkpoint(selected_run_checkpoint: list[str]) -> list[str]:
    return selected_run_checkpoint  # Now just returns the list directly


def load_task_data(results_uri, selected_run_checkpoint: list[str], task_name, tasks_files, prompt_column, progress=gr.Progress()):
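    """Load the details files of the selected runs for one task and merge them into a single dataframe, plus the available metric choices."""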
    token = os.environ.get(FALLBACK_TOKEN_NAME)
    if not selected_run_checkpoint or not task_name:
        return None, None

    data_folder = DataFolder(f"filecache::{results_uri}", token=token, cache_storage="./results-cache")

    def fetch_run_file(run_checkpoint):
        file_path = f"details/{run_checkpoint}/{tasks_files[task_name][run_checkpoint]}"
        try:
            with data_folder.open(file_path, "rb") as f:
                if file_path.endswith('.parquet'):
                    df = pd.read_parquet(f)
                else:
                    df = pd.read_json(f, lines=True)
            return df, run_checkpoint
        except FileNotFoundError:
            print(f"File not found: {tasks_files[task_name][run_checkpoint]}")
            return None, run_checkpoint

    with ThreadPoolExecutor() as pool:
        results = list(progress.tqdm(pool.map(fetch_run_file, selected_run_checkpoint),
                                     total=len(selected_run_checkpoint),
                                     desc="Fetching run data..."))

    dfs = [fix_df(df) for df, _ in results if df is not None]
    # Keep run names aligned with dfs: drop runs whose file could not be loaded
    run_names = [run for df, run in results if df is not None]
    if not dfs:
        return None, gr.update(choices=[], value=None)

    task_type = get_task_type(dfs[0])

    def prepare_df(df, run_name, task_type, prompt_column):
        # Mixed in lighteval-old will look like this: ['광', -13.964999198913574, -13.539217948913574, -13.964999198913574, -13.539217948913574, -12.90467357635498, -13.07825756072998]
        # Generative in lighteval-old will look like this: "prediction"
        # Multiple choice in lighteval-old will look like this: ["choice1", "choice2"]
        # [np.float64(-132.9295196533203), np.float64(-207.1309356689453), np.float64(-186.64553833007812), np.float64(-230.01414489746094), np.float64(-132.9295196533203), np.float64(-207.1309356689453), np.float64(-186.64553833007812), np.float64(-230.01414489746094), np.float64(-128.63824462890625), np.float64(-203.9550018310547), np.float64(-185.35267639160156), np.float64(-228.23837280273438)]
        # For the new lighteval we have:
        # Generative: [["Pour calculer le bénéfice net de C"]]
        def get_choice_predictions(df, task_type):
            predictions = df['predictions']
            if task_type == "generative":
                # This is a strange representation in new lighteval...
                if is_array_like(predictions) and all(is_array_like(item) for item in predictions):
                    return predictions[0]
                return predictions
            if task_type == "multiple_choice":
                n_choices = len(df['choices'])
                return [pred[0] for pred in predictions[:n_choices]]
            if task_type == "mixed":
                return predictions[0]
            return predictions

        generative_columns = {
            f"generation_{run_name}": df.apply(partial(get_choice_predictions, task_type=task_type), axis=1)
        } if task_type in ("generative", "mixed") else {}

        prepared_df = pd.DataFrame({
            'prompt': df[prompt_column],
            'choices': df['choices'].apply(tuple),  # Convert lists to tuples
            'gold': df['gold'].apply(lambda x: tuple(x) if is_array_like(x) else x),  # Convert lists to tuples
            'gold_index': df['gold_index'],
            **generative_columns,
        })

        # For some reason some metrics are stored as strings
        metrics = df['metrics']
        available_metrics = set(metric for row_metrics in metrics for metric in row_metrics)
        for metric_key in available_metrics:
            prepared_df[f'metric_{metric_key}_{run_name}'] = [metric.get(metric_key, None) for metric in metrics]

        # Merge rows with the same prompt
        prepared_df = prepared_df.groupby('prompt').agg(lambda x: next((item for item in x if item is not None), None)).reset_index()
        prepared_df["prompt"] = prepared_df["prompt"].astype(str)
        return prepared_df

    def get_gold_label(df, task_type):
        if task_type == "generative":
            return df['gold']
        return df['gold_index']

    # Prepare one DataFrame per run and join them on prompt and gold
    prepared_dfs = [
        prepare_df(df, run_name, task_type, prompt_column)
        for df, run_name in zip(dfs, run_names)
    ]
    combined_df = prepared_dfs[0]
    for idx, prepared_df in enumerate(prepared_dfs[1:]):
        combined_df = combined_df.merge(prepared_df, how='outer', on=["prompt", "gold"], suffixes=(None, f"_{idx}"))

    to_keep = ["prompt", "gold"]
    if task_type in ["multiple_choice", "mixed"]:
        to_keep.append("choices")
    elif task_type == "generative":
        to_keep.extend([col for col in combined_df.columns if col.startswith("generation_")])
    combined_df['gold'] = combined_df.apply(lambda row: get_gold_label(row, task_type), axis=1).values
    metric_cols = [col for col in combined_df.columns if col.startswith("metric_")]
    combined_df = combined_df[to_keep + metric_cols]

    # Recover metric names by stripping the "metric_" prefix and the run name suffix
    # (run names can themselves contain underscores, so splitting on "_" is not reliable)
    available_metrics = list({
        col.removeprefix("metric_").removesuffix(f"_{run_name}")
        for col in metric_cols
        for run_name in run_names
        if col.endswith(f"_{run_name}")
    })
    chosen_metrics = available_metrics[:1]
    return combined_df, gr.update(choices=available_metrics, value=chosen_metrics)


def has_different_values(row: pd.Series, selected_run_checkpoint: list[str], metric_names: list[str]) -> bool:
    """Check if a row has different values across runs for any metric or generation."""
    # Check generations
    generation_cols = [f"generation_{run}" for run in selected_run_checkpoint]
    generation_cols = [col for col in generation_cols if col in row.index]
    if generation_cols:
        generations = row[generation_cols].dropna()
        # Convert lists to tuples for comparison and handle string values
        unique_generations = set()
        for gen in generations:
            if isinstance(gen, list):
                unique_generations.add(tuple(gen))
            else:
                unique_generations.add(gen)
        if len(unique_generations) > 1:
            return True

    # Check metrics
    for metric in metric_names:
        metric_cols = [f"metric_{metric}_{run}" for run in selected_run_checkpoint]
        metric_cols = [col for col in metric_cols if col in row.index]
        if metric_cols:
            metrics = row[metric_cols].dropna()
            if len(metrics.unique()) > 1:
                return True
    return False


with gr.Blocks() as demo:
    available_runs_checkpoints = gr.State({})
    results_df_full = gr.State(None)
    tasks_files = gr.State({})
    selected_run_checkpoint = gr.State([])

    login_button = gr.LoginButton(visible=False)
    results_uri = gr.Textbox(label="Fsspec results URI", value="s3://fineweb-v1/evals/test/", visible=True, placeholder="s3://bucket/path/to/results")

    with gr.Column():
        gr.Markdown("# FineWeb experiments results explorer")
        split_checkpoints = gr.Checkbox(label="Split checkpoints from models", value=True)
        with gr.Row():
            with gr.Column():
                select_by_regex_text = gr.Textbox(label="Regex to select runs",
                                                  value="ind_minhash(-CC-MAIN-|_)\\d{4}-\\d{2}-seed.*")
                select_by_regex_button = gr.Button("Select matching runs")
            with gr.Column():
                select_by_language = gr.Dropdown(choices=["ar", "fr", "ru", "hi", "th", "tr", "zh", "sw", "te"],
                                                 interactive=True, label="Select by language",
                                                 info="Choose a language to prefill the regex")
        with gr.Row() as run_selection_row:
            selected_runs = gr.Dropdown(choices=[], interactive=True, multiselect=True, label="Selected runs")
            checkpoint = gr.Dropdown(choices=[], interactive=True, label="Checkpoint", multiselect=True)
        fetch_res = gr.Button("Fetch results")
        task_name = gr.Dropdown(choices=[], interactive=True, label="Task name")
        metric_names = gr.Dropdown(choices=[], interactive=True, multiselect=True, label="Metric")
        results_df = gr.Dataframe(
            interactive=False,
            wrap=True,
            line_breaks=True,
            datatype="markdown",
            column_widths=get_column_widths(pd.DataFrame())  # Initialize with an empty dataframe
        )
        with gr.Row():
            with gr.Column():
                num_samples = gr.Text(interactive=False, label="# Samples")
                prompt_column = gr.Radio(choices=["full_prompt", "example"], label="Prompt display", value="example")
                filter_different = gr.Checkbox(label="Show only samples with differences", value=False)
                n_samples_input = gr.Number(value=100, label="Number of samples to show", minimum=1, maximum=1000, step=1)

    # Run selection
    gr.on(
        triggers=[split_checkpoints.change],
        fn=lambda split_checkpoints: gr.update(visible=split_checkpoints),
        inputs=[split_checkpoints],
        outputs=[checkpoint]
    )
    gr.on(
        triggers=[results_uri.change, split_checkpoints.change],
        fn=fetch_repo_structure, inputs=[results_uri, split_checkpoints], outputs=[available_runs_checkpoints, selected_runs],
    )
    gr.on(
        triggers=[select_by_regex_button.click],
        fn=select_runs_by_regex,
        inputs=[available_runs_checkpoints, selected_runs, select_by_regex_text], outputs=[selected_runs]
    )
    gr.on(
        triggers=[select_by_language.change],
        fn=select_runs_by_language,
        inputs=[available_runs_checkpoints, selected_runs, select_by_language], outputs=[selected_runs]
    )

    # Update checkpoints based on selected runs
    gr.on(
        triggers=[selected_runs.change],
        fn=update_checkpoints,
        inputs=[selected_runs, available_runs_checkpoints, split_checkpoints],
        outputs=[checkpoint]
    )
    gr.on(
        triggers=[checkpoint.change, selected_runs.change],
        fn=update_selected_run_checkpoint,
        inputs=[selected_runs, checkpoint, split_checkpoints],
        outputs=[selected_run_checkpoint]
    )

    # Fetch available tasks
    gr.on(
        triggers=[fetch_res.click],
        fn=fetch_run_results,
        inputs=[results_uri, selected_run_checkpoint],
        outputs=[task_name, tasks_files]
    ).then(
        fn=load_task_data,
        inputs=[results_uri, selected_run_checkpoint, task_name, tasks_files, prompt_column],
        outputs=[results_df_full, metric_names]
    ).then(
        fn=render_table,
        inputs=[results_df_full, selected_run_checkpoint, metric_names, filter_different, n_samples_input],
        outputs=[results_df, num_samples]
    )

    # Reload the data when the task or prompt column changes
    gr.on(
        triggers=[task_name.input, prompt_column.input],
        fn=load_task_data,
        inputs=[results_uri, selected_run_checkpoint, task_name, tasks_files, prompt_column],
        outputs=[results_df_full, metric_names]
    ).then(
        fn=render_table,
        inputs=[results_df_full, selected_run_checkpoint, metric_names, filter_different, n_samples_input],
        outputs=[results_df, num_samples]
    )
    # Re-render the table when the metric selection, filter or sample count changes
    gr.on(
        triggers=[metric_names.input, filter_different.change, n_samples_input.change],
        fn=render_table,
        inputs=[results_df_full, selected_run_checkpoint, metric_names, filter_different, n_samples_input],
        outputs=[results_df, num_samples]
    )

    demo.load(fn=fetch_repo_structure, inputs=[results_uri, split_checkpoints], outputs=[available_runs_checkpoints, selected_runs])

demo.launch()