import gradio as gr
import pandas as pd
from pathlib import Path
from typing import Optional
from about import (
    ENDPOINTS,
    API,
    METRICS,
    submissions_repo,
    results_repo_test,
    results_repo_validation,
    test_repo,
    THROTTLE_MINUTES,
)
from utils import bootstrap_metrics, clip_and_log_transform, fetch_dataset_df
from huggingface_hub import hf_hub_download
import datetime
import io
import json
import tempfile
import re
from pydantic import (
    BaseModel,
    Field,
    model_validator,
    field_validator,
    ValidationError,
)
from loguru import logger

HF_USERNAME_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-_]{1,38})$")


def _safeify_username(username: str) -> str:
    return str(username.strip()).replace("/", "_").replace(" ", "_")


def _unsafify_username(username: str) -> str:
    return str(username.strip()).replace("/", "_").replace(" ", "_")


def _check_required_columns(df: pd.DataFrame, name: str, cols: list[str]):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{name} is missing required columns: {missing}")


class ParticipantRecord(BaseModel):
    hf_username: str = Field(description="Hugging Face username")
    display_name: Optional[str] = Field(description="Name to display on leaderboard")
    participant_name: Optional[str] = Field(default=None, description="Participant's real name")
    discord_username: Optional[str] = Field(default=None, description="Discord username")
    email: Optional[str] = Field(default=None, description="Email address")
    affiliation: Optional[str] = Field(default=None, description="Affiliation")
    model_tag: Optional[str] = Field(default=None, description="Link to model description")
    anonymous: bool = Field(default=False, description="Whether to display username as 'anonymous'")
    consent_publication: bool = Field(default=False, description="Consent to be included in publications")

    @field_validator("hf_username")
    @classmethod
    def validate_hf_username(cls, v: str) -> str:
        v = v.strip()
        if not HF_USERNAME_RE.match(v):
            raise gr.Error("Invalid Hugging Face username (letters, numbers, -, _; 2-39 characters).")
        return v

    @field_validator("display_name")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        if v is None:
            return None
        v = v.strip()
        if not v:
            return None
        if len(v) > 20:
            raise ValueError("Display name is too long (max 20 chars).")
        return v

    @field_validator("model_tag", mode="before")
    @classmethod
    def normalize_url(cls, v):
        if v is None:
            return v
        s = str(v).strip()
        if not s:
            return None
        if "://" not in s:
            s = "https://" + s
        return s

    @model_validator(mode="after")
    def require_display_name_if_anonymous(self) -> "ParticipantRecord":
        if self.anonymous and not self.display_name:
            raise ValueError("Alias is required when the anonymous box is checked.")
        return self


class SubmissionMetadata(BaseModel):
    submission_time_utc: str
    user: str
    original_filename: str
    evaluated: bool
    participant: ParticipantRecord


def submit_data(predictions_file: str,
                user_state,
                participant_name: str = "",
                discord_username: str = "",
                email: str = "",
                affiliation: str = "",
                model_tag: str = "",
                user_display: str = "",
                anon_checkbox: bool = False,
                paper_checkbox: bool = False
                ):
    if user_state is None:
        raise gr.Error("Username or alias is required for submission.")

    # Check the last time the user submitted
    data = fetch_dataset_df()
    if not data[data['user'] == user_state].empty:
        last_time = data[data['user'] == user_state]['submission time'].max()
        delta = datetime.datetime.now(datetime.timezone.utc) - last_time.to_pydatetime()
        if delta < datetime.timedelta(minutes=THROTTLE_MINUTES):
            raise gr.Error(
                f"You have submitted within the last {THROTTLE_MINUTES} minutes. "
                f"Please wait {THROTTLE_MINUTES - int(delta.total_seconds() // 60)} minutes before submitting again."
            )

    file_path = Path(predictions_file).resolve()
    if not file_path.exists():
        raise gr.Error("Uploaded file object does not have a valid file path.")

    # Read results file
    try:
        results_df = pd.read_csv(file_path)
    except Exception as e:
        raise gr.Error(f"❌ Error reading results file: {str(e)}")
    if results_df.empty:
        raise gr.Error("The uploaded file is empty.")
    missing = set(ENDPOINTS) - set(results_df.columns)
    if missing:
        raise gr.Error(f"The uploaded file must contain all endpoint predictions {ENDPOINTS} as columns, missing: {missing}")

    # Save participant record
    try:
        participant_record = ParticipantRecord(
            hf_username=user_state,
            participant_name=participant_name,
            discord_username=discord_username,
            email=email,
            affiliation=affiliation,
            model_tag=model_tag,
            display_name=user_display,
            anonymous=anon_checkbox,
            consent_publication=paper_checkbox
        )
    except ValidationError as e:
        raise gr.Error(f"❌ Error in participant information: {str(e)}")

    # Build destination filename in the dataset
    # Keep the default ISO format so the timestamp can be deserialized correctly
    ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds")
    try:
        meta = SubmissionMetadata(
            submission_time_utc=ts,
            user=user_state,
            original_filename=file_path.name,
            evaluated=False,
            participant=participant_record
        )
    except ValidationError as e:
        raise gr.Error(f"❌ Error in metadata information: {str(e)}")

    safe_user = _safeify_username(user_state)
    destination_csv = f"submissions/{safe_user}_{ts}.csv"
    destination_json = destination_csv.replace(".csv", ".json")

    # Upload the CSV file
    API.upload_file(
        path_or_fileobj=str(file_path),
        path_in_repo=destination_csv,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"Add submission for user at {ts}"
    )

    # Upload the metadata JSON file
    meta_bytes = io.BytesIO(json.dumps(meta.model_dump(), indent=2).encode("utf-8"))
    API.upload_file(
        path_or_fileobj=meta_bytes,
        path_in_repo=destination_json,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"Add metadata for user submission at {ts}"
    )

    return "✅ Your submission has been received! Your scores will appear on the leaderboard shortly.", destination_csv

def evaluate_data(filename: str) -> None:
    # Do the test set first as a more stringent check of the submission w.r.t. matching molecules
    logger.info(f"Evaluating submission file {filename}")
    # Evaluate on the test set
    _evaluate_data(filename,
                   test_repo=test_repo,
                   split_filename="data/expansion_data_test.csv",
                   results_repo=results_repo_test)
    # Evaluate on the validation set
    _evaluate_data(filename,
                   test_repo=test_repo,
                   split_filename="data/expansion_data_test_validation.csv",
                   results_repo=results_repo_validation)
    logger.info(f"Finished evaluating submission file {filename}")


def _evaluate_data(filename: str, test_repo: str, split_filename: str, results_repo: str) -> None:
    # Load the submission csv
    try:
        local_path = hf_hub_download(
            repo_id=submissions_repo,
            repo_type="dataset",
            filename=filename,
        )
    except Exception as e:
        raise gr.Error(f"Failed to download submission file: {e}")

    # Load the test set
    try:
        test_path = hf_hub_download(
            repo_id=test_repo,
            repo_type="dataset",
            filename=split_filename
        )
    except Exception as e:
        raise gr.Error(f"Failed to download test file: {e}")

    data_df = pd.read_csv(local_path)
    test_df = pd.read_csv(test_path)

    try:
        results_df = calculate_metrics(data_df, test_df)
        if not isinstance(results_df, pd.DataFrame) or results_df.empty:
            raise gr.Error("Evaluation produced no results.")
    except Exception as e:
        raise gr.Error(f"Evaluation failed: {e}. No results written to results dataset.")

    # Load metadata file
    meta_filename = filename.replace(".csv", ".json")
    try:
        meta_path = hf_hub_download(
            repo_id=submissions_repo,
            repo_type="dataset",
            filename=meta_filename,
        )
        with open(meta_path, "r", encoding="utf-8") as f:
            _meta = json.load(f)
        meta = SubmissionMetadata(**_meta)
        username = meta.participant.hf_username
        timestamp = meta.submission_time_utc
        report = meta.participant.model_tag
        if meta.participant.anonymous:
            display_name = meta.participant.display_name
        else:
            display_name = username
    except Exception as e:
        raise gr.Error(f"Failed to load metadata file: {e}. No results written to results dataset.")

    # Write results to results dataset
    results_df['user'] = display_name
    results_df['submission_time'] = timestamp
    results_df['model_report'] = report
    results_df['anonymous'] = meta.participant.anonymous

    safe_user = _unsafify_username(username)
    destination_path = f"results/{safe_user}_{timestamp}_results.csv"

    tmp_name = None
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp:
        results_df.to_csv(tmp, index=False)
        tmp.flush()
        tmp_name = tmp.name

    API.upload_file(
        path_or_fileobj=tmp_name,
        path_in_repo=destination_path,
        repo_id=results_repo,
        repo_type="dataset",
        commit_message=f"Add result data for {username}"
    )
    Path(tmp_name).unlink()


def calculate_metrics(
    results_dataframe: pd.DataFrame,
    test_dataframe: pd.DataFrame
):
    # Do some checks
    # 1) Check all columns are present
    _check_required_columns(results_dataframe, "Results file", ["Molecule Name"] + ENDPOINTS)
    _check_required_columns(test_dataframe, "Test file", ["Molecule Name"] + ENDPOINTS)

    # 2) Check all molecules in the test set are present in the predictions
    if not test_dataframe['Molecule Name'].isin(results_dataframe['Molecule Name']).all():
        raise gr.Error("Some molecules in the test set are missing from the predictions file. Please ensure all molecules are included.")

    # 3) Check no duplicated molecules in the predictions file
    if results_dataframe['Molecule Name'].duplicated().any():
        raise gr.Error("The predictions file contains duplicated molecules. Please ensure each molecule is only listed once.")

    # 4) Merge dataframes to ensure alignment
    merged_df = results_dataframe.merge(
        test_dataframe,
        on="Molecule Name",
        suffixes=('_pred', '_true'),
        how="inner"
    )
    merged_df = merged_df.sort_values("Molecule Name")

    # 5) Loop over endpoints
    final_cols = ["MAE", "RAE", "R2", "Spearman R", "Kendall's Tau"]
    all_endpoint_results = []
    for ept in ENDPOINTS:
        pred_col = f"{ept}_pred"
        true_col = f"{ept}_true"

        # Cast to numeric, coerce errors to NaN
        merged_df[pred_col] = pd.to_numeric(merged_df[pred_col], errors="coerce")
        merged_df[true_col] = pd.to_numeric(merged_df[true_col], errors="coerce")
        if merged_df[pred_col].isnull().all():
            raise gr.Error(f"All predictions are missing for endpoint {ept}. Please provide valid predictions.")

        # Subset and drop NaNs
        subset = merged_df[[pred_col, true_col]].dropna()
        if subset.empty:
            raise gr.Error(f"No valid data available for endpoint {ept} after removing NaNs.")

        # Extract numpy arrays
        y_pred = subset[pred_col].to_numpy()
        y_true = subset[true_col].to_numpy()

        # Apply log10 + 1 transform except for logD
        if ept.lower() not in ['logd']:
            y_true_log = clip_and_log_transform(y_true)
            y_pred_log = clip_and_log_transform(y_pred)
        else:
            y_true_log = y_true
            y_pred_log = y_pred

        # Calculate metrics with bootstrapping
        bootstrap_df = bootstrap_metrics(y_pred_log, y_true_log, ept, n_bootstrap_samples=1000)

        # Longer pivot alternative for the cases where all metric results are NaN, as pivot ignores those columns
        grouped = bootstrap_df.groupby(["Endpoint", "Metric"])["Value"].agg(["mean", "std"])
        df_unstacked = grouped.unstack(level="Metric")
        df_reindexed = df_unstacked.reindex(columns=list(METRICS), level=1)
        df_reindexed.columns = [f"{agg}_{metric}" for agg, metric in df_reindexed.columns]
        df_endpoint = df_reindexed.reset_index()
        all_endpoint_results.append(df_endpoint)

    df_results = pd.concat(all_endpoint_results, ignore_index=True)

    mean_cols = [f'mean_{m}' for m in final_cols]
    std_cols = [f'std_{m}' for m in final_cols]

    # Average results
    macro_means = df_results[mean_cols].mean()
    macro_stds = df_results[std_cols].mean()
    avg_row = {"Endpoint": "Average"}
    avg_row.update(macro_means.to_dict())
    avg_row.update(macro_stds.to_dict())
    df_with_average = pd.concat([df_results, pd.DataFrame([avg_row])], ignore_index=True)

    # Fix order of columns
    df_with_average = df_with_average[["Endpoint"] + mean_cols + std_cols]
    return df_with_average
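

# ---------------------------------------------------------------------------
# Minimal local smoke test (a sketch, not part of the evaluation pipeline).
# It assumes ENDPOINTS is a list of endpoint column names and that the
# imports from `about` and `utils` resolve; the synthetic values below are
# illustrative only and simply exercise the input schema expected by
# `calculate_metrics` ("Molecule Name" plus one column per endpoint).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import numpy as np

    rng = np.random.default_rng(0)
    molecules = [f"MOL-{i:03d}" for i in range(20)]
    toy_true = pd.DataFrame({"Molecule Name": molecules})
    toy_pred = pd.DataFrame({"Molecule Name": molecules})
    for ept in ENDPOINTS:
        values = rng.uniform(0.1, 100.0, size=len(molecules))
        toy_true[ept] = values
        # Perturb the "predictions" so the metrics are non-trivial
        toy_pred[ept] = values * rng.uniform(0.8, 1.2, size=len(molecules))

    print(calculate_metrics(toy_pred, toy_true))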