import gradio as gr from gradio_client import Client, handle_file import json import os import re from datetime import datetime from typing import List, Optional from huggingface_hub import HfApi, hf_hub_download, list_repo_files from pathlib import Path import tempfile from auth import verify_hf_token # HuggingFace configuration HF_TOKEN = os.getenv("HF_TOKEN") # Required for writing to dataset DATASET_REPO = "Fraser/piclets" # Public dataset repository DATASET_TYPE = "dataset" # Initialize HuggingFace API with token if available api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi() # Cache directory for local operations CACHE_DIR = Path("cache") CACHE_DIR.mkdir(exist_ok=True) class PicletDiscoveryService: """Manages Piclet discovery using HuggingFace datasets""" @staticmethod def normalize_object_name(name: str) -> str: """ Normalize object names for consistent storage and lookup Examples: "The Blue Pillow" -> "pillow", "wooden chairs" -> "wooden_chair" """ if not name: return "unknown" # Convert to lowercase and strip name = name.lower().strip() # Remove articles (the, a, an) name = re.sub(r'^(the|a|an)\s+', '', name) # Remove special characters except spaces name = re.sub(r'[^a-z0-9\s]', '', name) # Handle common plurals (basic pluralization rules) if name.endswith('ies') and len(name) > 4: name = name[:-3] + 'y' # berries -> berry elif name.endswith('ves') and len(name) > 4: name = name[:-3] + 'f' # leaves -> leaf elif name.endswith('es') and len(name) > 3: # Check if it's a special case like "glasses" if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')): name = name[:-2] # boxes -> box (but keep glasses) elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'): name = name[:-1] # chairs -> chair (but keep glass) # Replace spaces with underscores name = re.sub(r'\s+', '_', name.strip()) return name @staticmethod def load_piclet_data(object_name: str) -> Optional[dict]: """Load Piclet data from HuggingFace dataset""" try: normalized_name = PicletDiscoveryService.normalize_object_name(object_name) file_path = f"piclets/{normalized_name}.json" # Download the file from HuggingFace local_path = hf_hub_download( repo_id=DATASET_REPO, filename=file_path, repo_type=DATASET_TYPE, token=HF_TOKEN, cache_dir=str(CACHE_DIR) ) with open(local_path, 'r') as f: return json.load(f) except Exception as e: print(f"Could not load piclet data for {object_name}: {e}") return None @staticmethod def save_piclet_data(object_name: str, data: dict) -> bool: """Save Piclet data to HuggingFace dataset""" try: normalized_name = PicletDiscoveryService.normalize_object_name(object_name) file_path = f"piclets/{normalized_name}.json" # Create a temporary file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(data, f, indent=2) temp_path = f.name # Upload to HuggingFace api.upload_file( path_or_fileobj=temp_path, path_in_repo=file_path, repo_id=DATASET_REPO, repo_type=DATASET_TYPE, commit_message=f"Update piclet: {normalized_name}" ) # Clean up os.unlink(temp_path) return True except Exception as e: print(f"Failed to save piclet data: {e}") return False @staticmethod def load_user_data(sub: str) -> dict: """ Load user profile from dataset by HF user ID (sub) Args: sub: HuggingFace user ID (stable identifier) Returns: User profile dict or default profile if not found """ try: file_path = f"users/{sub}.json" local_path = hf_hub_download( repo_id=DATASET_REPO, filename=file_path, repo_type=DATASET_TYPE, token=HF_TOKEN, cache_dir=str(CACHE_DIR) ) with open(local_path, 'r') as f: return json.load(f) except: # Return default user profile if not found # Will be populated with actual data on first save return { "sub": sub, "preferred_username": None, "name": None, "picture": None, "joinedAt": datetime.now().isoformat(), "lastSeen": datetime.now().isoformat(), "discoveries": [], "uniqueFinds": 0, "totalFinds": 0, "rarityScore": 0, "visibility": "public" } @staticmethod def save_user_data(sub: str, data: dict) -> bool: """ Save user profile to dataset by HF user ID (sub) Args: sub: HuggingFace user ID (stable identifier) data: User profile dict Returns: True if successful, False otherwise """ try: file_path = f"users/{sub}.json" # Update lastSeen timestamp data["lastSeen"] = datetime.now().isoformat() with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump(data, f, indent=2) temp_path = f.name api.upload_file( path_or_fileobj=temp_path, path_in_repo=file_path, repo_id=DATASET_REPO, repo_type=DATASET_TYPE, commit_message=f"Update user profile: {data.get('preferred_username', sub)}" ) os.unlink(temp_path) return True except Exception as e: print(f"Failed to save user data: {e}") return False @staticmethod def get_or_create_user_profile(user_info: dict) -> dict: """ Get existing user profile or create new one from OAuth user_info Refreshes cached profile data on each call Args: user_info: OAuth user info from HF (sub, preferred_username, name, picture) Returns: User profile dict """ sub = user_info['sub'] # Load existing profile profile = PicletDiscoveryService.load_user_data(sub) # Update cached profile fields from OAuth profile['sub'] = sub profile['preferred_username'] = user_info.get('preferred_username') profile['name'] = user_info.get('name') profile['picture'] = user_info.get('picture') profile['email'] = user_info.get('email') # Set joinedAt only if this is a new profile if 'joinedAt' not in profile or not profile['joinedAt']: profile['joinedAt'] = datetime.now().isoformat() return profile @staticmethod def update_global_stats() -> dict: """Update and return global statistics""" try: # Try to load existing stats try: local_path = hf_hub_download( repo_id=DATASET_REPO, filename="metadata/stats.json", repo_type=DATASET_TYPE, token=HF_TOKEN, cache_dir=str(CACHE_DIR) ) with open(local_path, 'r') as f: stats = json.load(f) except: stats = { "totalDiscoveries": 0, "uniqueObjects": 0, "totalVariations": 0, "lastUpdated": datetime.now().isoformat() } return stats except Exception as e: print(f"Failed to update global stats: {e}") return {} class PicletGeneratorService: """ Orchestrates Piclet generation by calling external AI services Uses user's hf_token to consume their GPU quota """ # Space endpoints JOY_CAPTION_SPACE = "fancyfeast/joy-caption-alpha-two" GPT_OSS_SPACE = "amd/gpt-oss-120b-chatbot" QWEN_IMAGE_SPACE = "multimodalart/Qwen-Image-Fast" @staticmethod def generate_enhanced_caption(image_path: str, hf_token: str) -> str: """Generate detailed image description using JoyCaption Args: image_path: Path to image file hf_token: User's HuggingFace token """ try: print(f"Connecting to JoyCaption space with user token...") client = Client( PicletGeneratorService.JOY_CAPTION_SPACE, hf_token=hf_token ) print(f"Generating caption for image...") result = client.predict( handle_file(image_path), # Wrap path so client uploads file "Descriptive", # caption_type "medium-length", # caption_length [], # extra_options "", # name_input "Describe this image in detail, identifying any recognizable objects, brands, logos, or specific models. Be specific about product names and types.", # custom_prompt api_name="/stream_chat" ) # JoyCaption returns tuple: (prompt_used, caption_text) in .data result_data = result.data if hasattr(result, 'data') else result caption = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else str(result_data) print(f"Caption generated: {caption[:100]}...") return caption except Exception as e: print(f"Failed to generate caption: {e}") raise Exception(f"Caption generation failed: {str(e)}") @staticmethod def generate_text_with_gpt(prompt: str, hf_token: str) -> str: """Generate text using GPT-OSS-120B""" try: print(f"Connecting to GPT-OSS space...") client = Client( PicletGeneratorService.GPT_OSS_SPACE, hf_token=hf_token ) print(f"Generating text...") result = client.predict( prompt, # message (positional) "You are a helpful assistant that creates Pokémon-style monster concepts based on real-world objects.", # system_prompt (positional) 0.7, # temperature (positional) api_name="/chat" ) # Extract response text (GPT-OSS formats with Analysis and Response) result_data = result.data if hasattr(result, 'data') else result response_text = result_data[0] if isinstance(result_data, (list, tuple)) else str(result_data) # Try to extract Response section response_match = re.search(r'\*\*💬 Response:\*\*\s*\n\n([\s\S]*)', response_text) if response_match: return response_match.group(1).strip() # Fallback: extract after "assistantfinal" final_match = re.search(r'assistantfinal\s*([\s\S]*)', response_text) if final_match: return final_match.group(1).strip() return response_text except Exception as e: print(f"Failed to generate text: {e}") raise Exception(f"Text generation failed: {str(e)}") @staticmethod def generate_piclet_concept(caption: str, hf_token: str) -> dict: """ Generate complete Piclet concept from image caption Returns parsed concept with object name, variation, stats, etc. """ concept_prompt = f"""You are analyzing an image to create a Pokémon-style creature. Here's the image description: "{caption}" STEP 1 - REASONING (think through these before writing): 1. What is the PRIMARY PHYSICAL OBJECT? Be SPECIFIC (e.g., "f-16 fighting falcon" not "jet", "macbook pro" not "laptop") 2. What is this object's real-world PURPOSE and FUNCTION? 3. What PERSONALITY traits would naturally emerge from this object's characteristics? - If it's fast → energetic, agile - If it's protective → loyal, defensive - If it's precise → disciplined, focused 4. What NATURAL HABITAT suits this object-turned-creature? - Electronics → urban/tech environments - Vehicles → roads, skies, waters they traverse - Tools → workshops, sites where they're used 5. What BEHAVIORS and ABILITIES reflect the object's function? - What does the object DO in real life? - How would those actions become creature abilities? 6. What are the object's most ICONIC VISUAL FEATURES that define it? STEP 2 - FORMAT your complete concept EXACTLY as follows: ```md # Canonical Object {{Specific object name: "macbook", "eiffel tower", "iphone", "tesla", "le creuset mug", "nintendo switch"}} {{NOT generic terms like: "laptop", "tower", "phone", "car", "mug", "console"}} {{Include brand/model/landmark name when identifiable}} # Variation {{OPTIONAL: one distinctive attribute like "silver", "pro", "night", "gaming", OR use "canonical" if this is the standard/default version with no special variation}} # Object Rarity {{common, uncommon, rare, epic, or legendary based on object uniqueness}} # Monster Name {{Creative 8-11 letter name based on the SPECIFIC object, e.g., "Macbyte" for MacBook, "Towerfell" for Eiffel Tower}} # Primary Type {{beast, bug, aquatic, flora, mineral, space, machina, structure, culture, or cuisine}} # Physical Stats Height: {{e.g., "1.2m" or "3'5\\""}} Weight: {{e.g., "15kg" or "33 lbs"}} # Personality {{1-2 sentences describing personality traits based on the object's real-world function and characteristics}} # Physical Appearance {{2-3 paragraphs describing how the SPECIFIC object's visual features translate into monster features. Reference the actual object by name. Describe body structure, colors, textures, materials, distinctive markings, and how each physical element relates to the source object.}} # Lore & Behavior {{2-3 paragraphs describing the creature's behavior, habitat, abilities, and role in its ecosystem. What does it DO? Where does it live? How does it interact with its environment? What are its natural behaviors and powers that reflect the object's real-world function? This is the creature's background story and behavioral profile.}} # Monster Image Prompt {{Detailed 3-4 sentence visual description for anime-style image generation. Describe body structure, colors, textures, materials, distinctive features, personality-driven pose/expression, dynamic action or stance, environment/background setting, and atmospheric lighting. Be specific and detailed about visual elements. DO NOT mention the source object name or include phrases like "Inspired by [object]".}} ``` CRITICAL RULES: - Canonical Object MUST be SPECIFIC: "f-16 fighting falcon" not "jet", "macbook pro" not "laptop", "coca cola" not "soda" - If you can identify a brand, model, or proper name from the description, USE IT - Variation should be meaningful and distinctive (material, style, color, context, or model variant) - Physical Appearance must describe the CREATURE'S BODY with references to the specific object's visual features - Lore & Behavior must describe WHAT THE CREATURE DOES, not how it looks - Monster Image Prompt must be a detailed (3-4 sentences) pure visual description without mentioning the source object name - Monster Image Prompt must NOT include the monster's name or style prefixes like "Anime-style" or "Pokémon-style" - Primary Type must match the object category (machina for electronics/vehicles, structure for buildings, etc.)""" response_text = PicletGeneratorService.generate_text_with_gpt(concept_prompt, hf_token) # Parse the concept return PicletGeneratorService.parse_concept(response_text) @staticmethod def parse_concept(concept_text: str) -> dict: """Parse structured concept text into dict""" # Remove code block markers if present if '```' in concept_text: code_block_match = re.search(r'```(?:md|markdown)?\s*\n([\s\S]*?)```', concept_text) if code_block_match: concept_text = code_block_match.group(1).strip() def extract_section(text: str, section: str) -> str: """Extract content of a markdown section""" pattern = rf'\*{{0,2}}#\s*{re.escape(section)}\s*\*{{0,2}}\s*\n([\s\S]*?)(?=^\*{{0,2}}#|$)' match = re.search(pattern, text, re.MULTILINE) if match: content = match.group(1).strip() # Remove curly braces and quotes that GPT sometimes adds content = re.sub(r'^[{"]|["}]$', '', content) content = re.sub(r'^.*:\s*["\']|["\']$', '', content) return content.strip() return '' # Extract all sections object_name = extract_section(concept_text, 'Canonical Object').lower() variation_text = extract_section(concept_text, 'Variation') rarity_text = extract_section(concept_text, 'Object Rarity').lower() monster_name = extract_section(concept_text, 'Monster Name') primary_type = extract_section(concept_text, 'Primary Type').lower() # Extract both appearance and lore sections separately (keep them separate!) physical_appearance = extract_section(concept_text, 'Physical Appearance') lore_behavior = extract_section(concept_text, 'Lore & Behavior') image_prompt = extract_section(concept_text, 'Monster Image Prompt') # Parse physical stats physical_stats_text = extract_section(concept_text, 'Physical Stats') height_match = re.search(r'Height:\s*(.+)', physical_stats_text, re.IGNORECASE) weight_match = re.search(r'Weight:\s*(.+)', physical_stats_text, re.IGNORECASE) height = height_match.group(1).strip() if height_match else None weight = weight_match.group(1).strip() if weight_match else None personality = extract_section(concept_text, 'Personality') # Clean monster name if monster_name: monster_name = re.sub(r'\*+', '', monster_name) # Remove asterisks if ',' in monster_name: monster_name = monster_name.split(',')[0] if len(monster_name) > 12: monster_name = monster_name[:12] # Parse variation attributes = [] if variation_text and variation_text.lower() not in ['none', 'canonical', '']: attributes = [variation_text.lower()] # Map rarity to tier tier = 'medium' if 'common' in rarity_text: tier = 'low' elif 'uncommon' in rarity_text: tier = 'medium' elif 'rare' in rarity_text and 'epic' not in rarity_text: tier = 'high' elif 'legendary' in rarity_text or 'epic' in rarity_text or 'mythical' in rarity_text: tier = 'legendary' return { 'objectName': object_name, 'attributes': attributes, 'concept': concept_text, 'stats': { 'name': monster_name or 'Unknown', 'physicalAppearance': physical_appearance, 'lore': lore_behavior, 'tier': tier, 'primaryType': primary_type or 'beast', 'height': height, 'weight': weight, 'personality': personality }, 'imagePrompt': image_prompt } @staticmethod def generate_piclet_image(image_prompt: str, tier: str, hf_token: str) -> dict: """Generate Piclet image using Qwen-Image-Fast""" try: print(f"Connecting to Qwen-Image-Fast space...") client = Client( PicletGeneratorService.QWEN_IMAGE_SPACE, hf_token=hf_token ) # Build enhanced prompt for Pokemon-style anime art full_prompt = f"{image_prompt} Pokémon anime art style, idle pose, centered, full body visible in frame." print(f"Generating image with Qwen-Image-Fast...") print(f"Prompt: {full_prompt[:100]}...") # Qwen-Image-Fast API: infer(prompt, seed, randomize_seed, aspect_ratio, guidance_scale, num_inference_steps, prompt_enhance) result = client.predict( full_prompt, # prompt 0, # seed (will be randomized) True, # randomize_seed "3:4", # aspect_ratio (768x1024 - portrait) 1.0, # guidance_scale (default) 8, # num_inference_steps (default, optimized with Lightning LoRA) True, # prompt_enhance (uses LLM to enhance prompt) api_name="/infer" ) # Qwen returns: (PIL.Image, seed) tuple result_data = result.data if hasattr(result, 'data') else result image_data = result_data[0] if isinstance(result_data, (list, tuple)) else result_data seed = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else 0 # Handle different return formats (URL or PIL Image object) image_url = None if isinstance(image_data, str): image_url = image_data elif isinstance(image_data, dict): image_url = image_data.get('url') or image_data.get('path') elif hasattr(image_data, 'url'): image_url = image_data.url if not image_url: raise Exception("Failed to extract image URL from Qwen response") return { 'imageUrl': image_url, 'seed': seed, 'prompt': image_prompt } except Exception as e: print(f"Failed to generate image: {e}") raise Exception(f"Image generation failed: {str(e)}") @staticmethod def upload_image_to_dataset(image_path: str, file_name: str) -> str: """ Upload image to HuggingFace dataset Args: image_path: Local path to the image file (or URL to download from) file_name: Name for the file (e.g., "pillow_canonical.png") Returns: URL to the uploaded image in the dataset """ try: print(f"Uploading image to dataset: {file_name}") # Handle both local paths and URLs if image_path.startswith('http'): # Download from URL first import requests response = requests.get(image_path) with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f: f.write(response.content) temp_path = f.name else: # Use local path directly temp_path = image_path # Upload to HuggingFace dataset file_path = f"images/{file_name}" api.upload_file( path_or_fileobj=temp_path, path_in_repo=file_path, repo_id=DATASET_REPO, repo_type=DATASET_TYPE, commit_message=f"Add piclet image: {file_name}" ) # Clean up temp file if we downloaded it if image_path.startswith('http'): os.unlink(temp_path) # Return the dataset URL dataset_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_path}" print(f"Image uploaded successfully: {dataset_url}") return dataset_url except Exception as e: print(f"Failed to upload image: {e}") raise Exception(f"Image upload failed: {str(e)}") # API Endpoints def search_piclet(object_name: str, attributes: List[str]) -> dict: """ Search for canonical Piclet or variations Returns matching piclet or None """ piclet_data = PicletDiscoveryService.load_piclet_data(object_name) if not piclet_data: return { "status": "new", "message": f"No Piclet found for '{object_name}'", "piclet": None } # Check if searching for canonical (no attributes) if not attributes or len(attributes) == 0: return { "status": "existing", "message": f"Found canonical Piclet for '{object_name}'", "piclet": piclet_data.get("canonical") } # Search for matching variation variations = piclet_data.get("variations", []) for variation in variations: var_attrs = set(variation.get("attributes", [])) search_attrs = set(attributes) # Check for close match (at least 50% overlap) overlap = len(var_attrs.intersection(search_attrs)) if overlap >= len(search_attrs) * 0.5: return { "status": "variation", "message": f"Found variation of '{object_name}'", "piclet": variation, "canonicalId": piclet_data["canonical"]["typeId"] } # No variation found, suggest creating one return { "status": "new_variation", "message": f"No variation found for '{object_name}' with attributes {attributes}", "canonicalId": piclet_data["canonical"]["typeId"], "piclet": None } def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict: """ Create a new canonical Piclet Args: object_name: The normalized object name (e.g., "pillow") piclet_data: JSON string of Piclet instance data token_or_username: Either OAuth token (starts with "hf_") or username for testing Returns: Dict with success status and piclet data """ try: piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data # Determine if this is a token or username user_info = None if token_or_username and token_or_username.startswith('hf_'): # OAuth token - verify it user_info = verify_hf_token(token_or_username) if not user_info: return { "success": False, "error": "Invalid OAuth token" } else: # Legacy username mode (for testing) user_info = { "sub": f"legacy_{token_or_username}", "preferred_username": token_or_username, "name": token_or_username, "picture": None } # Get or create user profile user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info) # Create canonical entry with full discoverer info canonical_data = { "canonical": { "objectName": object_name, "typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical", "discoveredBy": user_info['preferred_username'], "discovererSub": user_info['sub'], "discovererUsername": user_info['preferred_username'], "discovererName": user_info.get('name'), "discovererPicture": user_info.get('picture'), "discoveredAt": datetime.now().isoformat(), "scanCount": 1, "picletData": piclet_json }, "variations": [] } # Save to dataset if PicletDiscoveryService.save_piclet_data(object_name, canonical_data): # Update user profile user_profile["discoveries"].append(canonical_data["canonical"]["typeId"]) user_profile["uniqueFinds"] += 1 user_profile["totalFinds"] += 1 user_profile["rarityScore"] += 100 # Bonus for canonical discovery PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) return { "success": True, "message": f"Created canonical Piclet for '{object_name}'", "piclet": canonical_data["canonical"] } else: return { "success": False, "error": "Failed to save canonical Piclet" } except Exception as e: return { "success": False, "error": str(e) } def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict: """ Create a variation of an existing canonical Piclet with OAuth verification Args: canonical_id: ID of the canonical Piclet attributes: List of variation attributes piclet_data: JSON data for the Piclet token_or_username: Either OAuth token (starts with "hf_") or username for testing object_name: Normalized object name Returns: Success/error dict with variation data """ try: piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data # Verify token or use legacy mode user_info = None if token_or_username and token_or_username.startswith('hf_'): user_info = verify_hf_token(token_or_username) if not user_info: return {"success": False, "error": "Invalid OAuth token"} else: # Legacy mode for testing user_info = { "sub": f"legacy_{token_or_username}", "preferred_username": token_or_username, "name": token_or_username, "picture": None } # Get or create user profile user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info) # Load existing data existing_data = PicletDiscoveryService.load_piclet_data(object_name) if not existing_data: return { "success": False, "error": f"Canonical Piclet not found for '{object_name}'" } # Create variation entry variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}" variation = { "typeId": variation_id, "attributes": attributes, "discoveredBy": user_info['preferred_username'], "discovererSub": user_info['sub'], "discovererUsername": user_info['preferred_username'], "discovererName": user_info.get('name'), "discovererPicture": user_info.get('picture'), "discoveredAt": datetime.now().isoformat(), "scanCount": 1, "picletData": piclet_json } # Add to variations existing_data["variations"].append(variation) # Save updated data if PicletDiscoveryService.save_piclet_data(object_name, existing_data): # Update user profile user_profile["discoveries"].append(variation_id) user_profile["totalFinds"] += 1 user_profile["rarityScore"] += 50 # Bonus for variation discovery PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) return { "success": True, "message": f"Created variation of '{object_name}'", "piclet": variation } else: return { "success": False, "error": "Failed to save variation" } except Exception as e: return { "success": False, "error": str(e) } def increment_scan_count(piclet_id: str, object_name: str) -> dict: """ Increment the scan count for a Piclet """ try: data = PicletDiscoveryService.load_piclet_data(object_name) if not data: return { "success": False, "error": "Piclet not found" } # Check canonical if data["canonical"]["typeId"] == piclet_id: data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1 scan_count = data["canonical"]["scanCount"] else: # Check variations for variation in data["variations"]: if variation["typeId"] == piclet_id: variation["scanCount"] = variation.get("scanCount", 0) + 1 scan_count = variation["scanCount"] break else: return { "success": False, "error": "Piclet ID not found" } # Save updated data if PicletDiscoveryService.save_piclet_data(object_name, data): return { "success": True, "scanCount": scan_count } else: return { "success": False, "error": "Failed to update scan count" } except Exception as e: return { "success": False, "error": str(e) } def generate_piclet(image, hf_token: str) -> dict: """ Complete Piclet generation workflow - single endpoint Takes user's image and hf_token, returns generated Piclet with discovery status Args: image: Uploaded image file (Gradio file input) hf_token: User's HuggingFace OAuth token Returns: { "success": bool, "piclet": {complete piclet data}, "discoveryStatus": "new" | "variation" | "existing", "canonicalId": str (if variation/existing), "message": str } """ try: # Validate token and get user info user_info = verify_hf_token(hf_token) if not user_info: return { "success": False, "error": "Invalid HuggingFace token" } print(f"Generating Piclet for user: {user_info.get('preferred_username', 'unknown')}") # Get user profile (creates if doesn't exist) user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info) # Get image path from Gradio (type="filepath" gives us a string path) image_path = image if isinstance(image, str) else str(image) # Step 1: Generate caption print("Step 1/5: Generating image caption...") caption = PicletGeneratorService.generate_enhanced_caption(image_path, hf_token) # Step 2: Generate concept print("Step 2/5: Generating Piclet concept...") concept_data = PicletGeneratorService.generate_piclet_concept(caption, hf_token) object_name = concept_data['objectName'] attributes = concept_data['attributes'] stats = concept_data['stats'] image_prompt = concept_data['imagePrompt'] concept_text = concept_data['concept'] # Step 3: Generate image print("Step 3/5: Generating Piclet image...") image_result = PicletGeneratorService.generate_piclet_image( image_prompt, stats['tier'], hf_token ) # Step 4: Check for canonical/variation print("Step 4/5: Checking for existing canonical...") existing_data = PicletDiscoveryService.load_piclet_data(object_name) discovery_status = 'new' canonical_id = None scan_count = 1 if existing_data: # Check if this is an exact canonical match (no attributes) if not attributes or len(attributes) == 0: discovery_status = 'existing' canonical_id = existing_data['canonical']['typeId'] # Increment scan count existing_data['canonical']['scanCount'] = existing_data['canonical'].get('scanCount', 0) + 1 scan_count = existing_data['canonical']['scanCount'] PicletDiscoveryService.save_piclet_data(object_name, existing_data) else: # Check for matching variation variations = existing_data.get('variations', []) matched_variation = None for variation in variations: var_attrs = set(variation.get('attributes', [])) search_attrs = set(attributes) overlap = len(var_attrs.intersection(search_attrs)) if overlap >= len(search_attrs) * 0.5: matched_variation = variation discovery_status = 'existing' canonical_id = existing_data['canonical']['typeId'] # Increment variation scan count variation['scanCount'] = variation.get('scanCount', 0) + 1 scan_count = variation['scanCount'] PicletDiscoveryService.save_piclet_data(object_name, existing_data) break if not matched_variation: discovery_status = 'variation' canonical_id = existing_data['canonical']['typeId'] # Step 5: Save new discovery if needed print("Step 5/5: Saving to dataset...") if discovery_status == 'new': # Create new canonical type_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical" # Upload image to dataset with canonical filename normalized_name = PicletDiscoveryService.normalize_object_name(object_name) image_filename = f"{normalized_name}_canonical.png" dataset_image_url = PicletGeneratorService.upload_image_to_dataset( image_result['imageUrl'], image_filename ) canonical_data = { "canonical": { "objectName": object_name, "typeId": type_id, "discoveredBy": user_info['preferred_username'], "discovererSub": user_info['sub'], "discovererUsername": user_info['preferred_username'], "discovererName": user_info.get('name'), "discovererPicture": user_info.get('picture'), "discoveredAt": datetime.now().isoformat(), "scanCount": scan_count, "picletData": { "typeId": type_id, "nickname": stats['name'], "stats": stats, "imageUrl": dataset_image_url, "imageCaption": caption, "concept": concept_text, "imagePrompt": image_prompt, "createdAt": datetime.now().isoformat() } }, "variations": [] } canonical_id = type_id PicletDiscoveryService.save_piclet_data(object_name, canonical_data) # Update user profile user_profile["discoveries"].append(type_id) user_profile["uniqueFinds"] = user_profile.get("uniqueFinds", 0) + 1 user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1 user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 100 PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) elif discovery_status == 'variation': # Create new variation existing_data = PicletDiscoveryService.load_piclet_data(object_name) variation_num = len(existing_data['variations']) + 1 normalized_name = PicletDiscoveryService.normalize_object_name(object_name) variation_id = f"{normalized_name}_{variation_num:03d}" # Upload image to dataset with variation filename image_filename = f"{normalized_name}_{variation_num:03d}.png" dataset_image_url = PicletGeneratorService.upload_image_to_dataset( image_result['imageUrl'], image_filename ) variation_data = { "typeId": variation_id, "attributes": attributes, "discoveredBy": user_info['preferred_username'], "discovererSub": user_info['sub'], "discovererUsername": user_info['preferred_username'], "discovererName": user_info.get('name'), "discovererPicture": user_info.get('picture'), "discoveredAt": datetime.now().isoformat(), "scanCount": scan_count, "picletData": { "typeId": variation_id, "nickname": stats['name'], "stats": stats, "imageUrl": dataset_image_url, "imageCaption": caption, "concept": concept_text, "imagePrompt": image_prompt, "createdAt": datetime.now().isoformat() } } existing_data['variations'].append(variation_data) PicletDiscoveryService.save_piclet_data(object_name, existing_data) # Update user profile user_profile["discoveries"].append(variation_id) user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1 user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 50 PicletDiscoveryService.save_user_data(user_info['sub'], user_profile) # Build complete response # For existing piclets, get the stored data; for new/variation, use generated data if discovery_status == 'existing': # Load the existing piclet data to return existing_piclet_data = PicletDiscoveryService.load_piclet_data(object_name) if existing_piclet_data and existing_piclet_data.get('canonical'): existing_canonical = existing_piclet_data['canonical'] piclet_data = existing_canonical.get('picletData', {}) piclet_data['discoveryStatus'] = discovery_status piclet_data['scanCount'] = existing_canonical.get('scanCount', 1) else: # Fallback if data not found piclet_data = { "typeId": canonical_id, "nickname": stats['name'], "stats": stats, "imageUrl": image_result.get('imageUrl', ''), "imageCaption": caption, "concept": concept_text, "imagePrompt": image_prompt, "objectName": object_name, "attributes": attributes, "discoveryStatus": discovery_status, "scanCount": scan_count, "createdAt": datetime.now().isoformat() } else: # For new and variation, determine the correct dataset URL if discovery_status == 'new': normalized_name = PicletDiscoveryService.normalize_object_name(object_name) image_filename = f"{normalized_name}_canonical.png" else: # variation normalized_name = PicletDiscoveryService.normalize_object_name(object_name) existing_data = PicletDiscoveryService.load_piclet_data(object_name) variation_num = len(existing_data.get('variations', [])) image_filename = f"{normalized_name}_{variation_num:03d}.png" dataset_image_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/images/{image_filename}" piclet_data = { "typeId": canonical_id, "nickname": stats['name'], "stats": stats, "imageUrl": dataset_image_url, "imageCaption": caption, "concept": concept_text, "imagePrompt": image_prompt, "objectName": object_name, "attributes": attributes, "discoveryStatus": discovery_status, "scanCount": scan_count, "createdAt": datetime.now().isoformat() } messages = { 'new': f"Congratulations! You discovered the first {object_name} Piclet!", 'variation': f"You found a new variation of {object_name}!", 'existing': f"You encountered a known {object_name} Piclet." } return { "success": True, "piclet": piclet_data, "discoveryStatus": discovery_status, "canonicalId": canonical_id, "message": messages.get(discovery_status, "Piclet generated!") } except Exception as e: print(f"Failed to generate Piclet: {e}") import traceback traceback.print_exc() return { "success": False, "error": str(e) } def get_object_details(object_name: str) -> dict: """ Get complete details for an object (canonical + all variations) Args: object_name: The object name (e.g., "pillow", "macbook") Returns: { "success": bool, "objectName": str, "canonical": {canonical data}, "variations": [list of variations], "totalScans": int } """ try: # Load the object data piclet_data = PicletDiscoveryService.load_piclet_data(object_name) if not piclet_data: return { "success": False, "error": f"No piclet found for object '{object_name}'", "objectName": object_name } # Calculate total scans across canonical and variations total_scans = piclet_data['canonical'].get('scanCount', 0) for variation in piclet_data.get('variations', []): total_scans += variation.get('scanCount', 0) return { "success": True, "objectName": object_name, "canonical": piclet_data['canonical'], "variations": piclet_data.get('variations', []), "totalScans": total_scans, "variationCount": len(piclet_data.get('variations', [])) } except Exception as e: print(f"Failed to get object details: {e}") return { "success": False, "error": str(e), "objectName": object_name } def get_user_piclets(hf_token: str) -> dict: """ Get all Piclets discovered by a specific user Args: hf_token: User's HuggingFace OAuth token Returns: { "success": bool, "piclets": [list of piclet discoveries], "stats": {user stats} } """ try: # Verify token and get user info user_info = verify_hf_token(hf_token) if not user_info: return { "success": False, "error": "Invalid HuggingFace token", "piclets": [] } # Load user profile user_profile = PicletDiscoveryService.load_user_data(user_info['sub']) # Get list of discoveries discoveries = user_profile.get('discoveries', []) piclets = [] # Load each discovered piclet for type_id in discoveries: # Extract object name from type_id (e.g., "pillow_canonical" -> "pillow") object_name = type_id.rsplit('_', 1)[0] # Load the piclet data piclet_data = PicletDiscoveryService.load_piclet_data(object_name) if piclet_data: # Check if it's canonical or variation if piclet_data['canonical']['typeId'] == type_id: piclets.append({ 'type': 'canonical', 'typeId': type_id, 'objectName': object_name, 'discoveredAt': piclet_data['canonical']['discoveredAt'], 'scanCount': piclet_data['canonical'].get('scanCount', 1), 'picletData': piclet_data['canonical'].get('picletData', {}) }) else: # Find matching variation for variation in piclet_data.get('variations', []): if variation['typeId'] == type_id: piclets.append({ 'type': 'variation', 'typeId': type_id, 'objectName': object_name, 'attributes': variation.get('attributes', []), 'discoveredAt': variation['discoveredAt'], 'scanCount': variation.get('scanCount', 1), 'picletData': variation.get('picletData', {}) }) break # Sort by discovery date (most recent first) piclets.sort(key=lambda x: x.get('discoveredAt', ''), reverse=True) return { "success": True, "piclets": piclets, "stats": { "username": user_info.get('preferred_username'), "name": user_info.get('name'), "picture": user_info.get('picture'), "totalFinds": user_profile.get('totalFinds', 0), "uniqueFinds": user_profile.get('uniqueFinds', 0), "rarityScore": user_profile.get('rarityScore', 0), "joinedAt": user_profile.get('joinedAt') } } except Exception as e: print(f"Failed to get user piclets: {e}") return { "success": False, "error": str(e), "piclets": [] } def get_recent_activity(limit: int = 20) -> dict: """ Get recent discoveries across all users """ try: activities = [] # List all piclet files try: files = list_repo_files( repo_id=DATASET_REPO, repo_type=DATASET_TYPE, token=HF_TOKEN ) piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")] except: piclet_files = [] # Load recent piclets (simplified - in production, maintain a separate activity log) for file_path in piclet_files[-limit:]: try: object_name = file_path.replace("piclets/", "").replace(".json", "") data = PicletDiscoveryService.load_piclet_data(object_name) if data: # Add canonical discovery canonical = data["canonical"] activities.append({ "type": "discovery", "objectName": object_name, "typeId": canonical["typeId"], "discoveredBy": canonical["discoveredBy"], "discoveredAt": canonical["discoveredAt"], "scanCount": canonical.get("scanCount", 1) }) # Add recent variations for variation in data.get("variations", [])[-5:]: activities.append({ "type": "variation", "objectName": object_name, "typeId": variation["typeId"], "attributes": variation["attributes"], "discoveredBy": variation["discoveredBy"], "discoveredAt": variation["discoveredAt"], "scanCount": variation.get("scanCount", 1) }) except: continue # Sort by discovery date activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True) return { "success": True, "activities": activities[:limit] } except Exception as e: return { "success": False, "error": str(e), "activities": [] } def get_leaderboard(limit: int = 10) -> dict: """ Get top discoverers """ try: leaderboard = [] # List all user files try: files = list_repo_files( repo_id=DATASET_REPO, repo_type=DATASET_TYPE, token=HF_TOKEN ) user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")] except: user_files = [] # Load user data for file_path in user_files: try: username = file_path.replace("users/", "").replace(".json", "") user_data = PicletDiscoveryService.load_user_data(username) leaderboard.append({ "username": username, "totalFinds": user_data.get("totalFinds", 0), "uniqueFinds": user_data.get("uniqueFinds", 0), "rarityScore": user_data.get("rarityScore", 0) }) except: continue # Sort by rarity score leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True) # Add ranks for i, entry in enumerate(leaderboard[:limit]): entry["rank"] = i + 1 return { "success": True, "leaderboard": leaderboard[:limit] } except Exception as e: return { "success": False, "error": str(e), "leaderboard": [] } # Create Gradio interface with gr.Blocks(title="Piclets Discovery Server") as app: gr.Markdown(""" # 🔍 Piclets Discovery Server Backend service for the Piclets discovery game. Each real-world object has ONE canonical Piclet! """) with gr.Tab("Generate Piclet"): gr.Markdown(""" ## 🎮 Complete Piclet Generator Upload an image and provide your HuggingFace token to generate a complete Piclet. This endpoint handles the entire workflow: captioning, concept generation, image creation, and dataset storage. """) with gr.Row(): with gr.Column(): gen_image = gr.Image(label="Upload Image", type="filepath") gen_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password") gen_btn = gr.Button("Generate Piclet", variant="primary") with gr.Column(): gen_result = gr.JSON(label="Generated Piclet Result") gen_btn.click( fn=generate_piclet, inputs=[gen_image, gen_token], outputs=gen_result ) with gr.Tab("My Piclets"): gr.Markdown(""" ## 📚 Your Discovery Collection View all Piclets you've discovered (includes your stats). """) with gr.Row(): with gr.Column(): my_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password") my_btn = gr.Button("Get My Piclets", variant="primary") with gr.Column(): my_result = gr.JSON(label="My Piclets") my_btn.click( fn=get_user_piclets, inputs=my_token, outputs=my_result ) with gr.Tab("Object Details"): gr.Markdown(""" ## 🔍 View Object Details Get complete information about an object (canonical + all variations). """) with gr.Row(): with gr.Column(): obj_name = gr.Textbox(label="Object Name", placeholder="e.g., pillow, macbook") obj_btn = gr.Button("Get Details", variant="primary") with gr.Column(): obj_result = gr.JSON(label="Object Details") obj_btn.click( fn=get_object_details, inputs=obj_name, outputs=obj_result ) with gr.Tab("Recent Activity"): activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities") activity_btn = gr.Button("Get Recent Activity") activity_result = gr.JSON(label="Recent Discoveries") activity_btn.click( fn=get_recent_activity, inputs=activity_limit, outputs=activity_result ) with gr.Tab("Leaderboard"): leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers") leader_btn = gr.Button("Get Leaderboard") leader_result = gr.JSON(label="Top Discoverers") leader_btn.click( fn=get_leaderboard, inputs=leader_limit, outputs=leader_result ) # API Documentation gr.Markdown(""" ## 🔌 Public API Endpoints All endpoints return JSON responses. The frontend only needs these 5 endpoints: ### 1. **generate_piclet** (Scanner) Complete Piclet generation workflow. - Input: `image` (File), `hf_token` (string) - Output: Generated Piclet with discovery status ### 2. **get_user_piclets** (User Collection) Get user's discovered Piclets and stats. - Input: `hf_token` (string) - Output: List of Piclets + user stats (total/unique finds, rarity score) ### 3. **get_object_details** (Object Data) Get complete object info (canonical + all variations). - Input: `object_name` (string) - Output: Canonical + variations + total scans ### 4. **get_recent_activity** (Activity Feed) Recent discoveries across all users. - Input: `limit` (int, default 20) - Output: Recent discoveries with timestamps ### 5. **get_leaderboard** (Top Users) Top discoverers by rarity score. - Input: `limit` (int, default 10) - Output: Ranked users with stats --- *Note: Internal helper functions (search_piclet, create_canonical, etc.) are used by generate_piclet but not exposed to frontend.* """) if __name__ == "__main__": # Protect web UI with authentication while allowing API access admin_password = os.getenv("ADMIN_PASSWORD", "changeme") # Configure for HuggingFace Space environment app.launch()