Maria Castellanos
validations and improvements
20ed309
raw
history blame
12.5 kB
import gradio as gr
import pandas as pd
from pathlib import Path
from typing import Optional
from about import ENDPOINTS, API, submissions_repo, results_repo, test_repo
from utils import metrics_per_ep
from huggingface_hub import hf_hub_download
import datetime
import io
import json, tempfile
import re
from pydantic import (
BaseModel,
Field,
model_validator,
field_validator,
ValidationError
)
HF_USERNAME_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-_]{1,38})$")
def _safeify_username(username: str) -> str:
return str(username.strip()).replace("/", "_").replace(" ", "_")
def _unsafify_username(username: str) -> str:
return str(username.strip()).replace("/", "_").replace(" ", "_")
def _check_required_columns(df: pd.DataFrame, name: str, cols: list[str]):
missing = [c for c in cols if c not in df.columns]
if missing:
raise ValueError(f"{name} is missing required columns: {missing}")
class ParticipantRecord(BaseModel):
hf_username: str = Field(description="Hugging Face username")
display_name: Optional[str] = Field(description="Name to display on leaderboard")
participant_name: Optional[str] = Field(default=None, description="Participant's real name")
discord_username: Optional[str] = Field(default=None, description="Discord username")
email: Optional[str] = Field(default=None, description="Email address")
affiliation: Optional[str] = Field(default=None, description="Affiliation")
model_tag: Optional[str] = Field(default=None, description="Link to model description")
anonymous: bool = Field(default=False, description="Whether to display username as 'anonymous'")
consent_publication: bool = Field(default=False, description="Consent to be included in publications")
@field_validator("hf_username")
@classmethod
def validate_hf_username(cls, v: str) -> str:
v = v.strip()
if not HF_USERNAME_RE.match(v):
raise gr.Error("Invalid Hugging Face username (letters, numbers, -, _; min 2, max ~39).")
return v
@field_validator("display_name")
@classmethod
def validate_display_name(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return None
v = v.strip()
if not v:
return None
if len(v) > 20:
raise ValueError("Display name is too long (max 20 chars).")
return v
@field_validator("model_tag", mode="before")
@classmethod
def normalize_url(cls, v):
if v is None:
return v
s = str(v).strip()
if not s:
return None
if "://" not in s:
s = "https://" + s
return s
@model_validator(mode="after")
def require_display_name_if_anonymous(self) -> "ParticipantRecord":
if self.anonymous and not self.display_name:
raise ValueError("Alias is required when anonymous box is checked.")
return self
class SubmissionMetadata(BaseModel):
submission_time_utc: str
user: str
original_filename: str
evaluated: bool
participant: ParticipantRecord
def submit_data(predictions_file: str,
user_state,
participant_name: str = "",
discord_username: str = "",
email: str = "",
affiliation: str = "",
model_tag: str = "",
user_display: str = "",
anon_checkbox: bool = False,
paper_checkbox: bool = False
):
if user_state is None:
raise gr.Error("Username or alias is required for submission.")
file_path = Path(predictions_file).resolve()
if not file_path.exists():
raise gr.Error("Uploaded file object does not have a valid file path.")
# Read results file
try:
results_df = pd.read_csv(file_path)
except Exception as e:
return f"❌ Error reading results file: {str(e)}"
if results_df.empty:
return gr.Error("The uploaded file is empty.")
missing = set(ENDPOINTS) - set(results_df.columns)
if missing:
return gr.Error(f"The uploaded file must contain all endpoint predictions {ENDPOINTS} as columns.")
# Save participant record
try:
participant_record = ParticipantRecord(
hf_username=user_state,
participant_name=participant_name,
discord_username=discord_username,
email=email,
affiliation=affiliation,
model_tag=model_tag,
display_name=user_display,
anonymous=anon_checkbox,
consent_publication=paper_checkbox
)
except ValidationError as e:
return f"❌ Error in participant information: {str(e)}"
# Build destination filename in the dataset
ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") # should keep default time so can be deserialized correctly
try:
meta = SubmissionMetadata(
submission_time_utc=ts,
user=user_state,
original_filename=file_path.name,
evaluated=False,
participant=participant_record
)
except ValidationError as e:
return f"❌ Error in metadata information: {str(e)}"
safe_user = _safeify_username(user_state)
destination_csv = f"submissions/{safe_user}_{ts}.csv"
destination_json = destination_csv.replace(".csv", ".json")
# Upload the CSV file
API.upload_file(
path_or_fileobj=str(file_path),
path_in_repo=destination_csv,
repo_id=submissions_repo,
repo_type="dataset",
commit_message=f"Add submission for {safe_user} at {ts}"
)
# Upload the metadata JSON file
meta_bytes = io.BytesIO(json.dumps(meta.model_dump(), indent=2).encode("utf-8"))
API.upload_file(
path_or_fileobj=meta_bytes,
path_in_repo=destination_json,
repo_id=submissions_repo,
repo_type="dataset",
commit_message=f"Add metadata for {user_state} submission at {ts}"
)
return "✅ Your submission has been received! Your scores will appear on the leaderboard shortly.", destination_csv
def evaluate_data(filename: str) -> None:
# Load the submission csv
try:
local_path = hf_hub_download(
repo_id=submissions_repo,
repo_type="dataset",
filename=filename,
)
except Exception as e:
raise gr.Error(f"Failed to download submission file: {e}")
# Load the test set
try:
test_path = hf_hub_download(
repo_id=test_repo,
repo_type="dataset",
filename="data/challenge_mock_test_set.csv", #Replace later with "test_dataset.csv",
)
except Exception as e:
raise gr.Error(f"Failed to download test file: {e}")
data_df = pd.read_csv(local_path)
test_df = pd.read_csv(test_path)
try:
results_df = calculate_metrics(data_df, test_df)
if not isinstance(results_df, pd.DataFrame) or results_df.empty:
raise gr.Error("Evaluation produced no results.")
except Exception as e:
raise gr.Error(f'Evaluation failed: {e}. No results written to results dataset.')
# Load metadata file
meta_filename = filename.replace(".csv", ".json")
try:
meta_path = hf_hub_download(
repo_id=submissions_repo,
repo_type="dataset",
filename=meta_filename,
)
with open(meta_path, "r", encoding="utf-8") as f:
_meta = json.load(f)
meta = SubmissionMetadata(**_meta)
username = meta.participant.hf_username
timestamp = meta.submission_time_utc
report = meta.participant.model_tag
if meta.participant.anonymous:
display_name = meta.participant.display_name
else:
display_name = username
except Exception as e:
raise gr.Error(f"Failed to load metadata file: {e}. No results written to results dataset.")
# Write results to results dataset
results_df['user'] = display_name
results_df['submission_time'] = timestamp
results_df['model_report'] = report
results_df['anonymous'] = meta.participant.anonymous
safe_user = _unsafify_username(username)
destination_path = f"results/{safe_user}_{timestamp}_results.csv"
tmp_name = None
with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp:
results_df.to_csv(tmp, index=False)
tmp.flush()
tmp_name = tmp.name
API.upload_file(
path_or_fileobj=tmp_name,
path_in_repo=destination_path,
repo_id=results_repo,
repo_type="dataset",
commit_message=f"Add result data for {username}"
)
Path(tmp_name).unlink()
def calculate_metrics(
results_dataframe: pd.DataFrame,
test_dataframe: pd.DataFrame
):
import numpy as np
# Do some checks
# 1) Check all columns are present
_check_required_columns(results_dataframe, "Results file", ["Molecule Name"] + ENDPOINTS)
_check_required_columns(test_dataframe, "Test file", ["Molecule Name"] + ENDPOINTS)
# 2) Check all Molecules in the test set are present in the predictions
merged_df = pd.merge(test_dataframe, results_dataframe, on=['Molecule Name'], how='left', indicator=True)
if not (merged_df['_merge'] == 'both').all():
raise gr.Error("The predictions file is missing some molecules present in the test set. Please ensure all molecules are included.")
# TODO: What to do when a molecule is duplicated in the Predictions file?
df_results = pd.DataFrame(columns=["endpoint", "MAE", "RAE", "R2", "Spearman R", "Kendall's Tau"])
for i, measurement in enumerate(ENDPOINTS):
df_pred = results_dataframe[['Molecule Name', measurement]].copy()
df_true = test_dataframe[['Molecule Name', measurement]].copy()
# coerce numeric columns
df_pred[measurement] = pd.to_numeric(df_pred[measurement], errors="coerce")
df_true[measurement] = pd.to_numeric(df_true[measurement], errors="coerce")
if df_pred[measurement].isnull().all():
# TODO: Allow missing endpoints or raise an error?
raise gr.Error(f"All predictions are missing for endpoint {measurement}. Please provide valid predictions.")
# Drop NaNs and calculate coverage
merged = (
df_pred.rename(columns={measurement: f"{measurement}_pred"})
.merge(
df_true.rename(columns={measurement: f"{measurement}_true"}),
on="Molecule Name",
how="inner",
)
.dropna(subset=[f"{measurement}_pred", f"{measurement}_true"])
)
n_total = merged[f"{measurement}_true"].notna().sum() # Valid test set points
n_pairs = len(merged) # actual pairs with predictions
coverage = (n_pairs / n_total * 100.0) if n_total else 0.0
merged = merged.sort_values("Molecule Name", kind="stable")
# validate pairs
if n_pairs < 10:
mae = rae = r2 = spearman = ktau = np.nan
else:
y_pred = merged[f"{measurement}_pred"].to_numpy()
y_true = merged[f"{measurement}_true"].to_numpy()
# Force log scale for all endpoints except LogD (for outliers)
if measurement != "LogD":
y_pred = np.log10(y_pred)
y_true = np.log10(y_true)
mae, rae, r2, spearman, ktau = metrics_per_ep(y_pred, y_true)
df_results.loc[i, 'endpoint'] = measurement
df_results.loc[i, 'MAE'] = mae
df_results.loc[i, 'RAE'] = rae
df_results.loc[i, 'R2'] = r2
df_results.loc[i, 'Spearman R'] = spearman
df_results.loc[i, "Kendall's Tau"] = ktau
df_results.loc[i, 'data coverage (%)'] = coverage
# Average results
num_cols = ["MAE", "RAE", "R2", "Spearman R", "Kendall's Tau", "data coverage (%)"]
df_results[num_cols] = df_results[num_cols].apply(pd.to_numeric, errors="coerce")
means = df_results[num_cols].mean()
avg_row = {"endpoint": "Average", **means.to_dict()}
df_with_average = pd.concat([df_results, pd.DataFrame([avg_row])], ignore_index=True)
return df_with_average