import gradio as gr
import pandas as pd
from pathlib import Path
from typing import Optional

from about import (
    ENDPOINTS, API, METRICS,
    submissions_repo,
    results_repo_test,
    results_repo_validation,
    test_repo,
    THROTTLE_MINUTES
)
from utils import bootstrap_metrics, clip_and_log_transform, fetch_dataset_df
from huggingface_hub import hf_hub_download

import datetime
import io
import json
import tempfile
import re

from pydantic import (
    BaseModel,
    Field,
    model_validator,
    field_validator,
    ValidationError
)
from loguru import logger


HF_USERNAME_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-_]{1,38})$")


def _safeify_username(username: str) -> str:
    """Sanitize a username so it can be embedded safely in a repo file path."""
    return str(username.strip()).replace("/", "_").replace(" ", "_")


def _unsafify_username(username: str) -> str:
    # Note: this currently applies the same sanitization as _safeify_username;
    # it is only used to build result file paths from stored usernames.
    return str(username.strip()).replace("/", "_").replace(" ", "_")


def _check_required_columns(df: pd.DataFrame, name: str, cols: list[str]):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{name} is missing required columns: {missing}")


class ParticipantRecord(BaseModel):
    hf_username: str = Field(description="Hugging Face username")
    display_name: Optional[str] = Field(default=None, description="Name to display on leaderboard")
    participant_name: Optional[str] = Field(default=None, description="Participant's real name")
    discord_username: Optional[str] = Field(default=None, description="Discord username")
    email: Optional[str] = Field(default=None, description="Email address")
    affiliation: Optional[str] = Field(default=None, description="Affiliation")
    model_tag: Optional[str] = Field(default=None, description="Link to model description")
    anonymous: bool = Field(default=False, description="Whether to display username as 'anonymous'")
    consent_publication: bool = Field(default=False, description="Consent to be included in publications")

    @field_validator("hf_username")
    @classmethod
    def validate_hf_username(cls, v: str) -> str:
        v = v.strip()
        if not HF_USERNAME_RE.match(v):
            raise ValueError("Invalid Hugging Face username (letters, numbers, -, _; min 2, max ~39).")
        return v

    @field_validator("display_name")
    @classmethod
    def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
        if v is None:
            return None
        v = v.strip()
        if not v:
            return None
        if len(v) > 20:
            raise ValueError("Display name is too long (max 20 chars).")
        return v

    @field_validator("model_tag", mode="before")
    @classmethod
    def normalize_url(cls, v):
        if v is None:
            return v
        s = str(v).strip()
        if not s:
            return None
        # Prepend a scheme if the user pasted a bare domain or path.
        if "://" not in s:
            s = "https://" + s
        return s

    @model_validator(mode="after")
    def require_display_name_if_anonymous(self) -> "ParticipantRecord":
        if self.anonymous and not self.display_name:
            raise ValueError("Alias is required when anonymous box is checked.")
        return self


class SubmissionMetadata(BaseModel):
    submission_time_utc: str
    user: str
    original_filename: str
    evaluated: bool
    participant: ParticipantRecord


def submit_data(predictions_file: str,
                user_state,
                participant_name: str = "",
                discord_username: str = "",
                email: str = "",
                affiliation: str = "",
                model_tag: str = "",
                user_display: str = "",
                anon_checkbox: bool = False,
                paper_checkbox: bool = False
                ):
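    """Validate a predictions CSV and the participant details, then upload the
    CSV together with a JSON metadata record to the submissions dataset repo.

    Returns a status message and the repo path of the uploaded CSV.
    """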
    if user_state is None:
        raise gr.Error("Username or alias is required for submission.")

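    # Throttle: reject the submission if this user already submitted within
    # the last THROTTLE_MINUTES.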
    data = fetch_dataset_df()
    if not data[data['user'] == user_state].empty:
        last_time = data[data['user'] == user_state]['submission time'].max()
        delta = datetime.datetime.now(datetime.timezone.utc) - last_time.to_pydatetime()
        if delta < datetime.timedelta(minutes=THROTTLE_MINUTES):
            raise gr.Error(f"You have submitted within the last {THROTTLE_MINUTES} minutes. Please wait {THROTTLE_MINUTES - int(delta.total_seconds() // 60)} minutes before submitting again.")

    file_path = Path(predictions_file).resolve()
    if not file_path.exists():
        raise gr.Error("Uploaded file object does not have a valid file path.")

    try:
        results_df = pd.read_csv(file_path)
    except Exception as e:
        raise gr.Error(f"❌ Error reading results file: {str(e)}")

    if results_df.empty:
        raise gr.Error("The uploaded file is empty.")

    missing = set(ENDPOINTS) - set(results_df.columns)
    if missing:
        raise gr.Error(f"The uploaded file must contain all endpoint predictions {ENDPOINTS} as columns, missing: {missing}")

    try:
        participant_record = ParticipantRecord(
            hf_username=user_state,
            participant_name=participant_name,
            discord_username=discord_username,
            email=email,
            affiliation=affiliation,
            model_tag=model_tag,
            display_name=user_display,
            anonymous=anon_checkbox,
            consent_publication=paper_checkbox
        )
    except ValidationError as e:
        raise gr.Error(f"❌ Error in participant information: {str(e)}")

    ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds")
    try:
        meta = SubmissionMetadata(
            submission_time_utc=ts,
            user=user_state,
            original_filename=file_path.name,
            evaluated=False,
            participant=participant_record
        )
    except ValidationError as e:
        raise gr.Error(f"❌ Error in metadata information: {str(e)}")

    safe_user = _safeify_username(user_state)
    destination_csv = f"submissions/{safe_user}_{ts}.csv"
    destination_json = destination_csv.replace(".csv", ".json")

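    # Upload the predictions CSV and its JSON metadata side by side in the
    # submissions dataset repo.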
    API.upload_file(
        path_or_fileobj=str(file_path),
        path_in_repo=destination_csv,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"Add submission for user at {ts}"
    )

    meta_bytes = io.BytesIO(json.dumps(meta.model_dump(), indent=2).encode("utf-8"))
    API.upload_file(
        path_or_fileobj=meta_bytes,
        path_in_repo=destination_json,
        repo_id=submissions_repo,
        repo_type="dataset",
        commit_message=f"Add metadata for user submission at {ts}"
    )

    return "✅ Your submission has been received! Your scores will appear on the leaderboard shortly.", destination_csv


def evaluate_data(filename: str) -> None:
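    """Evaluate a submission file against both the test and validation splits
    and write the resulting metrics to the corresponding results repos."""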
    logger.info(f"Evaluating submission file {filename}")
    _evaluate_data(filename, test_repo=test_repo, split_filename="data/expansion_data_test.csv", results_repo=results_repo_test)
    _evaluate_data(filename, test_repo=test_repo, split_filename="data/expansion_data_test_validation.csv", results_repo=results_repo_validation)
    logger.info(f"Finished evaluating submission file {filename}")


def _evaluate_data(filename: str, test_repo: str, split_filename: str, results_repo: str) -> None:
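    """Download one submission and one reference split, compute metrics, and
    upload the per-user results CSV to `results_repo`."""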
    try:
        local_path = hf_hub_download(
            repo_id=submissions_repo,
            repo_type="dataset",
            filename=filename,
        )
    except Exception as e:
        raise gr.Error(f"Failed to download submission file: {e}")

    try:
        test_path = hf_hub_download(
            repo_id=test_repo,
            repo_type="dataset",
            filename=split_filename
        )
    except Exception as e:
        raise gr.Error(f"Failed to download test file: {e}")

    data_df = pd.read_csv(local_path)
    test_df = pd.read_csv(test_path)
    try:
        results_df = calculate_metrics(data_df, test_df)
    except Exception as e:
        raise gr.Error(f"Evaluation failed: {e}. No results written to results dataset.")
    if not isinstance(results_df, pd.DataFrame) or results_df.empty:
        raise gr.Error("Evaluation produced no results. No results written to results dataset.")

    meta_filename = filename.replace(".csv", ".json")
    try:
        meta_path = hf_hub_download(
            repo_id=submissions_repo,
            repo_type="dataset",
            filename=meta_filename,
        )
        with open(meta_path, "r", encoding="utf-8") as f:
            _meta = json.load(f)
        meta = SubmissionMetadata(**_meta)
        username = meta.participant.hf_username
        timestamp = meta.submission_time_utc
        report = meta.participant.model_tag
        if meta.participant.anonymous:
            display_name = meta.participant.display_name
        else:
            display_name = username
    except Exception as e:
        raise gr.Error(f"Failed to load metadata file: {e}. No results written to results dataset.")

    results_df['user'] = display_name
    results_df['submission_time'] = timestamp
    results_df['model_report'] = report
    results_df['anonymous'] = meta.participant.anonymous
    safe_user = _unsafify_username(username)
    destination_path = f"results/{safe_user}_{timestamp}_results.csv"

    tmp_name = None
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp:
        results_df.to_csv(tmp, index=False)
        tmp.flush()
        tmp_name = tmp.name

    API.upload_file(
        path_or_fileobj=tmp_name,
        path_in_repo=destination_path,
        repo_id=results_repo,
        repo_type="dataset",
        commit_message=f"Add result data for {username}"
    )
    Path(tmp_name).unlink()


def calculate_metrics(
    results_dataframe: pd.DataFrame,
    test_dataframe: pd.DataFrame
):
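    """Score predictions against reference values.

    For every endpoint in ENDPOINTS, bootstrap the metrics in METRICS and
    return one row per endpoint plus a macro-averaged "Average" row, with
    mean_* and std_* columns.
    """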
    _check_required_columns(results_dataframe, "Results file", ["Molecule Name"] + ENDPOINTS)
    _check_required_columns(test_dataframe, "Test file", ["Molecule Name"] + ENDPOINTS)

    if not test_dataframe['Molecule Name'].isin(results_dataframe['Molecule Name']).all():
        raise gr.Error("Some molecules in the test set are missing from the predictions file. Please ensure all molecules are included.")

    if results_dataframe['Molecule Name'].duplicated().any():
        raise gr.Error("The predictions file contains duplicated molecules. Please ensure each molecule is only listed once.")

    merged_df = results_dataframe.merge(
        test_dataframe,
        on="Molecule Name",
        suffixes=('_pred', '_true'),
        how="inner"
    )
    merged_df = merged_df.sort_values("Molecule Name")

    final_cols = ["MAE", "RAE", "R2", "Spearman R", "Kendall's Tau"]
    all_endpoint_results = []

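    # Compute bootstrapped metrics per endpoint; every endpoint except LogD is
    # clipped and log-transformed before scoring.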
    for ept in ENDPOINTS:
        pred_col = f"{ept}_pred"
        true_col = f"{ept}_true"

        merged_df[pred_col] = pd.to_numeric(merged_df[pred_col], errors="coerce")
        merged_df[true_col] = pd.to_numeric(merged_df[true_col], errors="coerce")

        if merged_df[pred_col].isnull().all():
            raise gr.Error(f"All predictions are missing for endpoint {ept}. Please provide valid predictions.")

        subset = merged_df[[pred_col, true_col]].dropna()
        if subset.empty:
            raise gr.Error(f"No valid data available for endpoint {ept} after removing NaNs.")

        y_pred = subset[pred_col].to_numpy()
        y_true = subset[true_col].to_numpy()

        if ept.lower() not in ['logd']:
            y_true_log = clip_and_log_transform(y_true)
            y_pred_log = clip_and_log_transform(y_pred)
        else:
            y_true_log = y_true
            y_pred_log = y_pred

        bootstrap_df = bootstrap_metrics(y_pred_log, y_true_log, ept, n_bootstrap_samples=1000)

        grouped = bootstrap_df.groupby(["Endpoint", "Metric"])["Value"].agg(["mean", "std"])
        df_unstacked = grouped.unstack(level="Metric")
        df_reindexed = df_unstacked.reindex(columns=list(METRICS), level=1)

        df_reindexed.columns = [f"{agg}_{metric}" for agg, metric in df_reindexed.columns]
        df_endpoint = df_reindexed.reset_index()
        all_endpoint_results.append(df_endpoint)

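    # Macro-average the per-endpoint means (and the mean of the bootstrap stds)
    # into an "Average" row appended at the end.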
    df_results = pd.concat(all_endpoint_results, ignore_index=True)
    mean_cols = [f'mean_{m}' for m in final_cols]
    std_cols = [f'std_{m}' for m in final_cols]

    macro_means = df_results[mean_cols].mean()
    macro_stds = df_results[std_cols].mean()
    avg_row = {"Endpoint": "Average"}
    avg_row.update(macro_means.to_dict())
    avg_row.update(macro_stds.to_dict())
    df_with_average = pd.concat([df_results, pd.DataFrame([avg_row])], ignore_index=True)

    df_with_average = df_with_average[["Endpoint"] + mean_cols + std_cols]
    return df_with_average