import datetime import json import os import tempfile from email.utils import parseaddr from typing import Dict, List, Tuple, Optional import gradio as gr import numpy as np import pandas as pd from apscheduler.schedulers.background import BackgroundScheduler from datasets import VerificationMode, load_dataset, Dataset from huggingface_hub import HfApi, snapshot_download from content import ( CITATION_BUTTON_LABEL, CITATION_BUTTON_TEXT, INTRODUCTION_TEXT, SUBMISSION_TEXT, PRE_COLUMN_NAMES, POST_COLUMN_NAMES, TITLE, TYPES, model_hyperlink, ) from evaluator import evaluate # Configuration constants TOKEN = os.environ.get("TOKEN", None) OWNER = "facebook" # Dataset repositories INTERNAL_DATA_DATASET = f"{OWNER}/fairchem_internal" SUBMISSION_DATASET = f"{OWNER}/fairchem_leaderboard_submissions" RESULTS_DATASET = f"{OWNER}/fairchem_leaderboard_results" CONTACT_DATASET = f"{OWNER}/fairchem_leaderboard_contact_info_internal" LEADERBOARD_PATH = f"{OWNER}/fairchem_leaderboard" # Initialize HuggingFace API api = HfApi() # S2EF subsplits for validation and test data S2EF_SUBSPLITS = [ "all", "biomolecules", "electrolytes", "metal_complexes", "neutral_organics", ] # Evaluation types that are not S2EF OTHER_EVAL_TYPES = [ "Ligand pocket", "Ligand strain", "Conformers", "Protonation", "IE_EA", "Distance scaling", "Spin gap", ] # All evaluation types for the dropdown ALL_EVAL_TYPES = ["Validation", "Test"] + OTHER_EVAL_TYPES class LeaderboardData: """ Manages leaderboard data loading and processing. """ def __init__(self): self._setup_data_paths() self._load_contact_info() def _setup_data_paths(self): """ Setup target and result file paths. """ target_data_dir = snapshot_download( repo_id=INTERNAL_DATA_DATASET, repo_type="dataset", token=TOKEN, ) self.target_paths = { "Validation": f"{target_data_dir}/omol_val_labels.npz", "Test": f"{target_data_dir}/omol_test_labels.npz", "Distance Scaling": f"{target_data_dir}/distance_scaling_labels.json", "Ligand pocket": f"{target_data_dir}/ligand_pocket_labels.json", "Ligand strain": f"{target_data_dir}/ligand_strain_labels.json", "Conformers": f"{target_data_dir}/geom_conformers_labels.json", "Protonation": f"{target_data_dir}/protonation_energies_labels.json", "IE_EA": f"{target_data_dir}/unoptimized_ie_ea_labels.json", "Distance scaling": f"{target_data_dir}/distance_scaling_labels.json", "Spin gap": f"{target_data_dir}/unoptimized_spin_gap_labels.json", } self.result_paths = { "Validation": "validation_s2ef.parquet", "Test": "test_s2ef.parquet", "Ligand pocket": "ligand_pocket.parquet", "Ligand strain": "ligand_strain.parquet", "Conformers": "geom_conformers.parquet", "Protonation": "protonation.parquet", "IE_EA": "ie_ea.parquet", "Distance scaling": "distance_scaling.parquet", "Spin gap": "spin_gap.parquet", } def _load_contact_info(self): """ Load contact information dataset. """ self.contact_infos = load_dataset( CONTACT_DATASET, token=TOKEN, download_mode="force_redownload", verification_mode=VerificationMode.NO_CHECKS, ) def load_eval_data(self) -> Tuple[Dict, Dict[str, pd.DataFrame]]: """ Load all evaluation data and return results and dataframes. """ # Load S2EF results s2ef_results = load_dataset( RESULTS_DATASET, token=TOKEN, download_mode="force_redownload", verification_mode=VerificationMode.NO_CHECKS, data_files={ "Validation": os.path.join("data", self.result_paths["Validation"]), "Test": os.path.join("data", self.result_paths["Test"]), }, ) eval_results = dict(s2ef_results) # Load other evaluation types for eval_type in OTHER_EVAL_TYPES: eval_type_data = load_dataset( RESULTS_DATASET, token=TOKEN, download_mode="force_redownload", verification_mode=VerificationMode.NO_CHECKS, data_files={"data": os.path.join("data", self.result_paths[eval_type])}, ) eval_results[eval_type] = eval_type_data["data"] # Generate result dataframes results_dfs = {} # S2EF dataframes for split in ["Validation", "Test"]: for subsplit in S2EF_SUBSPLITS: df_key = f"{split}_{subsplit}" results_dfs[df_key] = self._get_s2ef_df_from_results( eval_results, split, subsplit ) # Other evaluation dataframes for split in OTHER_EVAL_TYPES: results_dfs[split] = self._get_eval_df_from_results(eval_results, split) return eval_results, results_dfs def _get_s2ef_df_from_results( self, eval_results: Dict, split: str, subsplit: str ) -> pd.DataFrame: """ Generate S2EF dataframe from evaluation results. """ local_df = eval_results[split] local_df = local_df.map( lambda row: {"Model": model_hyperlink(row["url"], row["Model"])} ) filtered_columns = ( PRE_COLUMN_NAMES + [f"{subsplit}_energy_mae", f"{subsplit}_forces_mae"] + POST_COLUMN_NAMES ) df = pd.DataFrame(local_df) avail_columns = list(df.columns) missing_columns = list(set(filtered_columns) - set(avail_columns)) df[missing_columns] = "-" df = df[filtered_columns].round(4) # Unit conversion for col in df.columns: if "mae" in col.lower(): df[col] = (df[col] * 1000).round(2) df = df.sort_values(by=[f"{subsplit}_energy_mae"], ascending=True) df[f"{subsplit}_energy_mae"] = df[f"{subsplit}_energy_mae"] df[f"{subsplit}_forces_mae"] = df[f"{subsplit}_forces_mae"] df = df.rename( columns={ f"{subsplit}_energy_mae": "Energy MAE [meV]", f"{subsplit}_forces_mae": "Forces MAE [meV/Å]", } ) return df def _get_eval_df_from_results(self, eval_results: Dict, split: str) -> pd.DataFrame: """ Generate evaluation dataframe from results. """ local_df = eval_results[split] local_df = local_df.map( lambda row: {"Model": model_hyperlink(row["url"], row["Model"])} ) eval_columns = LEADERBOARD_COLUMNS[split] filtered_columns = PRE_COLUMN_NAMES + eval_columns + POST_COLUMN_NAMES df = pd.DataFrame(local_df) avail_columns = list(df.columns) missing_columns = list(set(filtered_columns) - set(avail_columns)) df[missing_columns] = "-" df = df[filtered_columns].round(4) # Unit conversion for col in df.columns: if "mae" in col.lower(): df[col] = (df[col] * 1000).round(2) df = df.sort_values(by=[eval_columns[0]], ascending=True) df = df.rename(columns=COLUMN_MAPPING) return df leaderboard_data = LeaderboardData() # Column configurations for different evaluation types LEADERBOARD_COLUMNS = { "Ligand pocket": ["interaction_energy_mae", "interaction_forces_mae"], "Ligand strain": ["strain_energy_mae", "global_min_rmsd"], "Conformers": ["deltaE_mae", "ensemble_rmsd"], "Protonation": ["deltaE_mae", "rmsd"], "IE_EA": ["deltaE_mae", "deltaF_mae"], "Distance scaling": ["lr_ddE_mae", "lr_ddF_mae", "sr_ddE_mae", "sr_ddF_mae"], "Spin gap": ["deltaE_mae", "deltaF_mae"], } COLUMN_MAPPING = { "interaction_energy_mae": "Ixn Energy MAE [meV]", "interaction_forces_mae": "Ixn Forces MAE [meV/Å]", "strain_energy_mae": "Strain Energy MAE [meV]", "deltaE_mae": "\u0394Energy MAE [meV]", "deltaF_mae": "\u0394Forces MAE [meV/Å]", "ensemble_rmsd": "RMSD [Å]", "global_min_rmsd": "RMSD [Å]", "rmsd": "RMSD [Å]", "lr_ddE_mae": "\u0394Energy (LR) MAE [meV]", "lr_ddF_mae": "\u0394Forces (LR) MAE [meV/Å]", "sr_ddE_mae": "\u0394Energy (SR) MAE [meV]", "sr_ddF_mae": "\u0394Forces (SR) MAE [meV/Å]", } def add_new_eval( path_to_file: str, eval_type: str, organization: str, model: str, url: str, mail: str, training_set: str, additional_info: str, profile: gr.OAuthProfile, ) -> str: """Add a new evaluation to the leaderboard.""" print(f"Adding new eval of type: {eval_type}") try: # Validate email address _, parsed_mail = parseaddr(mail) if "@" not in parsed_mail: yield "⚠️ Please provide a valid email address." return # Check monthly submission limit (5 submissions per month) contact_key = eval_type.replace(" ", "_") user_submission_dates = sorted( row["date"] for row in leaderboard_data.contact_infos.get(contact_key, []) if row["username"] == profile.username ) current_month = datetime.datetime.now().strftime("%Y-%m") current_month_submissions = [ date for date in user_submission_dates if date.startswith(current_month) ] if len(current_month_submissions) >= 5: yield f"⚠️ You have reached the monthly submission limit of 5 submissions. Please try again next month." return # Validate file submission if path_to_file is None: yield "⚠️ Please upload a file." return if not (path_to_file.endswith(".npz") or path_to_file.endswith(".json")): yield "⚠️ Please submit a valid npz or json file" return # Evaluate the submission yield "⚙️ Evaluating your submission..." metrics = evaluate( leaderboard_data.target_paths[eval_type], path_to_file, eval_type, ) submission_time = datetime.datetime.today().strftime("%Y-%m-%d-%H:%M") # Upload submission file yield "☁️ Uploading submission file..." api.upload_file( repo_id=SUBMISSION_DATASET, path_or_fileobj=path_to_file, path_in_repo=f"{organization}/{model}/submissions/{training_set}/{eval_type}_{submission_time}_{os.path.basename(path_to_file)}", repo_type="dataset", token=TOKEN, ) # Update leaderboard data yield "📋 Updating leaderboard data..." eval_results, _ = leaderboard_data.load_eval_data() eval_entry = { "Model": model, "Organization": organization, "Submission date": submission_time, "Training Set": training_set, "Notes": additional_info, "url": url, } eval_entry.update(metrics) if eval_type not in eval_results: eval_results[eval_type] = Dataset.from_dict( {k: [v] for k, v in eval_entry.items()} ) else: eval_results[eval_type] = eval_results[eval_type].add_item(eval_entry) data_file_name = leaderboard_data.result_paths[eval_type] # Upload results yield "💾 Saving results to database..." with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp_file: eval_results[eval_type].to_parquet(tmp_file.name) api.upload_file( repo_id=RESULTS_DATASET, path_or_fileobj=tmp_file.name, path_in_repo=f"data/{data_file_name}", repo_type="dataset", token=TOKEN, ) # Save contact information contact_info = { "model": model, "organization": organization, "username": profile.username, "email": mail, "date": submission_time, } if contact_key not in leaderboard_data.contact_infos: leaderboard_data.contact_infos[contact_key] = Dataset.from_dict( {k: [v] for k, v in contact_info.items()} ) else: leaderboard_data.contact_infos[contact_key] = ( leaderboard_data.contact_infos[contact_key].add_item(contact_info) ) leaderboard_data.contact_infos.push_to_hub(CONTACT_DATASET, token=TOKEN) success_str = f"✅ Model {model} is successfully evaluated and stored in our database.\nPlease wait an hour and refresh the leaderboard to see your results displayed." yield success_str except Exception as e: print(f"Error during submission: {e}") yield ( f"An error occurred, please open a discussion and indicate at what time you encountered the error.\n{e}" ) def create_dataframe_tab( tab_name: str, df: pd.DataFrame, datatype: List[str] = None ) -> gr.Tab: """ Create a tab with a dataframe. """ if datatype is None: datatype = TYPES with gr.Tab(tab_name) as tab: gr.Dataframe( value=df, datatype=datatype, interactive=False, column_widths=["20%"], ) return tab def create_s2ef_tabs(split: str, results_dfs: Dict[str, pd.DataFrame]) -> None: """ Create S2EF tabs for a given split (Validation/Test). """ subsplit_names = { "all": "All", "biomolecules": "Biomolecules", "electrolytes": "Electrolytes", "metal_complexes": "Metal Complexes", "neutral_organics": "Neutral Organics", } for subsplit, display_name in subsplit_names.items(): df_key = f"{split}_{subsplit}" create_dataframe_tab(display_name, results_dfs[df_key]) def create_evaluation_tabs(results_dfs: Dict[str, pd.DataFrame]) -> None: """ Create evaluation tabs for non-S2EF evaluations. """ eval_datatype = ["markdown", "markdown", "number", "str"] for eval_type in OTHER_EVAL_TYPES: display_name = "IE/EA" if eval_type == "IE_EA" else eval_type create_dataframe_tab(display_name, results_dfs[eval_type], eval_datatype) def create_submission_interface() -> Tuple[gr.components.Component, ...]: """ Create the submission interface components. """ with gr.Accordion("Submit predictions"): with gr.Row(): gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text") with gr.Row(): with gr.Column(): model_name_textbox = gr.Textbox(label="Model name") model_url = gr.Textbox(label="Model/Paper URL") dataset = gr.Dropdown( choices=["OMol-All", "OMol-4M", "UMA-459M", "Other"], label="Training set", interactive=True, ) additional_info = gr.Textbox( label="Additional info (cutoff radius, # of params, etc.)" ) organization = gr.Textbox(label="Organization") mail = gr.Textbox( label="Contact email (will be stored privately, & used if there is an issue with your submission)" ) with gr.Column(): file_output = gr.File() with gr.Row(): eval_type = gr.Dropdown( choices=ALL_EVAL_TYPES, label="Eval Type", interactive=True, ) with gr.Column(): gr.LoginButton() with gr.Column(): submit_button = gr.Button("Submit Eval") submission_result = gr.Textbox(label="Status") return ( submit_button, file_output, eval_type, organization, model_name_textbox, model_url, mail, dataset, additional_info, submission_result, ) def create_interface() -> gr.Blocks: """ Create the complete Gradio interface. """ # Load data _, results_dfs = leaderboard_data.load_eval_data() demo = gr.Blocks() with demo: gr.HTML(TITLE) gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text") # Citation section with gr.Row(): with gr.Accordion("📙 Citation", open=False): gr.Markdown(CITATION_BUTTON_LABEL) gr.Markdown(CITATION_BUTTON_TEXT) # S2EF Results tabs with gr.Tab("Test"): create_s2ef_tabs("Test", results_dfs) with gr.Tab("Validation"): create_s2ef_tabs("Validation", results_dfs) # Evaluation results gr.Markdown("## Evaluations", elem_classes="markdown-text") with gr.Row(): create_evaluation_tabs(results_dfs) ( submit_button, file_output, eval_type, organization, model_name_textbox, model_url, mail, dataset, additional_info, submission_result, ) = create_submission_interface() submit_button.click( add_new_eval, [ file_output, eval_type, organization, model_name_textbox, model_url, mail, dataset, additional_info, ], submission_result, ) return demo def restart_space(): api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN) def main(): demo = create_interface() scheduler = BackgroundScheduler() scheduler.add_job(restart_space, "interval", seconds=3600) scheduler.start() # Launch the demo demo.launch(debug=True, share=True) if __name__ == "__main__": main()