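# NOTE(review): the six lines below appear to be page chrome captured when this
# file was copied out of a web file viewer; they are not part of the program
# and are kept here as comments only so that the module still parses.
# mshuaibi's picture
# black
# 0056d04
# raw
# history blame
# 24.6 kB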
import datetime
import json
import os
import tempfile
from email.utils import parseaddr
from typing import Dict, Iterator, List, Optional, Tuple

import gradio as gr
import numpy as np
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from datasets import VerificationMode, load_dataset, Dataset
from huggingface_hub import HfApi, snapshot_download

from content import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    INTRODUCTION_TEXT,
    SUBMISSION_TEXT,
    PRE_COLUMN_NAMES,
    POST_COLUMN_NAMES,
    TITLE,
    TYPES,
    model_hyperlink,
)
from evaluator import evaluate
# ---------------------------------------------------------------------------
# Runtime configuration
# ---------------------------------------------------------------------------
# Access token passed to every Hub call below; None when the variable is unset.
TOKEN = os.environ.get("TOKEN")
# Hub namespace that owns all repositories referenced by this app.
OWNER = "facebook"

# Hub repository ids, all under OWNER.
INTERNAL_DATA_DATASET = OWNER + "/fairchem_internal"
SUBMISSION_DATASET = OWNER + "/fairchem_leaderboard_submissions"
RESULTS_DATASET = OWNER + "/fairchem_leaderboard_results"
CONTACT_DATASET = OWNER + "/fairchem_leaderboard_contact_info_internal"
LEADERBOARD_PATH = OWNER + "/fairchem_leaderboard"

# Shared Hub client used for file uploads and for restarting the Space.
api = HfApi()
# Subsplit keys reported for the S2EF Validation and Test results; each one is
# the prefix of a "<subsplit>_energy_mae" / "<subsplit>_forces_mae" column pair.
S2EF_SUBSPLITS = [
    "all",
    "biomolecules",
    "electrolytes",
    "metal_complexes",
    "neutral_organics",
]

# Every evaluation that is not an S2EF split.
OTHER_EVAL_TYPES = [
    "Ligand pocket",
    "Ligand strain",
    "Conformers",
    "Protonation",
    "IE_EA",
    "Distance scaling",
    "Spin gap",
]

# Choices offered by the "Eval Type" dropdown: the two S2EF splits first,
# followed by the remaining evaluations.
ALL_EVAL_TYPES = ["Validation", "Test", *OTHER_EVAL_TYPES]
class LeaderboardData:
    """
    Manages leaderboard data loading and processing.

    Constructing an instance downloads the reference-label snapshot and loads
    the contact-info dataset. Evaluation results are loaded on the first call
    to `load_eval_data` and cached on the instance afterwards.
    """

    def __init__(self):
        self._setup_data_paths()
        self._load_contact_info()
        # Filled in by the first `load_eval_data` call.
        self._eval_results = None
        self._results_dfs = None

    def _setup_data_paths(self):
        """
        Setup target and result file paths.

        `target_paths` maps an evaluation type to its label file inside the
        downloaded snapshot; `result_paths` maps an evaluation type to the
        parquet file name used in the results dataset.
        """
        target_data_dir = snapshot_download(
            repo_id=INTERNAL_DATA_DATASET,
            repo_type="dataset",
            token=TOKEN,
        )
        self.target_paths = {
            "Validation": f"{target_data_dir}/omol_val_labels.npz",
            "Test": f"{target_data_dir}/omol_test_labels.npz",
            # NOTE(review): "Distance Scaling" (capital S) duplicates the
            # "Distance scaling" entry below and has no counterpart in
            # `result_paths`; kept in case an external caller relies on it.
            "Distance Scaling": f"{target_data_dir}/distance_scaling_labels.json",
            "Ligand pocket": f"{target_data_dir}/ligand_pocket_labels.json",
            "Ligand strain": f"{target_data_dir}/ligand_strain_labels.json",
            "Conformers": f"{target_data_dir}/geom_conformers_labels.json",
            "Protonation": f"{target_data_dir}/protonation_energies_labels.json",
            "IE_EA": f"{target_data_dir}/unoptimized_ie_ea_labels.json",
            "Distance scaling": f"{target_data_dir}/distance_scaling_labels.json",
            "Spin gap": f"{target_data_dir}/unoptimized_spin_gap_labels.json",
        }
        self.result_paths = {
            "Validation": "validation_s2ef.parquet",
            "Test": "test_s2ef.parquet",
            "Ligand pocket": "ligand_pocket.parquet",
            "Ligand strain": "ligand_strain.parquet",
            "Conformers": "geom_conformers.parquet",
            "Protonation": "protonation.parquet",
            "IE_EA": "ie_ea.parquet",
            "Distance scaling": "distance_scaling.parquet",
            "Spin gap": "spin_gap.parquet",
        }

    def _load_contact_info(self):
        """
        Load contact information dataset.
        """
        self.contact_infos = load_dataset(
            CONTACT_DATASET,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
        )

    def _load_results(self, data_files: Dict[str, str]):
        """
        Load the given parquet files from the results dataset.

        `data_files` maps a split name to a path inside the results repo.
        """
        return load_dataset(
            RESULTS_DATASET,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
            data_files=data_files,
        )

    def load_eval_data(self) -> Tuple[Dict, Dict[str, pd.DataFrame]]:
        """
        Load all evaluation data and return results and dataframes.

        Returns a pair `(eval_results, results_dfs)`:
        `eval_results` maps each evaluation type to its loaded dataset, and
        `results_dfs` maps "<split>_<subsplit>" (S2EF) or the evaluation type
        (everything else) to a display-ready DataFrame. Both are cached, so
        later calls return the same objects without reloading.
        """
        if self._eval_results is not None and self._results_dfs is not None:
            return self._eval_results, self._results_dfs

        # Load S2EF results
        s2ef_results = self._load_results(
            {
                "Validation": os.path.join("data", self.result_paths["Validation"]),
                "Test": os.path.join("data", self.result_paths["Test"]),
            }
        )
        eval_results = dict(s2ef_results)

        # Load other evaluation types
        for eval_type in OTHER_EVAL_TYPES:
            eval_type_data = self._load_results(
                {"data": os.path.join("data", self.result_paths[eval_type])}
            )
            eval_results[eval_type] = eval_type_data["data"]

        # Generate result dataframes
        results_dfs = {}
        # S2EF dataframes
        for split in ["Validation", "Test"]:
            for subsplit in S2EF_SUBSPLITS:
                results_dfs[f"{split}_{subsplit}"] = self._get_s2ef_df_from_results(
                    eval_results, split, subsplit
                )
        # Other evaluation dataframes
        for split in OTHER_EVAL_TYPES:
            results_dfs[split] = self._get_eval_df_from_results(eval_results, split)

        # Cache the results
        self._eval_results = eval_results
        self._results_dfs = results_dfs
        return eval_results, results_dfs

    @staticmethod
    def _with_model_links(dataset):
        """
        Return `dataset` with its "Model" field replaced by a hyperlink.
        """
        return dataset.map(
            lambda row: {
                "Model": model_hyperlink(
                    row["model_url"], row["paper_url"], row["Model"]
                )
            }
        )

    @staticmethod
    def _build_display_df(
        df: pd.DataFrame, filtered_columns: List[str], sort_column: str
    ) -> pd.DataFrame:
        """
        Select, convert and sort the columns shown on the leaderboard.

        Columns listed in `filtered_columns` but absent from `df` are created
        and filled with "-". Numeric columns whose name contains "mae" are
        multiplied by 1000 and rounded to 2 decimals; other numeric columns
        are rounded to 4 decimals. The result is sorted ascending by
        `sort_column`. The input frame is not modified.
        """
        # reindex returns a new frame and only applies fill_value to columns
        # it has to create, so existing values are left untouched.
        df = df.reindex(columns=filtered_columns, fill_value="-")

        # Unit conversion
        for col in df.columns:
            # Placeholder ("-") and text columns must be skipped: multiplying
            # a string column by 1000 repeats the text instead of scaling it.
            if not pd.api.types.is_numeric_dtype(df[col]):
                continue
            if "mae" in col.lower():
                df[col] = (df[col] * 1000).round(2)
            else:
                df[col] = df[col].round(4)

        return df.sort_values(by=[sort_column], ascending=True)

    def _get_s2ef_df_from_results(
        self, eval_results: Dict, split: str, subsplit: str
    ) -> pd.DataFrame:
        """
        Generate S2EF dataframe from evaluation results.

        Uses the "<subsplit>_energy_mae" / "<subsplit>_forces_mae" columns of
        `eval_results[split]`, sorted by the energy column, with display
        headers applied.
        """
        energy_col = f"{subsplit}_energy_mae"
        forces_col = f"{subsplit}_forces_mae"
        filtered_columns = (
            PRE_COLUMN_NAMES + [energy_col, forces_col] + POST_COLUMN_NAMES
        )

        df = pd.DataFrame(self._with_model_links(eval_results[split]))
        df = self._build_display_df(df, filtered_columns, energy_col)
        return df.rename(
            columns={
                energy_col: "Energy MAE\n[meV]",
                forces_col: "Forces MAE\n[meV/Å]",
                "Energy Conserving": "Energy\nConserving",
            }
        )

    def _get_eval_df_from_results(self, eval_results: Dict, split: str) -> pd.DataFrame:
        """
        Generate evaluation dataframe from results.

        The metric columns come from `LEADERBOARD_COLUMNS[split]`; rows are
        sorted by the first of them and headers are renamed through
        `COLUMN_MAPPING`.
        """
        eval_columns = LEADERBOARD_COLUMNS[split]
        filtered_columns = PRE_COLUMN_NAMES + eval_columns + POST_COLUMN_NAMES

        df = pd.DataFrame(self._with_model_links(eval_results[split]))
        df = self._build_display_df(df, filtered_columns, eval_columns[0])
        return df.rename(columns=COLUMN_MAPPING)
# Module-level instance shared by the UI builders and the submission handler.
# NOTE: LeaderboardData.__init__ calls snapshot_download() and load_dataset(),
# so this statement performs those downloads when the module is imported.
leaderboard_data = LeaderboardData()
# Metric columns displayed for each non-S2EF evaluation type. The first entry
# of every list is the column the corresponding table is sorted by and the one
# surfaced in the Overview tab.
_DELTA_ENERGY_FORCES = ("deltaE_mae", "deltaF_mae")
LEADERBOARD_COLUMNS = {
    "Ligand pocket": ["interaction_energy_mae", "interaction_forces_mae"],
    "Ligand strain": ["strain_energy_mae", "global_min_rmsd"],
    "Conformers": ["deltaE_mae", "ensemble_rmsd"],
    "Protonation": ["deltaE_mae", "rmsd"],
    "IE_EA": list(_DELTA_ENERGY_FORCES),
    "Distance scaling": ["lr_ddE_mae", "lr_ddF_mae", "sr_ddE_mae", "sr_ddF_mae"],
    "Spin gap": list(_DELTA_ENERGY_FORCES),
}

# Raw column name -> header text shown in the tables. All three RMSD variants
# share a single header.
_RMSD_HEADER = "RMSD [Å]"
COLUMN_MAPPING = {
    "interaction_energy_mae": "Ixn Energy\nMAE [meV]",
    "interaction_forces_mae": "Ixn Forces\nMAE [meV/Å]",
    "strain_energy_mae": "Strain Energy\nMAE [meV]",
    "deltaE_mae": "\u0394Energy MAE\n[meV]",
    "deltaF_mae": "\u0394Forces MAE\n[meV/Å]",
    **dict.fromkeys(("ensemble_rmsd", "global_min_rmsd", "rmsd"), _RMSD_HEADER),
    "lr_ddE_mae": "\u0394Energy (LR)\n MAE [meV]",
    "lr_ddF_mae": "\u0394Forces (LR)\n MAE [meV/Å]",
    "sr_ddE_mae": "\u0394Energy (SR)\n MAE [meV]",
    "sr_ddF_mae": "\u0394Forces (SR)\n MAE [meV/Å]",
    "Energy Conserving": "Energy\nConserving",
}
def add_new_eval(
    path_to_file: str,
    eval_type: str,
    organization: str,
    model: str,
    model_url: str,
    paper_url: str,
    energy_conserving: bool,
    mail: str,
    training_set: str,
    additional_info: str,
    profile: gr.OAuthProfile,
) -> Iterator[str]:
    """Add a new evaluation to the leaderboard.

    This is a generator: it yields one human-readable status message per
    stage (validation failure, evaluating, uploading, saving, success or
    error). Validation failures yield a single warning and stop.

    Args:
        path_to_file: Local path of the uploaded predictions file; must end
            in ".npz" or ".json". None when nothing was uploaded.
        eval_type: One of ALL_EVAL_TYPES.
        organization: Submitting organization, stored with the results.
        model: Model name, stored with the results.
        model_url: Link to the model or checkpoint.
        paper_url: Link to the accompanying paper.
        energy_conserving: Whether the submitter marks the model as energy
            conserving.
        mail: Contact email; stored only in the contact dataset.
        training_set: Training-set label chosen by the submitter.
        additional_info: Free-text notes stored in the "Notes" field.
        profile: Logged-in user's profile; only `profile.username` is read.
            It is not among the explicit inputs of the click handler, so it
            is presumably injected by Gradio from the annotation - TODO
            confirm.

    Yields:
        Status strings intended for the "Status" textbox.
    """
    print(f"Adding new eval of type: {eval_type}")
    try:
        # Validate email address
        _, parsed_mail = parseaddr(mail)
        if "@" not in parsed_mail:
            yield "⚠️ Please provide a valid email address."
            return

        # An unselected dropdown would otherwise fail on `.replace` below and
        # be reported as a generic error instead of a clear message.
        if eval_type not in ALL_EVAL_TYPES:
            yield "⚠️ Please select an eval type."
            return

        # Check monthly submission limit (5 submissions per month); the
        # count is kept per evaluation type because the contact dataset is
        # keyed by `contact_key`.
        contact_key = eval_type.replace(" ", "_")
        user_submission_dates = sorted(
            row["date"]
            for row in leaderboard_data.contact_infos.get(contact_key, [])
            if row["username"] == profile.username
        )
        current_month = datetime.datetime.now().strftime("%Y-%m")
        current_month_submissions = [
            date for date in user_submission_dates if date.startswith(current_month)
        ]
        if len(current_month_submissions) >= 5:
            yield "⚠️ You have reached the monthly submission limit of 5 submissions. Please try again next month."
            return

        # Validate file submission
        if path_to_file is None:
            yield "⚠️ Please upload a file."
            return
        if not path_to_file.endswith((".npz", ".json")):
            yield "⚠️ Please submit a valid npz or json file"
            return

        # Evaluate the submission
        yield "⚙️ Evaluating your submission...(do not close/refresh this page!)"
        metrics = evaluate(
            leaderboard_data.target_paths[eval_type],
            path_to_file,
            eval_type,
        )
        submission_time = datetime.datetime.today().strftime("%Y-%m-%d-%H:%M")

        # Upload submission file
        yield "☁️ Uploading submission file..."
        # NOTE(review): organization, model and training_set are user-supplied
        # and interpolated unescaped into the repository path; consider
        # sanitizing path separators before relying on this layout.
        api.upload_file(
            repo_id=SUBMISSION_DATASET,
            path_or_fileobj=path_to_file,
            path_in_repo=f"{organization}/{model}/submissions/{training_set}/{eval_type}_{submission_time}_{os.path.basename(path_to_file)}",
            repo_type="dataset",
            token=TOKEN,
        )

        # Update leaderboard data
        yield "📋 Updating leaderboard data..."
        eval_results, _ = leaderboard_data.load_eval_data()
        eval_entry = {
            "Model": model,
            "Organization": organization,
            "Submission date": submission_time,
            "Training Set": training_set,
            "Energy Conserving": energy_conserving,
            "model_url": model_url,
            "paper_url": paper_url,
            "Notes": additional_info,
        }
        eval_entry.update(metrics)
        if eval_type not in eval_results:
            eval_results[eval_type] = Dataset.from_dict(
                {k: [v] for k, v in eval_entry.items()}
            )
        else:
            eval_results[eval_type] = eval_results[eval_type].add_item(eval_entry)
        data_file_name = leaderboard_data.result_paths[eval_type]

        # Upload results
        yield "💾 Saving results to database..."
        # The upload has to happen inside the `with`: the temporary file is
        # removed as soon as the context manager exits.
        with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file:
            eval_results[eval_type].to_parquet(tmp_file.name)
            api.upload_file(
                repo_id=RESULTS_DATASET,
                path_or_fileobj=tmp_file.name,
                path_in_repo=f"data/{data_file_name}",
                repo_type="dataset",
                token=TOKEN,
            )

        # Save contact information
        contact_info = {
            "model": model,
            "organization": organization,
            "username": profile.username,
            "email": mail,
            "date": submission_time,
        }
        if contact_key not in leaderboard_data.contact_infos:
            leaderboard_data.contact_infos[contact_key] = Dataset.from_dict(
                {k: [v] for k, v in contact_info.items()}
            )
        else:
            leaderboard_data.contact_infos[contact_key] = (
                leaderboard_data.contact_infos[contact_key].add_item(contact_info)
            )
        leaderboard_data.contact_infos.push_to_hub(CONTACT_DATASET, token=TOKEN)

        success_str = f"✅ Model {model} is successfully evaluated and stored in our database.\nPlease wait an hour and refresh the leaderboard to see your results displayed."
        yield success_str
    except Exception as e:
        # Top-level boundary of the UI callback: report the failure to the
        # user instead of letting the handler raise.
        print(f"Error during submission: {e}")
        yield (
            f"An error occurred, please open a discussion and indicate at what time you encountered the error.\n{e}"
        )
def create_dataframe_tab(
    tab_name: str,
    df: pd.DataFrame,
    datatype: List[str] = None,
    widths: List[str] = None,
) -> gr.Tab:
    """
    Build one tab holding a read-only, filterable table.

    `datatype` falls back to TYPES when omitted. When `widths` is omitted the
    layout is four leading columns (10%, 5%, 5%, 5%), one 5% column per
    remaining metric column, and a final 10% column.
    """
    column_types = TYPES if datatype is None else datatype

    if widths is None:
        # Model | Organization | Energy Conserving | Training Set | Metrics | date
        n_fixed = len(PRE_COLUMN_NAMES) + len(POST_COLUMN_NAMES)
        n_metrics = len(df.columns) - n_fixed
        leading_widths = ["10%", "5%", "5%", "5%"]
        widths = leading_widths + ["5%"] * n_metrics + ["10%"]

    with gr.Tab(tab_name) as tab:
        gr.Dataframe(
            value=df,
            datatype=column_types,
            interactive=False,
            show_search="filter",
            column_widths=widths,
        )
    return tab
def create_s2ef_tabs(split: str, results_dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Add one tab per S2EF subsplit for the given split (Validation/Test).

    The table for each tab is looked up in `results_dfs` under the key
    "<split>_<subsplit>".
    """
    # (subsplit key, tab title) pairs, in display order.
    tabs = (
        ("all", "All"),
        ("biomolecules", "Biomolecules"),
        ("electrolytes", "Electrolytes"),
        ("metal_complexes", "Metal Complexes"),
        ("neutral_organics", "Neutral Organics"),
    )
    for subsplit, title in tabs:
        create_dataframe_tab(title, results_dfs[f"{split}_{subsplit}"])
def create_evaluation_tabs(results_dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Add the Overview tab followed by one tab per non-S2EF evaluation.
    """
    # Overview comes first: a wide first column, then equal 10% columns.
    overview_df = create_overview_dataframe(results_dfs)
    overview_widths = ["20%"] + ["10%"] * (len(overview_df.columns) - 1)
    create_dataframe_tab("Overview", overview_df, widths=overview_widths)

    # One tab per evaluation; only "IE_EA" gets a different title.
    for eval_type in OTHER_EVAL_TYPES:
        if eval_type == "IE_EA":
            title = "IE/EA"
        else:
            title = eval_type
        create_dataframe_tab(title, results_dfs[eval_type])
def create_overview_dataframe(results_dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Create an overview dataframe combining all models with only the first metric from each eval type.

    Each row is one (model, training set) pair found in any non-S2EF table of
    `results_dfs`. The columns are "Model", "Organization",
    "Energy\\nConserving", "Training Set", then one column per evaluation type
    holding that evaluation's first metric ("-" when the pair has no result
    for it). Rows are ordered by the mean of the available metric values,
    ascending; pairs with no numeric metric at all end up last.
    """
    # Collect every (model, training set) pair and its metadata. S2EF tables
    # (keys starting with "Validation_" / "Test_") are not part of the overview.
    model_info = {}
    for eval_type, df in results_dfs.items():
        if eval_type.startswith(("Validation_", "Test_")):
            continue
        for _, row in df.iterrows():
            model_name = row["Model"]
            dataset = row["Training Set"]
            # A later table overwrites the metadata of an earlier one for the
            # same pair.
            model_info[(model_name, dataset)] = {
                "Model": model_name,
                "Organization": row.get("Organization", ""),
                "Energy Conserving": row.get("Energy\nConserving", ""),
                "Training Set": dataset,
            }

    overview_data = {
        "Model": [],
        "Organization": [],
        "Energy\nConserving": [],
        "Training Set": [],
    }

    # overview column header -> (evaluation type, column to read in its table)
    metric_columns = {}
    for eval_type in OTHER_EVAL_TYPES:
        if eval_type in results_dfs and eval_type in LEADERBOARD_COLUMNS:
            primary_metric = LEADERBOARD_COLUMNS[eval_type][0]  # First metric
            # The per-evaluation tables already carry display headers, so the
            # lookup has to use the mapped name.
            metric_display_name = COLUMN_MAPPING.get(primary_metric, primary_metric)
            # Prefix with the task name: several tasks share the same metric.
            task_display_name = "IE/EA" if eval_type == "IE_EA" else eval_type
            full_display_name = f"{task_display_name}\n{metric_display_name}"
            overview_data[full_display_name] = []
            metric_columns[full_display_name] = (eval_type, metric_display_name)

    # Populate one row per pair, sorted by model name, then training set.
    for model_entry in sorted(model_info):
        model_name, dataset = model_entry
        entry_info = model_info[model_entry]
        overview_data["Model"].append(entry_info["Model"])
        overview_data["Organization"].append(entry_info["Organization"])
        overview_data["Energy\nConserving"].append(entry_info["Energy Conserving"])
        overview_data["Training Set"].append(entry_info["Training Set"])

        for display_col, (eval_type, source_col) in metric_columns.items():
            value = "-"
            if eval_type in results_dfs:
                df = results_dfs[eval_type]
                # Match both model name and training set; first match wins.
                model_row = df[
                    (df["Model"] == model_name) & (df["Training Set"] == dataset)
                ]
                if not model_row.empty and source_col in model_row.columns:
                    value = model_row.iloc[0][source_col]
            overview_data[display_col].append(value)

    overview_df = pd.DataFrame(overview_data)

    # Rank by the average of the metric columns only. Selecting "everything
    # that is not a PRE/POST column" would also pick up the renamed
    # "Energy\nConserving" flag and average it in as 0/1, so the metric
    # headers registered above are used directly.
    metric_cols = list(metric_columns)
    if metric_cols:
        # "-" placeholders become NaN and are ignored by the row mean.
        numeric_metrics = overview_df[metric_cols].apply(pd.to_numeric, errors="coerce")
        avg_scores = numeric_metrics.mean(axis=1)
        # A stable sort keeps the alphabetical order among equal averages.
        overview_df = overview_df.loc[avg_scores.sort_values(kind="mergesort").index]
    return overview_df
def create_submission_interface() -> Tuple[gr.components.Component, ...]:
    """
    Build the "Submit predictions" form and return its components.

    The returned tuple is ordered as: submit button, file upload, eval type,
    organization, model name, model URL, paper URL, energy-conserving
    checkbox, email, training set, additional info, status textbox.

    NOTE(review): the container nesting below was reconstructed from a copy of
    this file that had lost its indentation - confirm against the deployed
    layout.
    """
    with gr.Accordion("Submit predictions"):
        with gr.Row():
            gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")

        with gr.Row():
            # Left column: model metadata and contact details.
            with gr.Column():
                model_name_box = gr.Textbox(label="Model name")
                conserving_box = gr.Checkbox(
                    label="Is the model energy conserving? (i.e. F= -dE/dx)"
                )
                model_url_box = gr.Textbox(label="Model/Checkpoint URL")
                paper_url_box = gr.Textbox(label="Paper URL")
                training_set_dropdown = gr.Dropdown(
                    choices=["OMol-All", "OMol-4M", "UMA-459M", "Other"],
                    label="Training set",
                    interactive=True,
                )
                notes_box = gr.Textbox(
                    label="Additional info (cutoff radius, # of params, etc.)"
                )
                organization_box = gr.Textbox(label="Organization")
                email_box = gr.Textbox(
                    label="Contact email (will be stored privately, & used if there is an issue with your submission)"
                )

            # Right column: file upload, eval type, login and submit.
            with gr.Column():
                upload = gr.File()
                with gr.Row():
                    eval_type_dropdown = gr.Dropdown(
                        choices=ALL_EVAL_TYPES,
                        label="Eval Type",
                        interactive=True,
                    )
                    with gr.Column():
                        gr.LoginButton()
                    with gr.Column():
                        submit = gr.Button("Submit Eval")

        status_box = gr.Textbox(label="Status")

    return (
        submit,
        upload,
        eval_type_dropdown,
        organization_box,
        model_name_box,
        model_url_box,
        paper_url_box,
        conserving_box,
        email_box,
        training_set_dropdown,
        notes_box,
        status_box,
    )
def create_interface() -> gr.Blocks:
    """
    Assemble the whole page: header, citation, result tables and the
    submission form wired to `add_new_eval`.
    """
    # Result tables are loaded before any component is created.
    _, results_dfs = leaderboard_data.load_eval_data()

    with gr.Blocks() as demo:
        gr.HTML(TITLE)
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

        # Citation section
        with gr.Row():
            with gr.Accordion("📙 Citation", open=False):
                gr.Markdown(CITATION_BUTTON_LABEL)
                gr.Markdown(CITATION_BUTTON_TEXT)

        # Evaluation results
        gr.Markdown("## Evaluations", elem_classes="markdown-text")
        with gr.Row():
            create_evaluation_tabs(results_dfs)

        # S2EF Results tabs
        gr.Markdown("## S2EF", elem_classes="markdown-text")
        for split in ("Test", "Validation"):
            with gr.Tab(split):
                create_s2ef_tabs(split, results_dfs)

        # The form returns (submit button, <ten inputs>, status box); the ten
        # inputs are already in the positional order add_new_eval expects.
        submit_button, *form_inputs, submission_result = create_submission_interface()
        submit_button.click(
            add_new_eval,
            form_inputs,
            submission_result,
        )
    return demo
def restart_space():
    """Ask the Hub API to restart the Space identified by LEADERBOARD_PATH."""
    api.restart_space(
        token=TOKEN,
        repo_id=LEADERBOARD_PATH,
    )
def main():
    """
    Build the interface, schedule an hourly Space restart and launch the app.
    """
    demo = create_interface()

    # Restart the Space every 3600 seconds from a background thread.
    restart_scheduler = BackgroundScheduler()
    restart_scheduler.add_job(restart_space, trigger="interval", seconds=3600)
    restart_scheduler.start()

    # Launch the demo
    demo.launch(debug=True)


if __name__ == "__main__":
    main()