Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from gradio_client import Client, handle_file | |
| import json | |
| import os | |
| import re | |
| from datetime import datetime | |
| from typing import List, Optional | |
| from huggingface_hub import HfApi, hf_hub_download, list_repo_files | |
| from pathlib import Path | |
| import tempfile | |
| from auth import verify_hf_token | |
| # HuggingFace configuration | |
| HF_TOKEN = os.getenv("HF_TOKEN") # Required for writing to dataset | |
| DATASET_REPO = "Fraser/piclets" # Public dataset repository | |
| DATASET_TYPE = "dataset" | |
| # Initialize HuggingFace API with token if available | |
| api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi() | |
| # Cache directory for local operations | |
| CACHE_DIR = Path("cache") | |
| CACHE_DIR.mkdir(exist_ok=True) | |
| class PicletDiscoveryService: | |
| """Manages Piclet discovery using HuggingFace datasets""" | |
| def normalize_object_name(name: str) -> str: | |
| """ | |
| Normalize object names for consistent storage and lookup | |
| Examples: "The Blue Pillow" -> "pillow", "wooden chairs" -> "wooden_chair" | |
| """ | |
| if not name: | |
| return "unknown" | |
| # Convert to lowercase and strip | |
| name = name.lower().strip() | |
| # Remove articles (the, a, an) | |
| name = re.sub(r'^(the|a|an)\s+', '', name) | |
| # Remove special characters except spaces | |
| name = re.sub(r'[^a-z0-9\s]', '', name) | |
| # Handle common plurals (basic pluralization rules) | |
| if name.endswith('ies') and len(name) > 4: | |
| name = name[:-3] + 'y' # berries -> berry | |
| elif name.endswith('ves') and len(name) > 4: | |
| name = name[:-3] + 'f' # leaves -> leaf | |
| elif name.endswith('es') and len(name) > 3: | |
| # Check if it's a special case like "glasses" | |
| if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')): | |
| name = name[:-2] # boxes -> box (but keep glasses) | |
| elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'): | |
| name = name[:-1] # chairs -> chair (but keep glass) | |
| # Replace spaces with underscores | |
| name = re.sub(r'\s+', '_', name.strip()) | |
| return name | |
| def load_piclet_data(object_name: str) -> Optional[dict]: | |
| """Load Piclet data from HuggingFace dataset""" | |
| try: | |
| normalized_name = PicletDiscoveryService.normalize_object_name(object_name) | |
| file_path = f"piclets/{normalized_name}.json" | |
| # Download the file from HuggingFace | |
| local_path = hf_hub_download( | |
| repo_id=DATASET_REPO, | |
| filename=file_path, | |
| repo_type=DATASET_TYPE, | |
| token=HF_TOKEN, | |
| cache_dir=str(CACHE_DIR) | |
| ) | |
| with open(local_path, 'r') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| print(f"Could not load piclet data for {object_name}: {e}") | |
| return None | |
| def save_piclet_data(object_name: str, data: dict) -> bool: | |
| """Save Piclet data to HuggingFace dataset""" | |
| try: | |
| normalized_name = PicletDiscoveryService.normalize_object_name(object_name) | |
| file_path = f"piclets/{normalized_name}.json" | |
| # Create a temporary file | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: | |
| json.dump(data, f, indent=2) | |
| temp_path = f.name | |
| # Upload to HuggingFace | |
| api.upload_file( | |
| path_or_fileobj=temp_path, | |
| path_in_repo=file_path, | |
| repo_id=DATASET_REPO, | |
| repo_type=DATASET_TYPE, | |
| commit_message=f"Update piclet: {normalized_name}" | |
| ) | |
| # Clean up | |
| os.unlink(temp_path) | |
| return True | |
| except Exception as e: | |
| print(f"Failed to save piclet data: {e}") | |
| return False | |
| def load_user_data(sub: str) -> dict: | |
| """ | |
| Load user profile from dataset by HF user ID (sub) | |
| Args: | |
| sub: HuggingFace user ID (stable identifier) | |
| Returns: | |
| User profile dict or default profile if not found | |
| """ | |
| try: | |
| file_path = f"users/{sub}.json" | |
| local_path = hf_hub_download( | |
| repo_id=DATASET_REPO, | |
| filename=file_path, | |
| repo_type=DATASET_TYPE, | |
| token=HF_TOKEN, | |
| cache_dir=str(CACHE_DIR) | |
| ) | |
| with open(local_path, 'r') as f: | |
| return json.load(f) | |
| except: | |
| # Return default user profile if not found | |
| # Will be populated with actual data on first save | |
| return { | |
| "sub": sub, | |
| "preferred_username": None, | |
| "name": None, | |
| "picture": None, | |
| "joinedAt": datetime.now().isoformat(), | |
| "lastSeen": datetime.now().isoformat(), | |
| "discoveries": [], | |
| "uniqueFinds": 0, | |
| "totalFinds": 0, | |
| "rarityScore": 0, | |
| "visibility": "public" | |
| } | |
| def save_user_data(sub: str, data: dict) -> bool: | |
| """ | |
| Save user profile to dataset by HF user ID (sub) | |
| Args: | |
| sub: HuggingFace user ID (stable identifier) | |
| data: User profile dict | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| file_path = f"users/{sub}.json" | |
| # Update lastSeen timestamp | |
| data["lastSeen"] = datetime.now().isoformat() | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: | |
| json.dump(data, f, indent=2) | |
| temp_path = f.name | |
| api.upload_file( | |
| path_or_fileobj=temp_path, | |
| path_in_repo=file_path, | |
| repo_id=DATASET_REPO, | |
| repo_type=DATASET_TYPE, | |
| commit_message=f"Update user profile: {data.get('preferred_username', sub)}" | |
| ) | |
| os.unlink(temp_path) | |
| return True | |
| except Exception as e: | |
| print(f"Failed to save user data: {e}") | |
| return False | |
| def get_or_create_user_profile(user_info: dict) -> dict: | |
| """ | |
| Get existing user profile or create new one from OAuth user_info | |
| Refreshes cached profile data on each call | |
| Args: | |
| user_info: OAuth user info from HF (sub, preferred_username, name, picture) | |
| Returns: | |
| User profile dict | |
| """ | |
| sub = user_info['sub'] | |
| # Load existing profile | |
| profile = PicletDiscoveryService.load_user_data(sub) | |
| # Update cached profile fields from OAuth | |
| profile['sub'] = sub | |
| profile['preferred_username'] = user_info.get('preferred_username') | |
| profile['name'] = user_info.get('name') | |
| profile['picture'] = user_info.get('picture') | |
| profile['email'] = user_info.get('email') | |
| # Set joinedAt only if this is a new profile | |
| if 'joinedAt' not in profile or not profile['joinedAt']: | |
| profile['joinedAt'] = datetime.now().isoformat() | |
| return profile | |
| def update_global_stats() -> dict: | |
| """Update and return global statistics""" | |
| try: | |
| # Try to load existing stats | |
| try: | |
| local_path = hf_hub_download( | |
| repo_id=DATASET_REPO, | |
| filename="metadata/stats.json", | |
| repo_type=DATASET_TYPE, | |
| token=HF_TOKEN, | |
| cache_dir=str(CACHE_DIR) | |
| ) | |
| with open(local_path, 'r') as f: | |
| stats = json.load(f) | |
| except: | |
| stats = { | |
| "totalDiscoveries": 0, | |
| "uniqueObjects": 0, | |
| "totalVariations": 0, | |
| "lastUpdated": datetime.now().isoformat() | |
| } | |
| return stats | |
| except Exception as e: | |
| print(f"Failed to update global stats: {e}") | |
| return {} | |
| class PicletGeneratorService: | |
| """ | |
| Orchestrates Piclet generation by calling external AI services | |
| Uses user's hf_token to consume their GPU quota | |
| """ | |
| # Space endpoints | |
| JOY_CAPTION_SPACE = "fancyfeast/joy-caption-alpha-two" | |
| GPT_OSS_SPACE = "amd/gpt-oss-120b-chatbot" | |
| QWEN_IMAGE_SPACE = "multimodalart/Qwen-Image-Fast" | |
| def generate_enhanced_caption(image_path: str, hf_token: str) -> str: | |
| """Generate detailed image description using JoyCaption | |
| Args: | |
| image_path: Path to image file | |
| hf_token: User's HuggingFace token | |
| """ | |
| try: | |
| print(f"Connecting to JoyCaption space with user token...") | |
| client = Client( | |
| PicletGeneratorService.JOY_CAPTION_SPACE, | |
| hf_token=hf_token | |
| ) | |
| print(f"Generating caption for image...") | |
| result = client.predict( | |
| handle_file(image_path), # Wrap path so client uploads file | |
| "Descriptive", # caption_type | |
| "medium-length", # caption_length | |
| [], # extra_options | |
| "", # name_input | |
| "Describe this image in detail, identifying any recognizable objects, brands, logos, or specific models. Be specific about product names and types.", # custom_prompt | |
| api_name="/stream_chat" | |
| ) | |
| # JoyCaption returns tuple: (prompt_used, caption_text) in .data | |
| result_data = result.data if hasattr(result, 'data') else result | |
| caption = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else str(result_data) | |
| print(f"Caption generated: {caption[:100]}...") | |
| return caption | |
| except Exception as e: | |
| print(f"Failed to generate caption: {e}") | |
| raise Exception(f"Caption generation failed: {str(e)}") | |
| def generate_text_with_gpt(prompt: str, hf_token: str) -> str: | |
| """Generate text using GPT-OSS-120B""" | |
| try: | |
| print(f"Connecting to GPT-OSS space...") | |
| client = Client( | |
| PicletGeneratorService.GPT_OSS_SPACE, | |
| hf_token=hf_token | |
| ) | |
| print(f"Generating text...") | |
| result = client.predict( | |
| prompt, # message (positional) | |
| "You are a helpful assistant that creates Pokรฉmon-style monster concepts based on real-world objects.", # system_prompt (positional) | |
| 0.7, # temperature (positional) | |
| api_name="/chat" | |
| ) | |
| # Extract response text (GPT-OSS formats with Analysis and Response) | |
| result_data = result.data if hasattr(result, 'data') else result | |
| response_text = result_data[0] if isinstance(result_data, (list, tuple)) else str(result_data) | |
| # Try to extract Response section | |
| response_match = re.search(r'\*\*๐ฌ Response:\*\*\s*\n\n([\s\S]*)', response_text) | |
| if response_match: | |
| return response_match.group(1).strip() | |
| # Fallback: extract after "assistantfinal" | |
| final_match = re.search(r'assistantfinal\s*([\s\S]*)', response_text) | |
| if final_match: | |
| return final_match.group(1).strip() | |
| return response_text | |
| except Exception as e: | |
| print(f"Failed to generate text: {e}") | |
| raise Exception(f"Text generation failed: {str(e)}") | |
| def generate_piclet_concept(caption: str, hf_token: str) -> dict: | |
| """ | |
| Generate complete Piclet concept from image caption | |
| Returns parsed concept with object name, variation, stats, etc. | |
| """ | |
| concept_prompt = f"""You are analyzing an image to create a Pokรฉmon-style creature. Here's the image description: | |
| "{caption}" | |
| STEP 1 - REASONING (think through these before writing): | |
| 1. What is the PRIMARY PHYSICAL OBJECT? Be SPECIFIC (e.g., "f-16 fighting falcon" not "jet", "macbook pro" not "laptop") | |
| 2. What is this object's real-world PURPOSE and FUNCTION? | |
| 3. What PERSONALITY traits would naturally emerge from this object's characteristics? | |
| - If it's fast โ energetic, agile | |
| - If it's protective โ loyal, defensive | |
| - If it's precise โ disciplined, focused | |
| 4. What NATURAL HABITAT suits this object-turned-creature? | |
| - Electronics โ urban/tech environments | |
| - Vehicles โ roads, skies, waters they traverse | |
| - Tools โ workshops, sites where they're used | |
| 5. What BEHAVIORS and ABILITIES reflect the object's function? | |
| - What does the object DO in real life? | |
| - How would those actions become creature abilities? | |
| 6. What are the object's most ICONIC VISUAL FEATURES that define it? | |
| STEP 2 - FORMAT your complete concept EXACTLY as follows: | |
| ```md | |
| # Canonical Object | |
| {{Specific object name: "macbook", "eiffel tower", "iphone", "tesla", "le creuset mug", "nintendo switch"}} | |
| {{NOT generic terms like: "laptop", "tower", "phone", "car", "mug", "console"}} | |
| {{Include brand/model/landmark name when identifiable}} | |
| # Variation | |
| {{OPTIONAL: one distinctive attribute like "silver", "pro", "night", "gaming", OR use "canonical" if this is the standard/default version with no special variation}} | |
| # Object Rarity | |
| {{common, uncommon, rare, epic, or legendary based on object uniqueness}} | |
| # Monster Name | |
| {{Creative 8-11 letter name based on the SPECIFIC object, e.g., "Macbyte" for MacBook, "Towerfell" for Eiffel Tower}} | |
| # Primary Type | |
| {{beast, bug, aquatic, flora, mineral, space, machina, structure, culture, or cuisine}} | |
| # Physical Stats | |
| Height: {{e.g., "1.2m" or "3'5\\""}} | |
| Weight: {{e.g., "15kg" or "33 lbs"}} | |
| # Personality | |
| {{1-2 sentences describing personality traits based on the object's real-world function and characteristics}} | |
| # Physical Appearance | |
| {{2-3 paragraphs describing how the SPECIFIC object's visual features translate into monster features. Reference the actual object by name. Describe body structure, colors, textures, materials, distinctive markings, and how each physical element relates to the source object.}} | |
| # Lore & Behavior | |
| {{2-3 paragraphs describing the creature's behavior, habitat, abilities, and role in its ecosystem. What does it DO? Where does it live? How does it interact with its environment? What are its natural behaviors and powers that reflect the object's real-world function? This is the creature's background story and behavioral profile.}} | |
| # Monster Image Prompt | |
| {{Detailed 3-4 sentence visual description for anime-style image generation. Describe body structure, colors, textures, materials, distinctive features, personality-driven pose/expression, dynamic action or stance, environment/background setting, and atmospheric lighting. Be specific and detailed about visual elements. DO NOT mention the source object name or include phrases like "Inspired by [object]".}} | |
| ``` | |
| CRITICAL RULES: | |
| - Canonical Object MUST be SPECIFIC: "f-16 fighting falcon" not "jet", "macbook pro" not "laptop", "coca cola" not "soda" | |
| - If you can identify a brand, model, or proper name from the description, USE IT | |
| - Variation should be meaningful and distinctive (material, style, color, context, or model variant) | |
| - Physical Appearance must describe the CREATURE'S BODY with references to the specific object's visual features | |
| - Lore & Behavior must describe WHAT THE CREATURE DOES, not how it looks | |
| - Monster Image Prompt must be a detailed (3-4 sentences) pure visual description without mentioning the source object name | |
| - Monster Image Prompt must NOT include the monster's name or style prefixes like "Anime-style" or "Pokรฉmon-style" | |
| - Primary Type must match the object category (machina for electronics/vehicles, structure for buildings, etc.)""" | |
| response_text = PicletGeneratorService.generate_text_with_gpt(concept_prompt, hf_token) | |
| # Parse the concept | |
| return PicletGeneratorService.parse_concept(response_text) | |
| def parse_concept(concept_text: str) -> dict: | |
| """Parse structured concept text into dict""" | |
| # Remove code block markers if present | |
| if '```' in concept_text: | |
| code_block_match = re.search(r'```(?:md|markdown)?\s*\n([\s\S]*?)```', concept_text) | |
| if code_block_match: | |
| concept_text = code_block_match.group(1).strip() | |
| def extract_section(text: str, section: str) -> str: | |
| """Extract content of a markdown section""" | |
| pattern = rf'\*{{0,2}}#\s*{re.escape(section)}\s*\*{{0,2}}\s*\n([\s\S]*?)(?=^\*{{0,2}}#|$)' | |
| match = re.search(pattern, text, re.MULTILINE) | |
| if match: | |
| content = match.group(1).strip() | |
| # Remove curly braces and quotes that GPT sometimes adds | |
| content = re.sub(r'^[{"]|["}]$', '', content) | |
| content = re.sub(r'^.*:\s*["\']|["\']$', '', content) | |
| return content.strip() | |
| return '' | |
| # Extract all sections | |
| object_name = extract_section(concept_text, 'Canonical Object').lower() | |
| variation_text = extract_section(concept_text, 'Variation') | |
| rarity_text = extract_section(concept_text, 'Object Rarity').lower() | |
| monster_name = extract_section(concept_text, 'Monster Name') | |
| primary_type = extract_section(concept_text, 'Primary Type').lower() | |
| # Extract both appearance and lore sections separately (keep them separate!) | |
| physical_appearance = extract_section(concept_text, 'Physical Appearance') | |
| lore_behavior = extract_section(concept_text, 'Lore & Behavior') | |
| image_prompt = extract_section(concept_text, 'Monster Image Prompt') | |
| # Parse physical stats | |
| physical_stats_text = extract_section(concept_text, 'Physical Stats') | |
| height_match = re.search(r'Height:\s*(.+)', physical_stats_text, re.IGNORECASE) | |
| weight_match = re.search(r'Weight:\s*(.+)', physical_stats_text, re.IGNORECASE) | |
| height = height_match.group(1).strip() if height_match else None | |
| weight = weight_match.group(1).strip() if weight_match else None | |
| personality = extract_section(concept_text, 'Personality') | |
| # Clean monster name | |
| if monster_name: | |
| monster_name = re.sub(r'\*+', '', monster_name) # Remove asterisks | |
| if ',' in monster_name: | |
| monster_name = monster_name.split(',')[0] | |
| if len(monster_name) > 12: | |
| monster_name = monster_name[:12] | |
| # Parse variation | |
| attributes = [] | |
| if variation_text and variation_text.lower() not in ['none', 'canonical', '']: | |
| attributes = [variation_text.lower()] | |
| # Map rarity to tier | |
| tier = 'medium' | |
| if 'common' in rarity_text: | |
| tier = 'low' | |
| elif 'uncommon' in rarity_text: | |
| tier = 'medium' | |
| elif 'rare' in rarity_text and 'epic' not in rarity_text: | |
| tier = 'high' | |
| elif 'legendary' in rarity_text or 'epic' in rarity_text or 'mythical' in rarity_text: | |
| tier = 'legendary' | |
| return { | |
| 'objectName': object_name, | |
| 'attributes': attributes, | |
| 'concept': concept_text, | |
| 'stats': { | |
| 'name': monster_name or 'Unknown', | |
| 'physicalAppearance': physical_appearance, | |
| 'lore': lore_behavior, | |
| 'tier': tier, | |
| 'primaryType': primary_type or 'beast', | |
| 'height': height, | |
| 'weight': weight, | |
| 'personality': personality | |
| }, | |
| 'imagePrompt': image_prompt | |
| } | |
| def generate_piclet_image(image_prompt: str, tier: str, hf_token: str) -> dict: | |
| """Generate Piclet image using Qwen-Image-Fast""" | |
| try: | |
| print(f"Connecting to Qwen-Image-Fast space...") | |
| client = Client( | |
| PicletGeneratorService.QWEN_IMAGE_SPACE, | |
| hf_token=hf_token | |
| ) | |
| # Build enhanced prompt for Pokemon-style anime art | |
| full_prompt = f"{image_prompt} Pokรฉmon anime art style, idle pose, centered, full body visible in frame." | |
| print(f"Generating image with Qwen-Image-Fast...") | |
| print(f"Prompt: {full_prompt[:100]}...") | |
| # Qwen-Image-Fast API: infer(prompt, seed, randomize_seed, aspect_ratio, guidance_scale, num_inference_steps, prompt_enhance) | |
| result = client.predict( | |
| full_prompt, # prompt | |
| 0, # seed (will be randomized) | |
| True, # randomize_seed | |
| "3:4", # aspect_ratio (768x1024 - portrait) | |
| 1.0, # guidance_scale (default) | |
| 8, # num_inference_steps (default, optimized with Lightning LoRA) | |
| True, # prompt_enhance (uses LLM to enhance prompt) | |
| api_name="/infer" | |
| ) | |
| # Qwen returns: (PIL.Image, seed) tuple | |
| result_data = result.data if hasattr(result, 'data') else result | |
| image_data = result_data[0] if isinstance(result_data, (list, tuple)) else result_data | |
| seed = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else 0 | |
| # Handle different return formats (URL or PIL Image object) | |
| image_url = None | |
| if isinstance(image_data, str): | |
| image_url = image_data | |
| elif isinstance(image_data, dict): | |
| image_url = image_data.get('url') or image_data.get('path') | |
| elif hasattr(image_data, 'url'): | |
| image_url = image_data.url | |
| if not image_url: | |
| raise Exception("Failed to extract image URL from Qwen response") | |
| return { | |
| 'imageUrl': image_url, | |
| 'seed': seed, | |
| 'prompt': image_prompt | |
| } | |
| except Exception as e: | |
| print(f"Failed to generate image: {e}") | |
| raise Exception(f"Image generation failed: {str(e)}") | |
| def upload_image_to_dataset(image_path: str, file_name: str) -> str: | |
| """ | |
| Upload image to HuggingFace dataset | |
| Args: | |
| image_path: Local path to the image file (or URL to download from) | |
| file_name: Name for the file (e.g., "pillow_canonical.png") | |
| Returns: | |
| URL to the uploaded image in the dataset | |
| """ | |
| try: | |
| print(f"Uploading image to dataset: {file_name}") | |
| # Handle both local paths and URLs | |
| if image_path.startswith('http'): | |
| # Download from URL first | |
| import requests | |
| response = requests.get(image_path) | |
| with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f: | |
| f.write(response.content) | |
| temp_path = f.name | |
| else: | |
| # Use local path directly | |
| temp_path = image_path | |
| # Upload to HuggingFace dataset | |
| file_path = f"images/{file_name}" | |
| api.upload_file( | |
| path_or_fileobj=temp_path, | |
| path_in_repo=file_path, | |
| repo_id=DATASET_REPO, | |
| repo_type=DATASET_TYPE, | |
| commit_message=f"Add piclet image: {file_name}" | |
| ) | |
| # Clean up temp file if we downloaded it | |
| if image_path.startswith('http'): | |
| os.unlink(temp_path) | |
| # Return the dataset URL | |
| dataset_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_path}" | |
| print(f"Image uploaded successfully: {dataset_url}") | |
| return dataset_url | |
| except Exception as e: | |
| print(f"Failed to upload image: {e}") | |
| raise Exception(f"Image upload failed: {str(e)}") | |
| # API Endpoints | |
| def search_piclet(object_name: str, attributes: List[str]) -> dict: | |
| """ | |
| Search for canonical Piclet or variations | |
| Returns matching piclet or None | |
| """ | |
| piclet_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if not piclet_data: | |
| return { | |
| "status": "new", | |
| "message": f"No Piclet found for '{object_name}'", | |
| "piclet": None | |
| } | |
| # Check if searching for canonical (no attributes) | |
| if not attributes or len(attributes) == 0: | |
| return { | |
| "status": "existing", | |
| "message": f"Found canonical Piclet for '{object_name}'", | |
| "piclet": piclet_data.get("canonical") | |
| } | |
| # Search for matching variation | |
| variations = piclet_data.get("variations", []) | |
| for variation in variations: | |
| var_attrs = set(variation.get("attributes", [])) | |
| search_attrs = set(attributes) | |
| # Check for close match (at least 50% overlap) | |
| overlap = len(var_attrs.intersection(search_attrs)) | |
| if overlap >= len(search_attrs) * 0.5: | |
| return { | |
| "status": "variation", | |
| "message": f"Found variation of '{object_name}'", | |
| "piclet": variation, | |
| "canonicalId": piclet_data["canonical"]["typeId"] | |
| } | |
| # No variation found, suggest creating one | |
| return { | |
| "status": "new_variation", | |
| "message": f"No variation found for '{object_name}' with attributes {attributes}", | |
| "canonicalId": piclet_data["canonical"]["typeId"], | |
| "piclet": None | |
| } | |
| def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict: | |
| """ | |
| Create a new canonical Piclet | |
| Args: | |
| object_name: The normalized object name (e.g., "pillow") | |
| piclet_data: JSON string of Piclet instance data | |
| token_or_username: Either OAuth token (starts with "hf_") or username for testing | |
| Returns: | |
| Dict with success status and piclet data | |
| """ | |
| try: | |
| piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data | |
| # Determine if this is a token or username | |
| user_info = None | |
| if token_or_username and token_or_username.startswith('hf_'): | |
| # OAuth token - verify it | |
| user_info = verify_hf_token(token_or_username) | |
| if not user_info: | |
| return { | |
| "success": False, | |
| "error": "Invalid OAuth token" | |
| } | |
| else: | |
| # Legacy username mode (for testing) | |
| user_info = { | |
| "sub": f"legacy_{token_or_username}", | |
| "preferred_username": token_or_username, | |
| "name": token_or_username, | |
| "picture": None | |
| } | |
| # Get or create user profile | |
| user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info) | |
| # Create canonical entry with full discoverer info | |
| canonical_data = { | |
| "canonical": { | |
| "objectName": object_name, | |
| "typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical", | |
| "discoveredBy": user_info['preferred_username'], | |
| "discovererSub": user_info['sub'], | |
| "discovererUsername": user_info['preferred_username'], | |
| "discovererName": user_info.get('name'), | |
| "discovererPicture": user_info.get('picture'), | |
| "discoveredAt": datetime.now().isoformat(), | |
| "scanCount": 1, | |
| "picletData": piclet_json | |
| }, | |
| "variations": [] | |
| } | |
| # Save to dataset | |
| if PicletDiscoveryService.save_piclet_data(object_name, canonical_data): | |
| # Update user profile | |
| user_profile["discoveries"].append(canonical_data["canonical"]["typeId"]) | |
| user_profile["uniqueFinds"] += 1 | |
| user_profile["totalFinds"] += 1 | |
| user_profile["rarityScore"] += 100 # Bonus for canonical discovery | |
| PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) | |
| return { | |
| "success": True, | |
| "message": f"Created canonical Piclet for '{object_name}'", | |
| "piclet": canonical_data["canonical"] | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": "Failed to save canonical Piclet" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict: | |
| """ | |
| Create a variation of an existing canonical Piclet with OAuth verification | |
| Args: | |
| canonical_id: ID of the canonical Piclet | |
| attributes: List of variation attributes | |
| piclet_data: JSON data for the Piclet | |
| token_or_username: Either OAuth token (starts with "hf_") or username for testing | |
| object_name: Normalized object name | |
| Returns: | |
| Success/error dict with variation data | |
| """ | |
| try: | |
| piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data | |
| # Verify token or use legacy mode | |
| user_info = None | |
| if token_or_username and token_or_username.startswith('hf_'): | |
| user_info = verify_hf_token(token_or_username) | |
| if not user_info: | |
| return {"success": False, "error": "Invalid OAuth token"} | |
| else: | |
| # Legacy mode for testing | |
| user_info = { | |
| "sub": f"legacy_{token_or_username}", | |
| "preferred_username": token_or_username, | |
| "name": token_or_username, | |
| "picture": None | |
| } | |
| # Get or create user profile | |
| user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info) | |
| # Load existing data | |
| existing_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if not existing_data: | |
| return { | |
| "success": False, | |
| "error": f"Canonical Piclet not found for '{object_name}'" | |
| } | |
| # Create variation entry | |
| variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}" | |
| variation = { | |
| "typeId": variation_id, | |
| "attributes": attributes, | |
| "discoveredBy": user_info['preferred_username'], | |
| "discovererSub": user_info['sub'], | |
| "discovererUsername": user_info['preferred_username'], | |
| "discovererName": user_info.get('name'), | |
| "discovererPicture": user_info.get('picture'), | |
| "discoveredAt": datetime.now().isoformat(), | |
| "scanCount": 1, | |
| "picletData": piclet_json | |
| } | |
| # Add to variations | |
| existing_data["variations"].append(variation) | |
| # Save updated data | |
| if PicletDiscoveryService.save_piclet_data(object_name, existing_data): | |
| # Update user profile | |
| user_profile["discoveries"].append(variation_id) | |
| user_profile["totalFinds"] += 1 | |
| user_profile["rarityScore"] += 50 # Bonus for variation discovery | |
| PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) | |
| return { | |
| "success": True, | |
| "message": f"Created variation of '{object_name}'", | |
| "piclet": variation | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": "Failed to save variation" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| def increment_scan_count(piclet_id: str, object_name: str) -> dict: | |
| """ | |
| Increment the scan count for a Piclet | |
| """ | |
| try: | |
| data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if not data: | |
| return { | |
| "success": False, | |
| "error": "Piclet not found" | |
| } | |
| # Check canonical | |
| if data["canonical"]["typeId"] == piclet_id: | |
| data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1 | |
| scan_count = data["canonical"]["scanCount"] | |
| else: | |
| # Check variations | |
| for variation in data["variations"]: | |
| if variation["typeId"] == piclet_id: | |
| variation["scanCount"] = variation.get("scanCount", 0) + 1 | |
| scan_count = variation["scanCount"] | |
| break | |
| else: | |
| return { | |
| "success": False, | |
| "error": "Piclet ID not found" | |
| } | |
| # Save updated data | |
| if PicletDiscoveryService.save_piclet_data(object_name, data): | |
| return { | |
| "success": True, | |
| "scanCount": scan_count | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": "Failed to update scan count" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| def generate_piclet(image, hf_token: str) -> dict: | |
| """ | |
| Complete Piclet generation workflow - single endpoint | |
| Takes user's image and hf_token, returns generated Piclet with discovery status | |
| Args: | |
| image: Uploaded image file (Gradio file input) | |
| hf_token: User's HuggingFace OAuth token | |
| Returns: | |
| { | |
| "success": bool, | |
| "piclet": {complete piclet data}, | |
| "discoveryStatus": "new" | "variation" | "existing", | |
| "canonicalId": str (if variation/existing), | |
| "message": str | |
| } | |
| """ | |
| try: | |
| # Validate token and get user info | |
| user_info = verify_hf_token(hf_token) | |
| if not user_info: | |
| return { | |
| "success": False, | |
| "error": "Invalid HuggingFace token" | |
| } | |
| print(f"Generating Piclet for user: {user_info.get('preferred_username', 'unknown')}") | |
| # Get user profile (creates if doesn't exist) | |
| user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info) | |
| # Get image path from Gradio (type="filepath" gives us a string path) | |
| image_path = image if isinstance(image, str) else str(image) | |
| # Step 1: Generate caption | |
| print("Step 1/5: Generating image caption...") | |
| caption = PicletGeneratorService.generate_enhanced_caption(image_path, hf_token) | |
| # Step 2: Generate concept | |
| print("Step 2/5: Generating Piclet concept...") | |
| concept_data = PicletGeneratorService.generate_piclet_concept(caption, hf_token) | |
| object_name = concept_data['objectName'] | |
| attributes = concept_data['attributes'] | |
| stats = concept_data['stats'] | |
| image_prompt = concept_data['imagePrompt'] | |
| concept_text = concept_data['concept'] | |
| # Step 3: Generate image | |
| print("Step 3/5: Generating Piclet image...") | |
| image_result = PicletGeneratorService.generate_piclet_image( | |
| image_prompt, | |
| stats['tier'], | |
| hf_token | |
| ) | |
| # Step 4: Check for canonical/variation | |
| print("Step 4/5: Checking for existing canonical...") | |
| existing_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| discovery_status = 'new' | |
| canonical_id = None | |
| scan_count = 1 | |
| if existing_data: | |
| # Check if this is an exact canonical match (no attributes) | |
| if not attributes or len(attributes) == 0: | |
| discovery_status = 'existing' | |
| canonical_id = existing_data['canonical']['typeId'] | |
| # Increment scan count | |
| existing_data['canonical']['scanCount'] = existing_data['canonical'].get('scanCount', 0) + 1 | |
| scan_count = existing_data['canonical']['scanCount'] | |
| PicletDiscoveryService.save_piclet_data(object_name, existing_data) | |
| else: | |
| # Check for matching variation | |
| variations = existing_data.get('variations', []) | |
| matched_variation = None | |
| for variation in variations: | |
| var_attrs = set(variation.get('attributes', [])) | |
| search_attrs = set(attributes) | |
| overlap = len(var_attrs.intersection(search_attrs)) | |
| if overlap >= len(search_attrs) * 0.5: | |
| matched_variation = variation | |
| discovery_status = 'existing' | |
| canonical_id = existing_data['canonical']['typeId'] | |
| # Increment variation scan count | |
| variation['scanCount'] = variation.get('scanCount', 0) + 1 | |
| scan_count = variation['scanCount'] | |
| PicletDiscoveryService.save_piclet_data(object_name, existing_data) | |
| break | |
| if not matched_variation: | |
| discovery_status = 'variation' | |
| canonical_id = existing_data['canonical']['typeId'] | |
| # Step 5: Save new discovery if needed | |
| print("Step 5/5: Saving to dataset...") | |
| if discovery_status == 'new': | |
| # Create new canonical | |
| type_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical" | |
| # Upload image to dataset with canonical filename | |
| normalized_name = PicletDiscoveryService.normalize_object_name(object_name) | |
| image_filename = f"{normalized_name}_canonical.png" | |
| dataset_image_url = PicletGeneratorService.upload_image_to_dataset( | |
| image_result['imageUrl'], | |
| image_filename | |
| ) | |
| canonical_data = { | |
| "canonical": { | |
| "objectName": object_name, | |
| "typeId": type_id, | |
| "discoveredBy": user_info['preferred_username'], | |
| "discovererSub": user_info['sub'], | |
| "discovererUsername": user_info['preferred_username'], | |
| "discovererName": user_info.get('name'), | |
| "discovererPicture": user_info.get('picture'), | |
| "discoveredAt": datetime.now().isoformat(), | |
| "scanCount": scan_count, | |
| "picletData": { | |
| "typeId": type_id, | |
| "nickname": stats['name'], | |
| "stats": stats, | |
| "imageUrl": dataset_image_url, | |
| "imageCaption": caption, | |
| "concept": concept_text, | |
| "imagePrompt": image_prompt, | |
| "createdAt": datetime.now().isoformat() | |
| } | |
| }, | |
| "variations": [] | |
| } | |
| canonical_id = type_id | |
| PicletDiscoveryService.save_piclet_data(object_name, canonical_data) | |
| # Update user profile | |
| user_profile["discoveries"].append(type_id) | |
| user_profile["uniqueFinds"] = user_profile.get("uniqueFinds", 0) + 1 | |
| user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1 | |
| user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 100 | |
| PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) | |
| elif discovery_status == 'variation': | |
| # Create new variation | |
| existing_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| variation_num = len(existing_data['variations']) + 1 | |
| normalized_name = PicletDiscoveryService.normalize_object_name(object_name) | |
| variation_id = f"{normalized_name}_{variation_num:03d}" | |
| # Upload image to dataset with variation filename | |
| image_filename = f"{normalized_name}_{variation_num:03d}.png" | |
| dataset_image_url = PicletGeneratorService.upload_image_to_dataset( | |
| image_result['imageUrl'], | |
| image_filename | |
| ) | |
| variation_data = { | |
| "typeId": variation_id, | |
| "attributes": attributes, | |
| "discoveredBy": user_info['preferred_username'], | |
| "discovererSub": user_info['sub'], | |
| "discovererUsername": user_info['preferred_username'], | |
| "discovererName": user_info.get('name'), | |
| "discovererPicture": user_info.get('picture'), | |
| "discoveredAt": datetime.now().isoformat(), | |
| "scanCount": scan_count, | |
| "picletData": { | |
| "typeId": variation_id, | |
| "nickname": stats['name'], | |
| "stats": stats, | |
| "imageUrl": dataset_image_url, | |
| "imageCaption": caption, | |
| "concept": concept_text, | |
| "imagePrompt": image_prompt, | |
| "createdAt": datetime.now().isoformat() | |
| } | |
| } | |
| existing_data['variations'].append(variation_data) | |
| PicletDiscoveryService.save_piclet_data(object_name, existing_data) | |
| # Update user profile | |
| user_profile["discoveries"].append(variation_id) | |
| user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1 | |
| user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 50 | |
| PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) | |
| # Build complete response | |
| # For existing piclets, get the stored data; for new/variation, use generated data | |
| if discovery_status == 'existing': | |
| # Load the existing piclet data to return | |
| existing_piclet_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if existing_piclet_data and existing_piclet_data.get('canonical'): | |
| existing_canonical = existing_piclet_data['canonical'] | |
| piclet_data = existing_canonical.get('picletData', {}) | |
| piclet_data['discoveryStatus'] = discovery_status | |
| piclet_data['scanCount'] = existing_canonical.get('scanCount', 1) | |
| else: | |
| # Fallback if data not found | |
| piclet_data = { | |
| "typeId": canonical_id, | |
| "nickname": stats['name'], | |
| "stats": stats, | |
| "imageUrl": image_result.get('imageUrl', ''), | |
| "imageCaption": caption, | |
| "concept": concept_text, | |
| "imagePrompt": image_prompt, | |
| "objectName": object_name, | |
| "attributes": attributes, | |
| "discoveryStatus": discovery_status, | |
| "scanCount": scan_count, | |
| "createdAt": datetime.now().isoformat() | |
| } | |
| else: | |
| # For new and variation, determine the correct dataset URL | |
| if discovery_status == 'new': | |
| normalized_name = PicletDiscoveryService.normalize_object_name(object_name) | |
| image_filename = f"{normalized_name}_canonical.png" | |
| else: # variation | |
| normalized_name = PicletDiscoveryService.normalize_object_name(object_name) | |
| existing_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| variation_num = len(existing_data.get('variations', [])) | |
| image_filename = f"{normalized_name}_{variation_num:03d}.png" | |
| dataset_image_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/images/{image_filename}" | |
| piclet_data = { | |
| "typeId": canonical_id, | |
| "nickname": stats['name'], | |
| "stats": stats, | |
| "imageUrl": dataset_image_url, | |
| "imageCaption": caption, | |
| "concept": concept_text, | |
| "imagePrompt": image_prompt, | |
| "objectName": object_name, | |
| "attributes": attributes, | |
| "discoveryStatus": discovery_status, | |
| "scanCount": scan_count, | |
| "createdAt": datetime.now().isoformat() | |
| } | |
| messages = { | |
| 'new': f"Congratulations! You discovered the first {object_name} Piclet!", | |
| 'variation': f"You found a new variation of {object_name}!", | |
| 'existing': f"You encountered a known {object_name} Piclet." | |
| } | |
| return { | |
| "success": True, | |
| "piclet": piclet_data, | |
| "discoveryStatus": discovery_status, | |
| "canonicalId": canonical_id, | |
| "message": messages.get(discovery_status, "Piclet generated!") | |
| } | |
| except Exception as e: | |
| print(f"Failed to generate Piclet: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return { | |
| "success": False, | |
| "error": str(e) | |
| } | |
| def get_object_details(object_name: str) -> dict: | |
| """ | |
| Get complete details for an object (canonical + all variations) | |
| Args: | |
| object_name: The object name (e.g., "pillow", "macbook") | |
| Returns: | |
| { | |
| "success": bool, | |
| "objectName": str, | |
| "canonical": {canonical data}, | |
| "variations": [list of variations], | |
| "totalScans": int | |
| } | |
| """ | |
| try: | |
| # Load the object data | |
| piclet_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if not piclet_data: | |
| return { | |
| "success": False, | |
| "error": f"No piclet found for object '{object_name}'", | |
| "objectName": object_name | |
| } | |
| # Calculate total scans across canonical and variations | |
| total_scans = piclet_data['canonical'].get('scanCount', 0) | |
| for variation in piclet_data.get('variations', []): | |
| total_scans += variation.get('scanCount', 0) | |
| return { | |
| "success": True, | |
| "objectName": object_name, | |
| "canonical": piclet_data['canonical'], | |
| "variations": piclet_data.get('variations', []), | |
| "totalScans": total_scans, | |
| "variationCount": len(piclet_data.get('variations', [])) | |
| } | |
| except Exception as e: | |
| print(f"Failed to get object details: {e}") | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "objectName": object_name | |
| } | |
| def get_user_piclets(hf_token: str) -> dict: | |
| """ | |
| Get all Piclets discovered by a specific user | |
| Args: | |
| hf_token: User's HuggingFace OAuth token | |
| Returns: | |
| { | |
| "success": bool, | |
| "piclets": [list of piclet discoveries], | |
| "stats": {user stats} | |
| } | |
| """ | |
| try: | |
| # Verify token and get user info | |
| user_info = verify_hf_token(hf_token) | |
| if not user_info: | |
| return { | |
| "success": False, | |
| "error": "Invalid HuggingFace token", | |
| "piclets": [] | |
| } | |
| # Load user profile | |
| user_profile = PicletDiscoveryService.load_user_data(user_info['sub']) | |
| # Get list of discoveries | |
| discoveries = user_profile.get('discoveries', []) | |
| piclets = [] | |
| # Load each discovered piclet | |
| for type_id in discoveries: | |
| # Extract object name from type_id (e.g., "pillow_canonical" -> "pillow") | |
| object_name = type_id.rsplit('_', 1)[0] | |
| # Load the piclet data | |
| piclet_data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if piclet_data: | |
| # Check if it's canonical or variation | |
| if piclet_data['canonical']['typeId'] == type_id: | |
| piclets.append({ | |
| 'type': 'canonical', | |
| 'typeId': type_id, | |
| 'objectName': object_name, | |
| 'discoveredAt': piclet_data['canonical']['discoveredAt'], | |
| 'scanCount': piclet_data['canonical'].get('scanCount', 1), | |
| 'picletData': piclet_data['canonical'].get('picletData', {}) | |
| }) | |
| else: | |
| # Find matching variation | |
| for variation in piclet_data.get('variations', []): | |
| if variation['typeId'] == type_id: | |
| piclets.append({ | |
| 'type': 'variation', | |
| 'typeId': type_id, | |
| 'objectName': object_name, | |
| 'attributes': variation.get('attributes', []), | |
| 'discoveredAt': variation['discoveredAt'], | |
| 'scanCount': variation.get('scanCount', 1), | |
| 'picletData': variation.get('picletData', {}) | |
| }) | |
| break | |
| # Sort by discovery date (most recent first) | |
| piclets.sort(key=lambda x: x.get('discoveredAt', ''), reverse=True) | |
| return { | |
| "success": True, | |
| "piclets": piclets, | |
| "stats": { | |
| "username": user_info.get('preferred_username'), | |
| "name": user_info.get('name'), | |
| "picture": user_info.get('picture'), | |
| "totalFinds": user_profile.get('totalFinds', 0), | |
| "uniqueFinds": user_profile.get('uniqueFinds', 0), | |
| "rarityScore": user_profile.get('rarityScore', 0), | |
| "joinedAt": user_profile.get('joinedAt') | |
| } | |
| } | |
| except Exception as e: | |
| print(f"Failed to get user piclets: {e}") | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "piclets": [] | |
| } | |
| def get_recent_activity(limit: int = 20) -> dict: | |
| """ | |
| Get recent discoveries across all users | |
| """ | |
| try: | |
| activities = [] | |
| # List all piclet files | |
| try: | |
| files = list_repo_files( | |
| repo_id=DATASET_REPO, | |
| repo_type=DATASET_TYPE, | |
| token=HF_TOKEN | |
| ) | |
| piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")] | |
| except: | |
| piclet_files = [] | |
| # Load recent piclets (simplified - in production, maintain a separate activity log) | |
| for file_path in piclet_files[-limit:]: | |
| try: | |
| object_name = file_path.replace("piclets/", "").replace(".json", "") | |
| data = PicletDiscoveryService.load_piclet_data(object_name) | |
| if data: | |
| # Add canonical discovery | |
| canonical = data["canonical"] | |
| activities.append({ | |
| "type": "discovery", | |
| "objectName": object_name, | |
| "typeId": canonical["typeId"], | |
| "discoveredBy": canonical["discoveredBy"], | |
| "discoveredAt": canonical["discoveredAt"], | |
| "scanCount": canonical.get("scanCount", 1) | |
| }) | |
| # Add recent variations | |
| for variation in data.get("variations", [])[-5:]: | |
| activities.append({ | |
| "type": "variation", | |
| "objectName": object_name, | |
| "typeId": variation["typeId"], | |
| "attributes": variation["attributes"], | |
| "discoveredBy": variation["discoveredBy"], | |
| "discoveredAt": variation["discoveredAt"], | |
| "scanCount": variation.get("scanCount", 1) | |
| }) | |
| except: | |
| continue | |
| # Sort by discovery date | |
| activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True) | |
| return { | |
| "success": True, | |
| "activities": activities[:limit] | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "activities": [] | |
| } | |
| def get_leaderboard(limit: int = 10) -> dict: | |
| """ | |
| Get top discoverers | |
| """ | |
| try: | |
| leaderboard = [] | |
| # List all user files | |
| try: | |
| files = list_repo_files( | |
| repo_id=DATASET_REPO, | |
| repo_type=DATASET_TYPE, | |
| token=HF_TOKEN | |
| ) | |
| user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")] | |
| except: | |
| user_files = [] | |
| # Load user data | |
| for file_path in user_files: | |
| try: | |
| username = file_path.replace("users/", "").replace(".json", "") | |
| user_data = PicletDiscoveryService.load_user_data(username) | |
| leaderboard.append({ | |
| "username": username, | |
| "totalFinds": user_data.get("totalFinds", 0), | |
| "uniqueFinds": user_data.get("uniqueFinds", 0), | |
| "rarityScore": user_data.get("rarityScore", 0) | |
| }) | |
| except: | |
| continue | |
| # Sort by rarity score | |
| leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True) | |
| # Add ranks | |
| for i, entry in enumerate(leaderboard[:limit]): | |
| entry["rank"] = i + 1 | |
| return { | |
| "success": True, | |
| "leaderboard": leaderboard[:limit] | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": str(e), | |
| "leaderboard": [] | |
| } | |
| # Create Gradio interface | |
| with gr.Blocks(title="Piclets Discovery Server") as app: | |
| gr.Markdown(""" | |
| # ๐ Piclets Discovery Server | |
| Backend service for the Piclets discovery game. Each real-world object has ONE canonical Piclet! | |
| """) | |
| with gr.Tab("Generate Piclet"): | |
| gr.Markdown(""" | |
| ## ๐ฎ Complete Piclet Generator | |
| Upload an image and provide your HuggingFace token to generate a complete Piclet. | |
| This endpoint handles the entire workflow: captioning, concept generation, image creation, and dataset storage. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gen_image = gr.Image(label="Upload Image", type="filepath") | |
| gen_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password") | |
| gen_btn = gr.Button("Generate Piclet", variant="primary") | |
| with gr.Column(): | |
| gen_result = gr.JSON(label="Generated Piclet Result") | |
| gen_btn.click( | |
| fn=generate_piclet, | |
| inputs=[gen_image, gen_token], | |
| outputs=gen_result | |
| ) | |
| with gr.Tab("My Piclets"): | |
| gr.Markdown(""" | |
| ## ๐ Your Discovery Collection | |
| View all Piclets you've discovered (includes your stats). | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| my_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password") | |
| my_btn = gr.Button("Get My Piclets", variant="primary") | |
| with gr.Column(): | |
| my_result = gr.JSON(label="My Piclets") | |
| my_btn.click( | |
| fn=get_user_piclets, | |
| inputs=my_token, | |
| outputs=my_result | |
| ) | |
| with gr.Tab("Object Details"): | |
| gr.Markdown(""" | |
| ## ๐ View Object Details | |
| Get complete information about an object (canonical + all variations). | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| obj_name = gr.Textbox(label="Object Name", placeholder="e.g., pillow, macbook") | |
| obj_btn = gr.Button("Get Details", variant="primary") | |
| with gr.Column(): | |
| obj_result = gr.JSON(label="Object Details") | |
| obj_btn.click( | |
| fn=get_object_details, | |
| inputs=obj_name, | |
| outputs=obj_result | |
| ) | |
| with gr.Tab("Recent Activity"): | |
| activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities") | |
| activity_btn = gr.Button("Get Recent Activity") | |
| activity_result = gr.JSON(label="Recent Discoveries") | |
| activity_btn.click( | |
| fn=get_recent_activity, | |
| inputs=activity_limit, | |
| outputs=activity_result | |
| ) | |
| with gr.Tab("Leaderboard"): | |
| leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers") | |
| leader_btn = gr.Button("Get Leaderboard") | |
| leader_result = gr.JSON(label="Top Discoverers") | |
| leader_btn.click( | |
| fn=get_leaderboard, | |
| inputs=leader_limit, | |
| outputs=leader_result | |
| ) | |
| # API Documentation | |
| gr.Markdown(""" | |
| ## ๐ Public API Endpoints | |
| All endpoints return JSON responses. The frontend only needs these 5 endpoints: | |
| ### 1. **generate_piclet** (Scanner) | |
| Complete Piclet generation workflow. | |
| - Input: `image` (File), `hf_token` (string) | |
| - Output: Generated Piclet with discovery status | |
| ### 2. **get_user_piclets** (User Collection) | |
| Get user's discovered Piclets and stats. | |
| - Input: `hf_token` (string) | |
| - Output: List of Piclets + user stats (total/unique finds, rarity score) | |
| ### 3. **get_object_details** (Object Data) | |
| Get complete object info (canonical + all variations). | |
| - Input: `object_name` (string) | |
| - Output: Canonical + variations + total scans | |
| ### 4. **get_recent_activity** (Activity Feed) | |
| Recent discoveries across all users. | |
| - Input: `limit` (int, default 20) | |
| - Output: Recent discoveries with timestamps | |
| ### 5. **get_leaderboard** (Top Users) | |
| Top discoverers by rarity score. | |
| - Input: `limit` (int, default 10) | |
| - Output: Ranked users with stats | |
| --- | |
| *Note: Internal helper functions (search_piclet, create_canonical, etc.) are used by generate_piclet but not exposed to frontend.* | |
| """) | |
| if __name__ == "__main__": | |
| # Protect web UI with authentication while allowing API access | |
| admin_password = os.getenv("ADMIN_PASSWORD", "changeme") | |
| # Configure for HuggingFace Space environment | |
| app.launch() | |