import gradio as gr
import json
import os
import re
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from pathlib import Path
import tempfile
from auth import verify_hf_token, get_user_from_request_headers

# HuggingFace configuration
HF_TOKEN = os.getenv("HF_TOKEN")  # Required for writing to the dataset
DATASET_REPO = "Fraser/piclets"  # Public dataset repository
DATASET_TYPE = "dataset"

# Initialize HuggingFace API with token if available
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()

# Cache directory for local operations
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
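
# Layout of the dataset repo, as inferred from the file paths used below
# (the actual repo may contain additional files):
#
#   piclets/<normalized_object_name>.json   - canonical Piclet plus its variations
#   users/<hf_user_sub>.json                - per-user discovery profile
#   metadata/stats.json                     - global statistics
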
class PicletDiscoveryService:
    """Manages Piclet discovery using HuggingFace datasets"""

    @staticmethod
    def normalize_object_name(name: str) -> str:
        """
        Normalize object names for consistent storage and lookup
        Examples: "The Blue Pillow" -> "blue_pillow", "wooden chairs" -> "wooden_chair"
        """
        if not name:
            return "unknown"
        # Convert to lowercase and strip
        name = name.lower().strip()
        # Remove leading articles (the, a, an)
        name = re.sub(r'^(the|a|an)\s+', '', name)
        # Remove special characters except spaces
        name = re.sub(r'[^a-z0-9\s]', '', name)
        # Handle common plurals (basic singularization rules)
        if name.endswith('ies') and len(name) > 4:
            name = name[:-3] + 'y'  # berries -> berry
        elif name.endswith('ves') and len(name) > 4:
            name = name[:-3] + 'f'  # leaves -> leaf
        elif name.endswith('es') and len(name) > 3:
            # Skip endings whose singular form keeps the 'es'
            if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')):
                name = name[:-2]  # tables -> table (but keep glasses, boxes, dishes)
        elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'):
            name = name[:-1]  # chairs -> chair (but keep glass)
        # Replace spaces with underscores
        name = re.sub(r'\s+', '_', name.strip())
        return name
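
    # A few normalizations traced by hand from the rules above (illustrative only):
    #   "The Blue Pillow" -> "blue_pillow"
    #   "berries"         -> "berry"
    #   "leaves"          -> "leaf"
    #   "glasses"         -> "glasses"   ('ses' endings are deliberately kept)
    #   "wooden chairs"   -> "wooden_chair"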

    @staticmethod
    def load_piclet_data(object_name: str) -> Optional[dict]:
        """Load Piclet data from HuggingFace dataset"""
        try:
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            file_path = f"piclets/{normalized_name}.json"
            # Download the file from HuggingFace
            local_path = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=file_path,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN,
                cache_dir=str(CACHE_DIR)
            )
            with open(local_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            print(f"Could not load piclet data for {object_name}: {e}")
            return None

    @staticmethod
    def save_piclet_data(object_name: str, data: dict) -> bool:
        """Save Piclet data to HuggingFace dataset"""
        try:
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            file_path = f"piclets/{normalized_name}.json"
            # Create a temporary file
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(data, f, indent=2)
                temp_path = f.name
            # Upload to HuggingFace
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Update piclet: {normalized_name}"
            )
            # Clean up
            os.unlink(temp_path)
            return True
        except Exception as e:
            print(f"Failed to save piclet data: {e}")
            return False

    @staticmethod
    def load_user_data(sub: str) -> dict:
        """
        Load user profile from dataset by HF user ID (sub)
        Args:
            sub: HuggingFace user ID (stable identifier)
        Returns:
            User profile dict, or a default profile if not found
        """
        try:
            file_path = f"users/{sub}.json"
            local_path = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=file_path,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN,
                cache_dir=str(CACHE_DIR)
            )
            with open(local_path, 'r') as f:
                return json.load(f)
        except Exception:
            # Return default user profile if not found
            # Will be populated with actual data on first save
            return {
                "sub": sub,
                "preferred_username": None,
                "name": None,
                "picture": None,
                "joinedAt": datetime.now().isoformat(),
                "lastSeen": datetime.now().isoformat(),
                "discoveries": [],
                "uniqueFinds": 0,
                "totalFinds": 0,
                "rarityScore": 0,
                "visibility": "public"
            }

    @staticmethod
    def save_user_data(sub: str, data: dict) -> bool:
        """
        Save user profile to dataset by HF user ID (sub)
        Args:
            sub: HuggingFace user ID (stable identifier)
            data: User profile dict
        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = f"users/{sub}.json"
            # Update lastSeen timestamp
            data["lastSeen"] = datetime.now().isoformat()
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(data, f, indent=2)
                temp_path = f.name
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Update user profile: {data.get('preferred_username', sub)}"
            )
            os.unlink(temp_path)
            return True
        except Exception as e:
            print(f"Failed to save user data: {e}")
            return False

    @staticmethod
    def get_or_create_user_profile(user_info: dict) -> dict:
        """
        Get existing user profile or create a new one from OAuth user_info
        Refreshes cached profile data on each call
        Args:
            user_info: OAuth user info from HF (sub, preferred_username, name, picture)
        Returns:
            User profile dict
        """
        sub = user_info['sub']
        # Load existing profile
        profile = PicletDiscoveryService.load_user_data(sub)
        # Update cached profile fields from OAuth
        profile['sub'] = sub
        profile['preferred_username'] = user_info.get('preferred_username')
        profile['name'] = user_info.get('name')
        profile['picture'] = user_info.get('picture')
        profile['email'] = user_info.get('email')
        # Set joinedAt only if this is a new profile
        if 'joinedAt' not in profile or not profile['joinedAt']:
            profile['joinedAt'] = datetime.now().isoformat()
        return profile

    @staticmethod
    def update_global_stats() -> dict:
        """Load and return global statistics (or defaults if none exist yet)"""
        try:
            # Try to load existing stats
            try:
                local_path = hf_hub_download(
                    repo_id=DATASET_REPO,
                    filename="metadata/stats.json",
                    repo_type=DATASET_TYPE,
                    token=HF_TOKEN,
                    cache_dir=str(CACHE_DIR)
                )
                with open(local_path, 'r') as f:
                    stats = json.load(f)
            except Exception:
                stats = {
                    "totalDiscoveries": 0,
                    "uniqueObjects": 0,
                    "totalVariations": 0,
                    "lastUpdated": datetime.now().isoformat()
                }
            return stats
        except Exception as e:
            print(f"Failed to load global stats: {e}")
            return {}

# API Endpoints
def search_piclet(object_name: str, attributes: List[str]) -> dict:
    """
    Search for the canonical Piclet or one of its variations
    Returns a status dict; its "piclet" field is None when nothing matches
    """
    piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
    if not piclet_data:
        return {
            "status": "new",
            "message": f"No Piclet found for '{object_name}'",
            "piclet": None
        }
    # No attributes supplied: return the canonical Piclet
    if not attributes:
        return {
            "status": "existing",
            "message": f"Found canonical Piclet for '{object_name}'",
            "piclet": piclet_data.get("canonical")
        }
    # Search for a matching variation
    variations = piclet_data.get("variations", [])
    for variation in variations:
        var_attrs = set(variation.get("attributes", []))
        search_attrs = set(attributes)
        # Check for a close match (at least 50% of the searched attributes overlap)
        overlap = len(var_attrs.intersection(search_attrs))
        if overlap >= len(search_attrs) * 0.5:
            return {
                "status": "variation",
                "message": f"Found variation of '{object_name}'",
                "piclet": variation,
                "canonicalId": piclet_data["canonical"]["typeId"]
            }
    # No variation found, suggest creating one
    return {
        "status": "new_variation",
        "message": f"No variation found for '{object_name}' with attributes {attributes}",
        "canonicalId": piclet_data["canonical"]["typeId"],
        "piclet": None
    }

def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict:
    """
    Create a new canonical Piclet
    Args:
        object_name: The normalized object name (e.g., "pillow")
        piclet_data: JSON string of Piclet instance data
        token_or_username: Either an OAuth token (starts with "hf_") or a username for testing
    Returns:
        Dict with success status and piclet data
    """
    try:
        piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
        # Determine whether this is a token or a username
        user_info = None
        if token_or_username and token_or_username.startswith('hf_'):
            # OAuth token - verify it
            user_info = verify_hf_token(token_or_username)
            if not user_info:
                return {
                    "success": False,
                    "error": "Invalid OAuth token"
                }
        else:
            # Legacy username mode (for testing)
            user_info = {
                "sub": f"legacy_{token_or_username}",
                "preferred_username": token_or_username,
                "name": token_or_username,
                "picture": None
            }
        # Get or create user profile
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
        # Create canonical entry with full discoverer info
        canonical_data = {
            "canonical": {
                "objectName": object_name,
                "typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical",
                "discoveredBy": user_info['preferred_username'],
                "discovererSub": user_info['sub'],
                "discovererUsername": user_info['preferred_username'],
                "discovererName": user_info.get('name'),
                "discovererPicture": user_info.get('picture'),
                "discoveredAt": datetime.now().isoformat(),
                "scanCount": 1,
                "picletData": piclet_json
            },
            "variations": []
        }
        # Save to dataset
        if PicletDiscoveryService.save_piclet_data(object_name, canonical_data):
            # Update user profile
            user_profile["discoveries"].append(canonical_data["canonical"]["typeId"])
            user_profile["uniqueFinds"] += 1
            user_profile["totalFinds"] += 1
            user_profile["rarityScore"] += 100  # Bonus for canonical discovery
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
            return {
                "success": True,
                "message": f"Created canonical Piclet for '{object_name}'",
                "piclet": canonical_data["canonical"]
            }
        else:
            return {
                "success": False,
                "error": "Failed to save canonical Piclet"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict:
    """
    Create a variation of an existing canonical Piclet with OAuth verification
    Args:
        canonical_id: ID of the canonical Piclet
        attributes: List of variation attributes
        piclet_data: JSON data for the Piclet
        token_or_username: Either an OAuth token (starts with "hf_") or a username for testing
        object_name: Normalized object name
    Returns:
        Success/error dict with variation data
    """
    try:
        piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
        # Verify token or use legacy mode
        user_info = None
        if token_or_username and token_or_username.startswith('hf_'):
            user_info = verify_hf_token(token_or_username)
            if not user_info:
                return {"success": False, "error": "Invalid OAuth token"}
        else:
            # Legacy mode for testing
            user_info = {
                "sub": f"legacy_{token_or_username}",
                "preferred_username": token_or_username,
                "name": token_or_username,
                "picture": None
            }
        # Get or create user profile
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
        # Load existing data
        existing_data = PicletDiscoveryService.load_piclet_data(object_name)
        if not existing_data:
            return {
                "success": False,
                "error": f"Canonical Piclet not found for '{object_name}'"
            }
        # Create variation entry
        variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}"
        variation = {
            "typeId": variation_id,
            "attributes": attributes,
            "discoveredBy": user_info['preferred_username'],
            "discovererSub": user_info['sub'],
            "discovererUsername": user_info['preferred_username'],
            "discovererName": user_info.get('name'),
            "discovererPicture": user_info.get('picture'),
            "discoveredAt": datetime.now().isoformat(),
            "scanCount": 1,
            "picletData": piclet_json
        }
        # Add to variations
        existing_data["variations"].append(variation)
        # Save updated data
        if PicletDiscoveryService.save_piclet_data(object_name, existing_data):
            # Update user profile
            user_profile["discoveries"].append(variation_id)
            user_profile["totalFinds"] += 1
            user_profile["rarityScore"] += 50  # Bonus for variation discovery
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
            return {
                "success": True,
                "message": f"Created variation of '{object_name}'",
                "piclet": variation
            }
        else:
            return {
                "success": False,
                "error": "Failed to save variation"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

def increment_scan_count(piclet_id: str, object_name: str) -> dict:
    """
    Increment the scan count for a Piclet
    """
    try:
        data = PicletDiscoveryService.load_piclet_data(object_name)
        if not data:
            return {
                "success": False,
                "error": "Piclet not found"
            }
        # Check canonical
        if data["canonical"]["typeId"] == piclet_id:
            data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1
            scan_count = data["canonical"]["scanCount"]
        else:
            # Check variations
            for variation in data["variations"]:
                if variation["typeId"] == piclet_id:
                    variation["scanCount"] = variation.get("scanCount", 0) + 1
                    scan_count = variation["scanCount"]
                    break
            else:
                return {
                    "success": False,
                    "error": "Piclet ID not found"
                }
        # Save updated data
        if PicletDiscoveryService.save_piclet_data(object_name, data):
            return {
                "success": True,
                "scanCount": scan_count
            }
        else:
            return {
                "success": False,
                "error": "Failed to update scan count"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }

def get_recent_activity(limit: int = 20) -> dict:
    """
    Get recent discoveries across all users
    """
    try:
        # Gradio sliders may deliver floats; ensure limit can be used as a slice bound
        limit = int(limit)
        activities = []
        # List all piclet files
        try:
            files = list_repo_files(
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN
            )
            piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")]
        except Exception:
            piclet_files = []
        # Load recent piclets (simplified - in production, maintain a separate activity log)
        for file_path in piclet_files[-limit:]:
            try:
                object_name = file_path.replace("piclets/", "").replace(".json", "")
                data = PicletDiscoveryService.load_piclet_data(object_name)
                if data:
                    # Add canonical discovery
                    canonical = data["canonical"]
                    activities.append({
                        "type": "discovery",
                        "objectName": object_name,
                        "typeId": canonical["typeId"],
                        "discoveredBy": canonical["discoveredBy"],
                        "discoveredAt": canonical["discoveredAt"],
                        "scanCount": canonical.get("scanCount", 1)
                    })
                    # Add recent variations
                    for variation in data.get("variations", [])[-5:]:
                        activities.append({
                            "type": "variation",
                            "objectName": object_name,
                            "typeId": variation["typeId"],
                            "attributes": variation["attributes"],
                            "discoveredBy": variation["discoveredBy"],
                            "discoveredAt": variation["discoveredAt"],
                            "scanCount": variation.get("scanCount", 1)
                        })
            except Exception:
                continue
        # Sort by discovery date, newest first
        activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True)
        return {
            "success": True,
            "activities": activities[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "activities": []
        }

def get_leaderboard(limit: int = 10) -> dict:
    """
    Get top discoverers
    """
    try:
        # Gradio sliders may deliver floats; ensure limit can be used as a slice bound
        limit = int(limit)
        leaderboard = []
        # List all user files
        try:
            files = list_repo_files(
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN
            )
            user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")]
        except Exception:
            user_files = []
        # Load user data (profiles are keyed by HF user ID, i.e. the OAuth "sub")
        for file_path in user_files:
            try:
                sub = file_path.replace("users/", "").replace(".json", "")
                user_data = PicletDiscoveryService.load_user_data(sub)
                leaderboard.append({
                    "username": user_data.get("preferred_username") or sub,
                    "totalFinds": user_data.get("totalFinds", 0),
                    "uniqueFinds": user_data.get("uniqueFinds", 0),
                    "rarityScore": user_data.get("rarityScore", 0)
                })
            except Exception:
                continue
        # Sort by rarity score
        leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True)
        # Add ranks
        for i, entry in enumerate(leaderboard[:limit]):
            entry["rank"] = i + 1
        return {
            "success": True,
            "leaderboard": leaderboard[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "leaderboard": []
        }

def get_user_profile(username: str) -> dict:
    """
    Get a user's discovery profile
    Note: profiles are stored by HF user ID (sub), so pass the same identifier
    that was used when the profile was saved.
    """
    try:
        user_data = PicletDiscoveryService.load_user_data(username)
        return {
            "success": True,
            "profile": user_data
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "profile": None
        }

# Create Gradio interface
with gr.Blocks(title="Piclets Discovery Server") as app:
    gr.Markdown("""
    # Piclets Discovery Server
    Backend service for the Piclets discovery game. Each real-world object has ONE canonical Piclet!
    """)

    with gr.Tab("Search Piclet"):
        with gr.Row():
            with gr.Column():
                search_object = gr.Textbox(label="Object Name", placeholder="e.g., pillow")
                search_attrs = gr.Textbox(label="Attributes (comma-separated)", placeholder="e.g., velvet, blue")
                search_btn = gr.Button("Search", variant="primary")
            with gr.Column():
                search_result = gr.JSON(label="Search Result")
        search_btn.click(
            fn=lambda obj, attrs: search_piclet(obj, [a.strip() for a in attrs.split(",")] if attrs else []),
            inputs=[search_object, search_attrs],
            outputs=search_result
        )

    with gr.Tab("Create Canonical"):
        with gr.Row():
            with gr.Column():
                canonical_object = gr.Textbox(label="Object Name")
                canonical_data = gr.Textbox(label="Piclet Data (JSON)", lines=10)
                canonical_user = gr.Textbox(label="Username")
                canonical_btn = gr.Button("Create Canonical", variant="primary")
            with gr.Column():
                canonical_result = gr.JSON(label="Creation Result")
        canonical_btn.click(
            fn=create_canonical,
            inputs=[canonical_object, canonical_data, canonical_user],
            outputs=canonical_result
        )

    with gr.Tab("Create Variation"):
        with gr.Row():
            with gr.Column():
                var_object = gr.Textbox(label="Object Name")
                var_canonical = gr.Textbox(label="Canonical ID")
                var_attrs = gr.Textbox(label="Variation Attributes (comma-separated)")
                var_data = gr.Textbox(label="Piclet Data (JSON)", lines=10)
                var_user = gr.Textbox(label="Username")
                var_btn = gr.Button("Create Variation", variant="primary")
            with gr.Column():
                var_result = gr.JSON(label="Creation Result")
        var_btn.click(
            fn=lambda obj, cid, attrs, data, user: create_variation(
                cid, [a.strip() for a in attrs.split(",")] if attrs else [], data, user, obj
            ),
            inputs=[var_object, var_canonical, var_attrs, var_data, var_user],
            outputs=var_result
        )

    with gr.Tab("Activity Feed"):
        activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities")
        activity_btn = gr.Button("Get Recent Activity")
        activity_result = gr.JSON(label="Recent Discoveries")
        activity_btn.click(
            fn=get_recent_activity,
            inputs=activity_limit,
            outputs=activity_result
        )

    with gr.Tab("Leaderboard"):
        leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers")
        leader_btn = gr.Button("Get Leaderboard")
        leader_result = gr.JSON(label="Top Discoverers")
        leader_btn.click(
            fn=get_leaderboard,
            inputs=leader_limit,
            outputs=leader_result
        )

    with gr.Tab("User Profile"):
        profile_user = gr.Textbox(label="Username")
        profile_btn = gr.Button("Get Profile")
        profile_result = gr.JSON(label="User Profile")
        profile_btn.click(
            fn=get_user_profile,
            inputs=profile_user,
            outputs=profile_result
        )

    # API Documentation
    gr.Markdown("""
    ## API Endpoints
    All endpoints accept JSON and return JSON responses.
    - **search_piclet**: Search for canonical or variation Piclets
    - **create_canonical**: Register a new canonical Piclet
    - **create_variation**: Add a variation to an existing canonical
    - **increment_scan_count**: Track discovery popularity
    - **get_recent_activity**: Global discovery feed
    - **get_leaderboard**: Top discoverers
    - **get_user_profile**: Individual discovery stats
    See API_DOCUMENTATION.md for detailed usage.
    """)

if __name__ == "__main__":
    app.launch()
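
# ---------------------------------------------------------------------------
# Example client call (a minimal sketch, not part of the server). Assumptions:
# the Space hosting this app is reachable via gradio_client, and Gradio exposes
# the click handlers under their function names. The Space ID and api_name
# below are placeholders - check the Space's "Use via API" page for the real
# values.
#
#   from gradio_client import Client
#
#   client = Client("Fraser/piclets-backend")  # hypothetical Space ID
#   result = client.predict(10, api_name="/get_leaderboard")
#   print(result)
# ---------------------------------------------------------------------------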