"""Streamlit app that renders the GEO-Bench leaderboard from compiled result tables."""

import html
import json
import os
import pickle
import re
from pathlib import Path
from typing import Any, Dict
from urllib.parse import quote

import numpy as np
import pandas as pd
import streamlit as st
from scipy.stats import sem

from utils.constants import (
    DATASETS,
    DIGITS_FOR_VALUES,
    DIGITS_FOR_ERRORS,
    DATASET_INFO,
    DIMENSIONS,
    RESULTS_DIR,
    DIMENSION_INFO,
)

# First character: alphanumeric, hyphen or underscore; later characters may
# also be dots. The hyphen is placed last in each class so it can never be
# read as a range operator.
_MODEL_NAME_PATTERN = re.compile(r"[a-zA-Z0-9_-][a-zA-Z0-9_.-]*")

# Labels of the raw/normalized radio shared by the leaderboard tabs.
_NORMALIZED_VIEW = "normalized values (with IQM)"
_RAW_VIEW = "raw values (with Mean)"

# NOTE(review): this preamble is whitespace-only in the reviewed source. If the
# deployed file carries a <style> block here, keep that block instead.
_TABLE_PREAMBLE = ''' '''


def sanitize_model_name(model_name):
    """Validate a model name and return it unchanged.

    Raises:
        ValueError: if the name starts with a dot or contains anything other
            than alphanumerics, hyphens, underscores and (non-leading) dots.
    """
    if model_name.startswith("."):
        raise ValueError("model name cannot start with a dot")
    # fullmatch (rather than match with "$") also rejects a trailing newline.
    if not _MODEL_NAME_PATTERN.fullmatch(model_name):
        raise ValueError("Invalid model name format")
    return model_name


def safe_path_join(*parts):
    """Join ``parts`` below the ``results`` directory and return the resolved path.

    Raises:
        ValueError: if the parts cannot be joined/resolved, or if the resolved
            path is not located inside the resolved ``results`` directory.
    """
    base = Path("results").resolve()
    try:
        path = base.joinpath(*parts).resolve()
    except (OSError, RuntimeError, TypeError, ValueError) as err:
        raise ValueError("Invalid path") from err
    try:
        # Component-wise containment check: a plain string-prefix test would
        # wrongly accept siblings such as "results_evil".
        path.relative_to(base)
    except ValueError:
        raise ValueError("Path traversal detected") from None
    return path


def sanitize_column_name(col: str) -> str:
    """Return an HTML-escaped display label for a column name.

    Underscores become spaces. Names containing "IQM" or "Mean" keep their
    casing; every other name is title-cased.
    """
    col = str(col)
    label = col.replace("_", " ")
    if not any(marker in col for marker in ("IQM", "Mean")):
        label = label.title()
    return html.escape(label)


def sanitize_cell_value(value: Any) -> str:
    """Return a cell value as a string that is safe to embed in HTML."""
    if isinstance(value, (int, float)):
        return str(value)
    return html.escape(str(value))


def _wrap_html_table(header_cells, body_rows):
    """Assemble HTML-ready header cells and rows of cells into table markup."""
    # NOTE(review): tag attributes / CSS classes were not recoverable from the
    # reviewed source, so plain tags are used; confirm against the deployed UI.
    parts = [_TABLE_PREAMBLE, "<div>", "<table>", "<thead><tr>"]
    parts.extend(f"<th>{cell}</th>" for cell in header_cells)
    parts.append("</tr></thead>")
    parts.append("<tbody>")
    for cells in body_rows:
        parts.append("<tr>")
        parts.extend(f"<td>{cell}</td>" for cell in cells)
        parts.append("</tr>")
    parts.append("</tbody></table>")
    parts.append("</div>")
    return "".join(parts)


def _format_result_cell(col, row, row_err):
    """Return the HTML content of one results cell, with "± error" when available."""
    if col == "Model":
        # Inserted without escaping.
        # NOTE(review): presumably so the value may carry markup such as a
        # link; it must therefore come from trusted data. Confirm.
        return str(row[col])
    value = sanitize_cell_value(row[col])
    # The error is shown only when the error table has this column and its
    # entry differs from the value itself.
    if col in row_err and row[col] != row_err[col]:
        return f"{value} ± {sanitize_cell_value(row_err[col])}"
    return value


def create_html_results_table(df, df_err):
    """Render a results table as HTML, pairing each value with its error.

    Args:
        df: table of values; its columns define the rendered columns.
        df_err: table of errors, iterated in lockstep with ``df`` row by row,
            so both frames must be in the same row order.

    Returns:
        The HTML markup as a string.
    """
    columns = list(df.columns)
    header_cells = [sanitize_column_name(column) for column in columns]
    body_rows = [
        [_format_result_cell(col, row, row_err) for col in columns]
        for (_, row), (_, row_err) in zip(df.iterrows(), df_err.iterrows())
    ]
    return _wrap_html_table(header_cells, body_rows)


def create_html_table_info(df):
    """Render an informational table as HTML.

    Values of the "Citation" column are inserted unescaped (create_info_tab
    fills that column with anchors built by make_hyperlink_datasets); every
    other cell is HTML-escaped.
    """
    columns = list(df.columns)
    header_cells = [sanitize_column_name(column) for column in columns]
    body_rows = [
        [
            str(row[column]) if column == "Citation" else sanitize_cell_value(row[column])
            for column in columns
        ]
        for _, row in df.iterrows()
    ]
    return _wrap_html_table(header_cells, body_rows)


def check_sanity(model_name):
    """Check that each existing benchmark file of a model has exactly one "Original" entry.

    Benchmarks without a result file are skipped.

    Returns:
        True when every existing file passes; False on a failed check, an
        invalid model name/path, or a file that is not valid JSON.
    """
    try:
        safe_model = sanitize_model_name(model_name)
        for benchmark in DATASETS:
            file_path = safe_path_join(safe_model, f"{benchmark.lower()}.json")
            if not file_path.is_file():
                continue
            with open(file_path, encoding="utf-8") as f:
                results = json.load(f)
            original_count = sum(
                1 for result in results if result["original_or_reproduced"] == "Original"
            )
            if original_count != 1:
                return False
        return True
    except ValueError:
        # Covers invalid names/paths and json.JSONDecodeError (a ValueError subclass).
        return False


def make_hyperlink_datasets(url: str, url_name: str, root: str = "") -> str:
    """Return HTML for a citation: an anchor when ``url`` is given, else the escaped name.

    The result is inserted into the info table without further escaping, so
    both branches escape ``url_name``.
    """
    safe_name = html.escape(str(url_name))
    if not url:
        return safe_name
    full_url = f"{root}{url}"
    # NOTE(review): anchor attributes were not recoverable from the reviewed
    # source; target="_blank" is an assumption to confirm.
    return f'<a href="{html.escape(full_url)}" target="_blank">{safe_name}</a>'


def filter_with_user_selections(
    unique_key: str,
    iqm_column_name: str,
    table: pd.DataFrame,
    table_err: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Draw the filter widgets and return the filtered, sorted value and error tables.

    Args:
        unique_key: prefix that keeps the Streamlit widget keys unique.
        iqm_column_name: column used to pick the best row per model and to sort.
        table: table of values; needs "Model", "submission" and
            "Config Settings" columns for the corresponding searches.
        table_err: table of errors with the same row index as ``table``.

    Returns:
        ``(table, table_err)`` sorted by ``iqm_column_name`` descending and
        restricted to the same rows. The inputs are not modified.
    """
    table = table.reset_index()
    table_err = table_err.reset_index()

    view_best_per_model = st.radio(
        "Select all results or best results",
        ["all results", "best results per model"],
        index=0,
        key=unique_key,
        horizontal=True,
    )
    if view_best_per_model == "best results per model":
        table[iqm_column_name] = pd.to_numeric(table[iqm_column_name])
        best_rows = table.groupby("Model")[iqm_column_name].idxmax()
        table = table.loc[best_rows]

    col1, col2, col3 = st.columns(3)
    with col1:
        search_models_query = st.text_input(
            "Search by model", "", key=f"search_{unique_key}_models"
        )
    with col2:
        search_submission_query = st.text_input(
            "Search by submission", "", key=f"search_{unique_key}_submission"
        )
    with col3:
        search_settings_query = st.text_input(
            "Search by settings", "", key=f"search_{unique_key}_settings"
        )

    searches = (
        ("Model", search_models_query),
        ("submission", search_submission_query),
        ("Config Settings", search_settings_query),
    )
    for column, query in searches:
        if query:
            # Literal, case-insensitive match: user text such as "(" must not
            # be compiled as a regular expression; missing values never match.
            matches = table[column].str.contains(query, case=False, regex=False, na=False)
            table = table[matches]

    table = table.sort_values(by=iqm_column_name, ascending=False)
    table_err = table_err.loc[table.index]
    # NOTE(review): the previous drop(["index"]) calls targeted row labels and
    # never removed the "index" column added by reset_index. The column is
    # kept so the rendered table is unchanged; use drop(columns=["index"],
    # errors="ignore") on both frames if it should be hidden.
    return table, table_err


def _normalized_view_selected(key: str) -> bool:
    """Draw the raw/normalized radio and return True when "normalized" is chosen."""
    choice = st.radio(
        "Select raw or normalized values",
        [_NORMALIZED_VIEW, _RAW_VIEW],
        index=0,
        key=key,
        horizontal=True,
    )
    return choice == _NORMALIZED_VIEW


def create_overall_performance_tab(overall_performance_tables):
    """Render the main leaderboard tab.

    Args:
        overall_performance_tables: mapping with the keys "normalized",
            "normalized_err", "raw" and "raw_err", each holding a DataFrame.
    """
    st.header("Overall Performance")

    if _normalized_view_selected("overall_raw_or_normalized"):
        variant, iqm_column_name = "normalized", "Overall IQM"
    else:
        variant, iqm_column_name = "raw", "Overall Mean"
    overall_table = overall_performance_tables[variant].copy()
    overall_table_err = overall_performance_tables[f"{variant}_err"].copy()

    overall_table, overall_table_err = filter_with_user_selections(
        unique_key="overall_all_or_best",
        iqm_column_name=iqm_column_name,
        table=overall_table,
        table_err=overall_table_err,
    )

    html_table = create_html_results_table(overall_table, overall_table_err)
    st.markdown(html_table, unsafe_allow_html=True)

    # The download button is only offered after "Export to CSV" is pressed.
    if st.button("Export to CSV", key="overall_performance_export_main"):
        st.download_button(
            label="Download CSV",
            data=overall_table.to_csv(index=False),
            file_name="overall_performance_leaderboard.csv",
            key="download-csv",
            help="Click to download the CSV file",
        )


def create_dimension_performance_tab(performance_by_dimension_tables):
    """Render the per-dimension leaderboard tab.

    Args:
        performance_by_dimension_tables: mapping with the keys "normalized",
            "normalized_err", "raw" and "raw_err". Value tables are keyed by
            dimension name, error tables by "<dimension>_err".
    """
    st.header("Performance By Dimension")

    # format_func shows "<name> (<info>)" while the widget returns the plain
    # key, so the key no longer has to be parsed back out of the label.
    dimension_drop_down = st.selectbox(
        "Select dimension to view",
        list(DIMENSION_INFO),
        format_func=lambda key: f"{key} ({DIMENSION_INFO[key]})",
    )

    if _normalized_view_selected("dimension_raw_or_normalized"):
        variant, metric = "normalized", "IQM"
    else:
        variant, metric = "raw", "Mean"
    iqm_column_name = f"Overall {dimension_drop_down} {metric}"
    dimension_table = performance_by_dimension_tables[variant][dimension_drop_down].copy()
    dimension_table_err = performance_by_dimension_tables[f"{variant}_err"][
        f"{dimension_drop_down}_err"
    ].copy()

    dimension_table, dimension_table_err = filter_with_user_selections(
        unique_key="dimension_all_or_best",
        iqm_column_name=iqm_column_name,
        table=dimension_table,
        table_err=dimension_table_err,
    )

    html_table = create_html_results_table(dimension_table, dimension_table_err)
    st.markdown(html_table, unsafe_allow_html=True)


def create_datasets_tabs(datasets_tables: dict):
    """Render one sub-tab per dataset with its description and leaderboard.

    Args:
        datasets_tables: mapping with the keys "normalized", "normalized_err",
            "raw" and "raw_err", each keyed by dataset name.
    """
    datasets_tabs = st.tabs([dataset.replace("_", " ") for dataset in DATASETS])
    for tab, dataset in zip(datasets_tabs, DATASETS):
        with tab:
            dataset_name = dataset.replace("_", " ").title()
            # DATASET_INFO is column-oriented: find the row of this dataset.
            info_row = DATASET_INFO["Dataset"].index(dataset_name)
            dataset_desc = DATASET_INFO["Description"][info_row]
            st.header(dataset_name)
            st.markdown(dataset_desc)

            if _normalized_view_selected(f"{dataset}_raw_or_normalized"):
                variant, iqm_column_name = "normalized", "IQM"
            else:
                variant, iqm_column_name = "raw", "Mean"
            dataset_table = datasets_tables[variant][dataset].copy()
            dataset_table_err = datasets_tables[f"{variant}_err"][dataset].copy()

            dataset_table, dataset_table_err = filter_with_user_selections(
                unique_key=dataset,
                iqm_column_name=iqm_column_name,
                table=dataset_table,
                table_err=dataset_table_err,
            )

            html_table = create_html_results_table(dataset_table, dataset_table_err)
            st.markdown(html_table, unsafe_allow_html=True)


def create_info_tab():
    """Render the "Dataset Info" and "Dimension Info" sub-tabs."""
    tabs = st.tabs(["Dataset Info", "Dimension Info"])

    with tabs[0]:
        st.header("Dataset Info")
        dataset_table = pd.DataFrame(DATASET_INFO)
        citation_hyperlinks = [
            make_hyperlink_datasets(url=url, url_name=citation)
            for url, citation in zip(dataset_table["Hyperlinks"], dataset_table["Citation"])
        ]
        # Replace the separate URL/citation columns by one linked column,
        # placed last.
        dataset_table = dataset_table.drop(columns=["Hyperlinks", "Citation"])
        dataset_table["Citation"] = citation_hyperlinks
        st.markdown(create_html_table_info(dataset_table), unsafe_allow_html=True)

    with tabs[1]:
        st.header("Dimension Info")
        dim_table = pd.DataFrame(
            {
                "Dimension": list(DIMENSION_INFO),
                "Details": list(DIMENSION_INFO.values()),
                "Datasets": [
                    ", ".join(DIMENSIONS[dimension]) for dimension in DIMENSION_INFO
                ],
            }
        )
        st.markdown(create_html_table_info(dim_table), unsafe_allow_html=True)


def main():
    """Configure the page, load the compiled results and render all tabs."""
    st.set_page_config(
        page_title="GeoBench Leaderboard",
        layout="wide",
        initial_sidebar_state="expanded",
    )
    # NOTE(review): whitespace-only in the reviewed source; if the deployed
    # file injects page-level CSS here, keep that markup.
    st.markdown(""" """, unsafe_allow_html=True)

    # SECURITY: pickle.load can execute arbitrary code while loading, so
    # compiled.pkl must only ever come from a trusted source.
    with open(f'{RESULTS_DIR}/compiled.pkl', 'rb') as handle:
        compiled_results = pickle.load(handle)
    overall_performance_tables = compiled_results["overall_performance_tables"]
    performance_by_dimension_tables = compiled_results["performance_by_dimension_tables"]
    datasets_tables = compiled_results["datasets_tables"]
    del compiled_results

    st.title("🏆 GEO-Bench Leaderboard")
    st.markdown("Leaderboard to evaluate Geospatial Foundation Models on downstream tasks")

    tabs = st.tabs(["🏆 Main Leaderboard", "Dimensions", "Datasets", "Info", "📝 How to Submit"])

    with tabs[0]:
        create_overall_performance_tab(overall_performance_tables=overall_performance_tables)

    with tabs[1]:
        create_dimension_performance_tab(
            performance_by_dimension_tables=performance_by_dimension_tables
        )

    with tabs[2]:
        # One sub-tab per dataset.
        create_datasets_tabs(datasets_tables=datasets_tables)

    with tabs[3]:
        # Dataset and dimension reference tables.
        create_info_tab()

    with tabs[-1]:
        st.header("How to Submit")
        with open("utils/about_page.txt", encoding="utf-8") as f:
            about_page = f.read()
        st.markdown(about_page)

    # TODO: per-model and per-submission tabs (create_models_tabs /
    # create_submissions_tabs) were sketched here but are not implemented.


if __name__ == "__main__":
    main()