Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from difflib import SequenceMatcher | |
| from typing import Any, Dict, Optional, Tuple | |
| from fastapi import FastAPI, Request, Response | |
| from huggingface_hub import (DatasetCard, HfApi, ModelCard, comment_discussion, | |
| create_discussion, get_discussion_details, | |
| get_repo_discussions, login) | |
| from huggingface_hub.utils import EntryNotFoundError | |
| from tabulate import tabulate | |
# Shared secret for authenticating incoming webhook requests (set in the
# Space's environment). May be None if the variable is not configured.
KEY = os.environ.get("KEY")
# NOTE(review): hub-authentication setup is currently disabled — these lines
# presumably get re-enabled when deploying with a write token; confirm.
#HF_TOKEN = os.environ.get("HF_ACCESS_TOKEN")
#api = HfApi(token=HF_TOKEN)
#login(HF_TOKEN)
#app = FastAPI()
def similar(a, b):
    """Return the similarity ratio between sequences *a* and *b* (0.0–1.0)."""
    matcher = SequenceMatcher(a=a, b=b)
    return matcher.ratio()
def create_metadata_key_dict(card_data, repo_type: str):
    """Extract the metadata fields we grade from a repo card's data.

    Args:
        card_data: mapping of card metadata (e.g. from ``ModelCard.load(...).data.to_dict()``).
        repo_type: either ``"model"`` or ``"dataset"``.

    Returns:
        dict mapping each graded field name to its value (``None`` when the
        field is absent from ``card_data``), or ``None`` for an unrecognised
        ``repo_type`` (preserves the original implicit-None behavior).
    """
    shared_keys = ["tags", "license"]
    # Per-type additions on top of the shared keys; a table avoids the
    # duplicated extend/return branches of the original.
    type_specific_keys = {
        "model": ["library_name", "datasets", "metrics", "co2", "pipeline_tag"],
        "dataset": [
            "pretty_name",
            "size_categories",
            "task_categories",
            "task_ids",
            "source_datasets",
        ],
    }
    if repo_type not in type_specific_keys:
        return None
    keys = shared_keys + type_specific_keys[repo_type]
    return {key: card_data.get(key) for key in keys}
def create_metadata_breakdown_table(desired_metadata_dictionary):
    """Render a GitHub-flavoured markdown table of metadata field -> value.

    Falsy values (None, empty list, "") are shown as ``"Field Missing"``.
    """
    rows = [
        (field, value if value else "Field Missing")
        for field, value in desired_metadata_dictionary.items()
    ]
    return tabulate(
        rows, tablefmt="github", headers=("Metadata Field", "Provided Value")
    )
def calculate_grade(desired_metadata_dictionary):
    """Return the fraction of metadata fields that are populated.

    Args:
        desired_metadata_dictionary: mapping of field name -> value; falsy
            values count as missing.

    Returns:
        Ratio of populated fields in [0.0, 1.0], rounded to two decimals.
        Returns 0.0 for an empty mapping (the original raised
        ZeroDivisionError in that case).
    """
    metadata_values = list(desired_metadata_dictionary.values())
    if not metadata_values:
        return 0.0
    filled = sum(1 for field in metadata_values if field)
    return round(filled / len(metadata_values), 2)
def create_markdown_report(
    desired_metadata_dictionary, repo_name, repo_type, score, update: bool = False
):
    """Build the markdown body of a metadata report-card discussion post.

    Args:
        desired_metadata_dictionary: field -> value mapping to tabulate.
        repo_name: full repo id the report is about.
        repo_type: "model" or "dataset" (used in headings and messages).
        score: coverage ratio in [0.0, 1.0] from ``calculate_grade``.
        update: when True, the title is suffixed with "(updated)".

    Returns:
        The report as a markdown string.
    """
    # Fixes vs. original: "your{repo_type}" was missing a space ("yourmodel"),
    # and the 0-1 ratio was printed directly before the "%" sign (e.g. "0.75%");
    # it is now shown as a whole-number percentage. The praise/scold threshold
    # still compares the raw ratio.
    report = f"""# {repo_type.title()} metadata report card {"(updated)" if update else ""}
\n
This is an automatically produced metadata quality report card for {repo_name}. This report is meant as a POC!
\n
## Breakdown of metadata fields for your {repo_type}
\n
{create_metadata_breakdown_table(desired_metadata_dictionary)}
\n
You scored a metadata coverage grade of: **{round(score * 100)}**% \n {f"We're not angry we're just disappointed! {repo_type.title()} metadata is super important. Please try harder..."
if score <= 0.5 else f"Not too shabby! Make sure you also fill in a {repo_type} card too!"}
"""
    return report
def parse_webhook_post(data: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Extract (repo_type, repo_name) from a hub webhook payload.

    Returns None when the event scope is not "repo"; raises ValueError for
    repo types other than "model" or "dataset".
    """
    if data["event"]["scope"] != "repo":
        return None
    repo_info = data["repo"]
    name = repo_info["name"]
    kind = repo_info["type"]
    if kind not in {"model", "dataset"}:
        raise ValueError("Unknown hub type")
    return kind, name
def load_repo_card_metadata(repo_type, repo_name):
    """Load a repo's card metadata from the hub as a plain dict.

    Returns {} when the repo exists but has no card (EntryNotFoundError),
    and None for repo types other than "model"/"dataset".
    """
    card_loaders = {"dataset": DatasetCard, "model": ModelCard}
    card_cls = card_loaders.get(repo_type)
    if card_cls is None:
        return None
    try:
        return card_cls.load(repo_name).data.to_dict()
    except EntryNotFoundError:
        return {}
def create_or_update_report(data):
    """Post (or refresh) a "Metadata Report Card" discussion for a webhook'd repo.

    Given a parsed webhook payload ``data``, grades the repo's card metadata
    and either comments on an existing open report-card discussion or opens a
    new one.

    Returns True on success, or a 400 ``Response`` when the payload cannot be
    parsed.  NOTE(review): mixed return types (bool vs fastapi.Response) —
    presumably the caller is a FastAPI route; confirm how True is handled.
    """
    # Walrus: parse_webhook_post returns None for non-repo-scope events.
    if parsed_post := parse_webhook_post(data):
        repo_type, repo_name = parsed_post
    else:
        return Response("Unable to parse webhook data", status_code=400)
    card_data = load_repo_card_metadata(repo_type, repo_name)
    desired_metadata_dictionary = create_metadata_key_dict(card_data, repo_type)
    score = calculate_grade(desired_metadata_dictionary)
    report = create_markdown_report(
        desired_metadata_dictionary, repo_name, repo_type, score, update=False
    )
    # Generator of all discussions on the repo; scanned for an existing open
    # report-card thread.
    repo_discussions = get_repo_discussions(
        repo_name,
        repo_type=repo_type,
    )
    for discussion in repo_discussions:
        if (
            discussion.title == "Metadata Report Card" and discussion.status == "open"
        ):  # An existing open report card thread
            discussion_details = get_discussion_details(
                repo_name, discussion.num, repo_type=repo_type
            )
            # Last event's content is assumed to be the most recent report
            # comment — TODO confirm events are chronological and the last one
            # is always a comment.
            last_comment = discussion_details.events[-1].content
            # Only post again if the new report meaningfully differs from the
            # last one (similarity below ~0.999 → treat as changed).
            if similar(report, last_comment) <= 0.999:
                # Regenerate with the "(updated)" title suffix before posting.
                report = create_markdown_report(
                    desired_metadata_dictionary,
                    repo_name,
                    repo_type,
                    score,
                    update=True,
                )
                comment_discussion(
                    repo_name,
                    discussion.num,
                    comment=report,
                    repo_type=repo_type,
                )
            # Done either way: an open thread exists, so never create a new one.
            return True
    # No open report-card thread found — start one with the fresh report.
    create_discussion(
        repo_name,
        "Metadata Report Card",
        description=report,
        repo_type=repo_type,
    )
    return True