piclets-server / app.py
Fraser's picture
new image generator (drops tier designs)
17077fb
import gradio as gr
from gradio_client import Client, handle_file
import json
import os
import re
from datetime import datetime
from typing import List, Optional
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from pathlib import Path
import tempfile
from auth import verify_hf_token
# HuggingFace configuration
HF_TOKEN = os.getenv("HF_TOKEN") # Required for writing to dataset
DATASET_REPO = "Fraser/piclets" # Public dataset repository
DATASET_TYPE = "dataset"
# Initialize HuggingFace API with token if available
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
# Cache directory for local operations
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
class PicletDiscoveryService:
"""Manages Piclet discovery using HuggingFace datasets"""
@staticmethod
def normalize_object_name(name: str) -> str:
"""
Normalize object names for consistent storage and lookup
Examples: "The Blue Pillow" -> "pillow", "wooden chairs" -> "wooden_chair"
"""
if not name:
return "unknown"
# Convert to lowercase and strip
name = name.lower().strip()
# Remove articles (the, a, an)
name = re.sub(r'^(the|a|an)\s+', '', name)
# Remove special characters except spaces
name = re.sub(r'[^a-z0-9\s]', '', name)
# Handle common plurals (basic pluralization rules)
if name.endswith('ies') and len(name) > 4:
name = name[:-3] + 'y' # berries -> berry
elif name.endswith('ves') and len(name) > 4:
name = name[:-3] + 'f' # leaves -> leaf
elif name.endswith('es') and len(name) > 3:
# Check if it's a special case like "glasses"
if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')):
name = name[:-2] # boxes -> box (but keep glasses)
elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'):
name = name[:-1] # chairs -> chair (but keep glass)
# Replace spaces with underscores
name = re.sub(r'\s+', '_', name.strip())
return name
@staticmethod
def load_piclet_data(object_name: str) -> Optional[dict]:
"""Load Piclet data from HuggingFace dataset"""
try:
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
file_path = f"piclets/{normalized_name}.json"
# Download the file from HuggingFace
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=file_path,
repo_type=DATASET_TYPE,
token=HF_TOKEN,
cache_dir=str(CACHE_DIR)
)
with open(local_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Could not load piclet data for {object_name}: {e}")
return None
@staticmethod
def save_piclet_data(object_name: str, data: dict) -> bool:
"""Save Piclet data to HuggingFace dataset"""
try:
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
file_path = f"piclets/{normalized_name}.json"
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(data, f, indent=2)
temp_path = f.name
# Upload to HuggingFace
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=file_path,
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
commit_message=f"Update piclet: {normalized_name}"
)
# Clean up
os.unlink(temp_path)
return True
except Exception as e:
print(f"Failed to save piclet data: {e}")
return False
@staticmethod
def load_user_data(sub: str) -> dict:
"""
Load user profile from dataset by HF user ID (sub)
Args:
sub: HuggingFace user ID (stable identifier)
Returns:
User profile dict or default profile if not found
"""
try:
file_path = f"users/{sub}.json"
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=file_path,
repo_type=DATASET_TYPE,
token=HF_TOKEN,
cache_dir=str(CACHE_DIR)
)
with open(local_path, 'r') as f:
return json.load(f)
except:
# Return default user profile if not found
# Will be populated with actual data on first save
return {
"sub": sub,
"preferred_username": None,
"name": None,
"picture": None,
"joinedAt": datetime.now().isoformat(),
"lastSeen": datetime.now().isoformat(),
"discoveries": [],
"uniqueFinds": 0,
"totalFinds": 0,
"rarityScore": 0,
"visibility": "public"
}
@staticmethod
def save_user_data(sub: str, data: dict) -> bool:
"""
Save user profile to dataset by HF user ID (sub)
Args:
sub: HuggingFace user ID (stable identifier)
data: User profile dict
Returns:
True if successful, False otherwise
"""
try:
file_path = f"users/{sub}.json"
# Update lastSeen timestamp
data["lastSeen"] = datetime.now().isoformat()
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(data, f, indent=2)
temp_path = f.name
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=file_path,
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
commit_message=f"Update user profile: {data.get('preferred_username', sub)}"
)
os.unlink(temp_path)
return True
except Exception as e:
print(f"Failed to save user data: {e}")
return False
@staticmethod
def get_or_create_user_profile(user_info: dict) -> dict:
"""
Get existing user profile or create new one from OAuth user_info
Refreshes cached profile data on each call
Args:
user_info: OAuth user info from HF (sub, preferred_username, name, picture)
Returns:
User profile dict
"""
sub = user_info['sub']
# Load existing profile
profile = PicletDiscoveryService.load_user_data(sub)
# Update cached profile fields from OAuth
profile['sub'] = sub
profile['preferred_username'] = user_info.get('preferred_username')
profile['name'] = user_info.get('name')
profile['picture'] = user_info.get('picture')
profile['email'] = user_info.get('email')
# Set joinedAt only if this is a new profile
if 'joinedAt' not in profile or not profile['joinedAt']:
profile['joinedAt'] = datetime.now().isoformat()
return profile
@staticmethod
def update_global_stats() -> dict:
"""Update and return global statistics"""
try:
# Try to load existing stats
try:
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename="metadata/stats.json",
repo_type=DATASET_TYPE,
token=HF_TOKEN,
cache_dir=str(CACHE_DIR)
)
with open(local_path, 'r') as f:
stats = json.load(f)
except:
stats = {
"totalDiscoveries": 0,
"uniqueObjects": 0,
"totalVariations": 0,
"lastUpdated": datetime.now().isoformat()
}
return stats
except Exception as e:
print(f"Failed to update global stats: {e}")
return {}
class PicletGeneratorService:
"""
Orchestrates Piclet generation by calling external AI services
Uses user's hf_token to consume their GPU quota
"""
# Space endpoints
JOY_CAPTION_SPACE = "fancyfeast/joy-caption-alpha-two"
GPT_OSS_SPACE = "amd/gpt-oss-120b-chatbot"
QWEN_IMAGE_SPACE = "multimodalart/Qwen-Image-Fast"
@staticmethod
def generate_enhanced_caption(image_path: str, hf_token: str) -> str:
"""Generate detailed image description using JoyCaption
Args:
image_path: Path to image file
hf_token: User's HuggingFace token
"""
try:
print(f"Connecting to JoyCaption space with user token...")
client = Client(
PicletGeneratorService.JOY_CAPTION_SPACE,
hf_token=hf_token
)
print(f"Generating caption for image...")
result = client.predict(
handle_file(image_path), # Wrap path so client uploads file
"Descriptive", # caption_type
"medium-length", # caption_length
[], # extra_options
"", # name_input
"Describe this image in detail, identifying any recognizable objects, brands, logos, or specific models. Be specific about product names and types.", # custom_prompt
api_name="/stream_chat"
)
# JoyCaption returns tuple: (prompt_used, caption_text) in .data
result_data = result.data if hasattr(result, 'data') else result
caption = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else str(result_data)
print(f"Caption generated: {caption[:100]}...")
return caption
except Exception as e:
print(f"Failed to generate caption: {e}")
raise Exception(f"Caption generation failed: {str(e)}")
@staticmethod
def generate_text_with_gpt(prompt: str, hf_token: str) -> str:
"""Generate text using GPT-OSS-120B"""
try:
print(f"Connecting to GPT-OSS space...")
client = Client(
PicletGeneratorService.GPT_OSS_SPACE,
hf_token=hf_token
)
print(f"Generating text...")
result = client.predict(
prompt, # message (positional)
"You are a helpful assistant that creates Pokรฉmon-style monster concepts based on real-world objects.", # system_prompt (positional)
0.7, # temperature (positional)
api_name="/chat"
)
# Extract response text (GPT-OSS formats with Analysis and Response)
result_data = result.data if hasattr(result, 'data') else result
response_text = result_data[0] if isinstance(result_data, (list, tuple)) else str(result_data)
# Try to extract Response section
response_match = re.search(r'\*\*๐Ÿ’ฌ Response:\*\*\s*\n\n([\s\S]*)', response_text)
if response_match:
return response_match.group(1).strip()
# Fallback: extract after "assistantfinal"
final_match = re.search(r'assistantfinal\s*([\s\S]*)', response_text)
if final_match:
return final_match.group(1).strip()
return response_text
except Exception as e:
print(f"Failed to generate text: {e}")
raise Exception(f"Text generation failed: {str(e)}")
@staticmethod
def generate_piclet_concept(caption: str, hf_token: str) -> dict:
"""
Generate complete Piclet concept from image caption
Returns parsed concept with object name, variation, stats, etc.
"""
concept_prompt = f"""You are analyzing an image to create a Pokรฉmon-style creature. Here's the image description:
"{caption}"
STEP 1 - REASONING (think through these before writing):
1. What is the PRIMARY PHYSICAL OBJECT? Be SPECIFIC (e.g., "f-16 fighting falcon" not "jet", "macbook pro" not "laptop")
2. What is this object's real-world PURPOSE and FUNCTION?
3. What PERSONALITY traits would naturally emerge from this object's characteristics?
- If it's fast โ†’ energetic, agile
- If it's protective โ†’ loyal, defensive
- If it's precise โ†’ disciplined, focused
4. What NATURAL HABITAT suits this object-turned-creature?
- Electronics โ†’ urban/tech environments
- Vehicles โ†’ roads, skies, waters they traverse
- Tools โ†’ workshops, sites where they're used
5. What BEHAVIORS and ABILITIES reflect the object's function?
- What does the object DO in real life?
- How would those actions become creature abilities?
6. What are the object's most ICONIC VISUAL FEATURES that define it?
STEP 2 - FORMAT your complete concept EXACTLY as follows:
```md
# Canonical Object
{{Specific object name: "macbook", "eiffel tower", "iphone", "tesla", "le creuset mug", "nintendo switch"}}
{{NOT generic terms like: "laptop", "tower", "phone", "car", "mug", "console"}}
{{Include brand/model/landmark name when identifiable}}
# Variation
{{OPTIONAL: one distinctive attribute like "silver", "pro", "night", "gaming", OR use "canonical" if this is the standard/default version with no special variation}}
# Object Rarity
{{common, uncommon, rare, epic, or legendary based on object uniqueness}}
# Monster Name
{{Creative 8-11 letter name based on the SPECIFIC object, e.g., "Macbyte" for MacBook, "Towerfell" for Eiffel Tower}}
# Primary Type
{{beast, bug, aquatic, flora, mineral, space, machina, structure, culture, or cuisine}}
# Physical Stats
Height: {{e.g., "1.2m" or "3'5\\""}}
Weight: {{e.g., "15kg" or "33 lbs"}}
# Personality
{{1-2 sentences describing personality traits based on the object's real-world function and characteristics}}
# Physical Appearance
{{2-3 paragraphs describing how the SPECIFIC object's visual features translate into monster features. Reference the actual object by name. Describe body structure, colors, textures, materials, distinctive markings, and how each physical element relates to the source object.}}
# Lore & Behavior
{{2-3 paragraphs describing the creature's behavior, habitat, abilities, and role in its ecosystem. What does it DO? Where does it live? How does it interact with its environment? What are its natural behaviors and powers that reflect the object's real-world function? This is the creature's background story and behavioral profile.}}
# Monster Image Prompt
{{Detailed 3-4 sentence visual description for anime-style image generation. Describe body structure, colors, textures, materials, distinctive features, personality-driven pose/expression, dynamic action or stance, environment/background setting, and atmospheric lighting. Be specific and detailed about visual elements. DO NOT mention the source object name or include phrases like "Inspired by [object]".}}
```
CRITICAL RULES:
- Canonical Object MUST be SPECIFIC: "f-16 fighting falcon" not "jet", "macbook pro" not "laptop", "coca cola" not "soda"
- If you can identify a brand, model, or proper name from the description, USE IT
- Variation should be meaningful and distinctive (material, style, color, context, or model variant)
- Physical Appearance must describe the CREATURE'S BODY with references to the specific object's visual features
- Lore & Behavior must describe WHAT THE CREATURE DOES, not how it looks
- Monster Image Prompt must be a detailed (3-4 sentences) pure visual description without mentioning the source object name
- Monster Image Prompt must NOT include the monster's name or style prefixes like "Anime-style" or "Pokรฉmon-style"
- Primary Type must match the object category (machina for electronics/vehicles, structure for buildings, etc.)"""
response_text = PicletGeneratorService.generate_text_with_gpt(concept_prompt, hf_token)
# Parse the concept
return PicletGeneratorService.parse_concept(response_text)
@staticmethod
def parse_concept(concept_text: str) -> dict:
"""Parse structured concept text into dict"""
# Remove code block markers if present
if '```' in concept_text:
code_block_match = re.search(r'```(?:md|markdown)?\s*\n([\s\S]*?)```', concept_text)
if code_block_match:
concept_text = code_block_match.group(1).strip()
def extract_section(text: str, section: str) -> str:
"""Extract content of a markdown section"""
pattern = rf'\*{{0,2}}#\s*{re.escape(section)}\s*\*{{0,2}}\s*\n([\s\S]*?)(?=^\*{{0,2}}#|$)'
match = re.search(pattern, text, re.MULTILINE)
if match:
content = match.group(1).strip()
# Remove curly braces and quotes that GPT sometimes adds
content = re.sub(r'^[{"]|["}]$', '', content)
content = re.sub(r'^.*:\s*["\']|["\']$', '', content)
return content.strip()
return ''
# Extract all sections
object_name = extract_section(concept_text, 'Canonical Object').lower()
variation_text = extract_section(concept_text, 'Variation')
rarity_text = extract_section(concept_text, 'Object Rarity').lower()
monster_name = extract_section(concept_text, 'Monster Name')
primary_type = extract_section(concept_text, 'Primary Type').lower()
# Extract both appearance and lore sections separately (keep them separate!)
physical_appearance = extract_section(concept_text, 'Physical Appearance')
lore_behavior = extract_section(concept_text, 'Lore & Behavior')
image_prompt = extract_section(concept_text, 'Monster Image Prompt')
# Parse physical stats
physical_stats_text = extract_section(concept_text, 'Physical Stats')
height_match = re.search(r'Height:\s*(.+)', physical_stats_text, re.IGNORECASE)
weight_match = re.search(r'Weight:\s*(.+)', physical_stats_text, re.IGNORECASE)
height = height_match.group(1).strip() if height_match else None
weight = weight_match.group(1).strip() if weight_match else None
personality = extract_section(concept_text, 'Personality')
# Clean monster name
if monster_name:
monster_name = re.sub(r'\*+', '', monster_name) # Remove asterisks
if ',' in monster_name:
monster_name = monster_name.split(',')[0]
if len(monster_name) > 12:
monster_name = monster_name[:12]
# Parse variation
attributes = []
if variation_text and variation_text.lower() not in ['none', 'canonical', '']:
attributes = [variation_text.lower()]
# Map rarity to tier
tier = 'medium'
if 'common' in rarity_text:
tier = 'low'
elif 'uncommon' in rarity_text:
tier = 'medium'
elif 'rare' in rarity_text and 'epic' not in rarity_text:
tier = 'high'
elif 'legendary' in rarity_text or 'epic' in rarity_text or 'mythical' in rarity_text:
tier = 'legendary'
return {
'objectName': object_name,
'attributes': attributes,
'concept': concept_text,
'stats': {
'name': monster_name or 'Unknown',
'physicalAppearance': physical_appearance,
'lore': lore_behavior,
'tier': tier,
'primaryType': primary_type or 'beast',
'height': height,
'weight': weight,
'personality': personality
},
'imagePrompt': image_prompt
}
@staticmethod
def generate_piclet_image(image_prompt: str, tier: str, hf_token: str) -> dict:
"""Generate Piclet image using Qwen-Image-Fast"""
try:
print(f"Connecting to Qwen-Image-Fast space...")
client = Client(
PicletGeneratorService.QWEN_IMAGE_SPACE,
hf_token=hf_token
)
# Build enhanced prompt for Pokemon-style anime art
full_prompt = f"{image_prompt} Pokรฉmon anime art style, idle pose, centered, full body visible in frame."
print(f"Generating image with Qwen-Image-Fast...")
print(f"Prompt: {full_prompt[:100]}...")
# Qwen-Image-Fast API: infer(prompt, seed, randomize_seed, aspect_ratio, guidance_scale, num_inference_steps, prompt_enhance)
result = client.predict(
full_prompt, # prompt
0, # seed (will be randomized)
True, # randomize_seed
"3:4", # aspect_ratio (768x1024 - portrait)
1.0, # guidance_scale (default)
8, # num_inference_steps (default, optimized with Lightning LoRA)
True, # prompt_enhance (uses LLM to enhance prompt)
api_name="/infer"
)
# Qwen returns: (PIL.Image, seed) tuple
result_data = result.data if hasattr(result, 'data') else result
image_data = result_data[0] if isinstance(result_data, (list, tuple)) else result_data
seed = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else 0
# Handle different return formats (URL or PIL Image object)
image_url = None
if isinstance(image_data, str):
image_url = image_data
elif isinstance(image_data, dict):
image_url = image_data.get('url') or image_data.get('path')
elif hasattr(image_data, 'url'):
image_url = image_data.url
if not image_url:
raise Exception("Failed to extract image URL from Qwen response")
return {
'imageUrl': image_url,
'seed': seed,
'prompt': image_prompt
}
except Exception as e:
print(f"Failed to generate image: {e}")
raise Exception(f"Image generation failed: {str(e)}")
@staticmethod
def upload_image_to_dataset(image_path: str, file_name: str) -> str:
"""
Upload image to HuggingFace dataset
Args:
image_path: Local path to the image file (or URL to download from)
file_name: Name for the file (e.g., "pillow_canonical.png")
Returns:
URL to the uploaded image in the dataset
"""
try:
print(f"Uploading image to dataset: {file_name}")
# Handle both local paths and URLs
if image_path.startswith('http'):
# Download from URL first
import requests
response = requests.get(image_path)
with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
f.write(response.content)
temp_path = f.name
else:
# Use local path directly
temp_path = image_path
# Upload to HuggingFace dataset
file_path = f"images/{file_name}"
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=file_path,
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
commit_message=f"Add piclet image: {file_name}"
)
# Clean up temp file if we downloaded it
if image_path.startswith('http'):
os.unlink(temp_path)
# Return the dataset URL
dataset_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_path}"
print(f"Image uploaded successfully: {dataset_url}")
return dataset_url
except Exception as e:
print(f"Failed to upload image: {e}")
raise Exception(f"Image upload failed: {str(e)}")
# API Endpoints
def search_piclet(object_name: str, attributes: List[str]) -> dict:
"""
Search for canonical Piclet or variations
Returns matching piclet or None
"""
piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if not piclet_data:
return {
"status": "new",
"message": f"No Piclet found for '{object_name}'",
"piclet": None
}
# Check if searching for canonical (no attributes)
if not attributes or len(attributes) == 0:
return {
"status": "existing",
"message": f"Found canonical Piclet for '{object_name}'",
"piclet": piclet_data.get("canonical")
}
# Search for matching variation
variations = piclet_data.get("variations", [])
for variation in variations:
var_attrs = set(variation.get("attributes", []))
search_attrs = set(attributes)
# Check for close match (at least 50% overlap)
overlap = len(var_attrs.intersection(search_attrs))
if overlap >= len(search_attrs) * 0.5:
return {
"status": "variation",
"message": f"Found variation of '{object_name}'",
"piclet": variation,
"canonicalId": piclet_data["canonical"]["typeId"]
}
# No variation found, suggest creating one
return {
"status": "new_variation",
"message": f"No variation found for '{object_name}' with attributes {attributes}",
"canonicalId": piclet_data["canonical"]["typeId"],
"piclet": None
}
def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict:
"""
Create a new canonical Piclet
Args:
object_name: The normalized object name (e.g., "pillow")
piclet_data: JSON string of Piclet instance data
token_or_username: Either OAuth token (starts with "hf_") or username for testing
Returns:
Dict with success status and piclet data
"""
try:
piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
# Determine if this is a token or username
user_info = None
if token_or_username and token_or_username.startswith('hf_'):
# OAuth token - verify it
user_info = verify_hf_token(token_or_username)
if not user_info:
return {
"success": False,
"error": "Invalid OAuth token"
}
else:
# Legacy username mode (for testing)
user_info = {
"sub": f"legacy_{token_or_username}",
"preferred_username": token_or_username,
"name": token_or_username,
"picture": None
}
# Get or create user profile
user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
# Create canonical entry with full discoverer info
canonical_data = {
"canonical": {
"objectName": object_name,
"typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical",
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": 1,
"picletData": piclet_json
},
"variations": []
}
# Save to dataset
if PicletDiscoveryService.save_piclet_data(object_name, canonical_data):
# Update user profile
user_profile["discoveries"].append(canonical_data["canonical"]["typeId"])
user_profile["uniqueFinds"] += 1
user_profile["totalFinds"] += 1
user_profile["rarityScore"] += 100 # Bonus for canonical discovery
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
return {
"success": True,
"message": f"Created canonical Piclet for '{object_name}'",
"piclet": canonical_data["canonical"]
}
else:
return {
"success": False,
"error": "Failed to save canonical Piclet"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict:
"""
Create a variation of an existing canonical Piclet with OAuth verification
Args:
canonical_id: ID of the canonical Piclet
attributes: List of variation attributes
piclet_data: JSON data for the Piclet
token_or_username: Either OAuth token (starts with "hf_") or username for testing
object_name: Normalized object name
Returns:
Success/error dict with variation data
"""
try:
piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
# Verify token or use legacy mode
user_info = None
if token_or_username and token_or_username.startswith('hf_'):
user_info = verify_hf_token(token_or_username)
if not user_info:
return {"success": False, "error": "Invalid OAuth token"}
else:
# Legacy mode for testing
user_info = {
"sub": f"legacy_{token_or_username}",
"preferred_username": token_or_username,
"name": token_or_username,
"picture": None
}
# Get or create user profile
user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
# Load existing data
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
if not existing_data:
return {
"success": False,
"error": f"Canonical Piclet not found for '{object_name}'"
}
# Create variation entry
variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}"
variation = {
"typeId": variation_id,
"attributes": attributes,
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": 1,
"picletData": piclet_json
}
# Add to variations
existing_data["variations"].append(variation)
# Save updated data
if PicletDiscoveryService.save_piclet_data(object_name, existing_data):
# Update user profile
user_profile["discoveries"].append(variation_id)
user_profile["totalFinds"] += 1
user_profile["rarityScore"] += 50 # Bonus for variation discovery
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
return {
"success": True,
"message": f"Created variation of '{object_name}'",
"piclet": variation
}
else:
return {
"success": False,
"error": "Failed to save variation"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def increment_scan_count(piclet_id: str, object_name: str) -> dict:
"""
Increment the scan count for a Piclet
"""
try:
data = PicletDiscoveryService.load_piclet_data(object_name)
if not data:
return {
"success": False,
"error": "Piclet not found"
}
# Check canonical
if data["canonical"]["typeId"] == piclet_id:
data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1
scan_count = data["canonical"]["scanCount"]
else:
# Check variations
for variation in data["variations"]:
if variation["typeId"] == piclet_id:
variation["scanCount"] = variation.get("scanCount", 0) + 1
scan_count = variation["scanCount"]
break
else:
return {
"success": False,
"error": "Piclet ID not found"
}
# Save updated data
if PicletDiscoveryService.save_piclet_data(object_name, data):
return {
"success": True,
"scanCount": scan_count
}
else:
return {
"success": False,
"error": "Failed to update scan count"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def generate_piclet(image, hf_token: str) -> dict:
"""
Complete Piclet generation workflow - single endpoint
Takes user's image and hf_token, returns generated Piclet with discovery status
Args:
image: Uploaded image file (Gradio file input)
hf_token: User's HuggingFace OAuth token
Returns:
{
"success": bool,
"piclet": {complete piclet data},
"discoveryStatus": "new" | "variation" | "existing",
"canonicalId": str (if variation/existing),
"message": str
}
"""
try:
# Validate token and get user info
user_info = verify_hf_token(hf_token)
if not user_info:
return {
"success": False,
"error": "Invalid HuggingFace token"
}
print(f"Generating Piclet for user: {user_info.get('preferred_username', 'unknown')}")
# Get user profile (creates if doesn't exist)
user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
# Get image path from Gradio (type="filepath" gives us a string path)
image_path = image if isinstance(image, str) else str(image)
# Step 1: Generate caption
print("Step 1/5: Generating image caption...")
caption = PicletGeneratorService.generate_enhanced_caption(image_path, hf_token)
# Step 2: Generate concept
print("Step 2/5: Generating Piclet concept...")
concept_data = PicletGeneratorService.generate_piclet_concept(caption, hf_token)
object_name = concept_data['objectName']
attributes = concept_data['attributes']
stats = concept_data['stats']
image_prompt = concept_data['imagePrompt']
concept_text = concept_data['concept']
# Step 3: Generate image
print("Step 3/5: Generating Piclet image...")
image_result = PicletGeneratorService.generate_piclet_image(
image_prompt,
stats['tier'],
hf_token
)
# Step 4: Check for canonical/variation
print("Step 4/5: Checking for existing canonical...")
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
discovery_status = 'new'
canonical_id = None
scan_count = 1
if existing_data:
# Check if this is an exact canonical match (no attributes)
if not attributes or len(attributes) == 0:
discovery_status = 'existing'
canonical_id = existing_data['canonical']['typeId']
# Increment scan count
existing_data['canonical']['scanCount'] = existing_data['canonical'].get('scanCount', 0) + 1
scan_count = existing_data['canonical']['scanCount']
PicletDiscoveryService.save_piclet_data(object_name, existing_data)
else:
# Check for matching variation
variations = existing_data.get('variations', [])
matched_variation = None
for variation in variations:
var_attrs = set(variation.get('attributes', []))
search_attrs = set(attributes)
overlap = len(var_attrs.intersection(search_attrs))
if overlap >= len(search_attrs) * 0.5:
matched_variation = variation
discovery_status = 'existing'
canonical_id = existing_data['canonical']['typeId']
# Increment variation scan count
variation['scanCount'] = variation.get('scanCount', 0) + 1
scan_count = variation['scanCount']
PicletDiscoveryService.save_piclet_data(object_name, existing_data)
break
if not matched_variation:
discovery_status = 'variation'
canonical_id = existing_data['canonical']['typeId']
# Step 5: Save new discovery if needed
print("Step 5/5: Saving to dataset...")
if discovery_status == 'new':
# Create new canonical
type_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical"
# Upload image to dataset with canonical filename
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
image_filename = f"{normalized_name}_canonical.png"
dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
image_result['imageUrl'],
image_filename
)
canonical_data = {
"canonical": {
"objectName": object_name,
"typeId": type_id,
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": scan_count,
"picletData": {
"typeId": type_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": dataset_image_url,
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"createdAt": datetime.now().isoformat()
}
},
"variations": []
}
canonical_id = type_id
PicletDiscoveryService.save_piclet_data(object_name, canonical_data)
# Update user profile
user_profile["discoveries"].append(type_id)
user_profile["uniqueFinds"] = user_profile.get("uniqueFinds", 0) + 1
user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 100
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
elif discovery_status == 'variation':
# Create new variation
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
variation_num = len(existing_data['variations']) + 1
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
variation_id = f"{normalized_name}_{variation_num:03d}"
# Upload image to dataset with variation filename
image_filename = f"{normalized_name}_{variation_num:03d}.png"
dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
image_result['imageUrl'],
image_filename
)
variation_data = {
"typeId": variation_id,
"attributes": attributes,
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": scan_count,
"picletData": {
"typeId": variation_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": dataset_image_url,
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"createdAt": datetime.now().isoformat()
}
}
existing_data['variations'].append(variation_data)
PicletDiscoveryService.save_piclet_data(object_name, existing_data)
# Update user profile
user_profile["discoveries"].append(variation_id)
user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 50
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
# Build complete response
# For existing piclets, get the stored data; for new/variation, use generated data
if discovery_status == 'existing':
# Load the existing piclet data to return
existing_piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if existing_piclet_data and existing_piclet_data.get('canonical'):
existing_canonical = existing_piclet_data['canonical']
piclet_data = existing_canonical.get('picletData', {})
piclet_data['discoveryStatus'] = discovery_status
piclet_data['scanCount'] = existing_canonical.get('scanCount', 1)
else:
# Fallback if data not found
piclet_data = {
"typeId": canonical_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": image_result.get('imageUrl', ''),
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"objectName": object_name,
"attributes": attributes,
"discoveryStatus": discovery_status,
"scanCount": scan_count,
"createdAt": datetime.now().isoformat()
}
else:
# For new and variation, determine the correct dataset URL
if discovery_status == 'new':
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
image_filename = f"{normalized_name}_canonical.png"
else: # variation
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
variation_num = len(existing_data.get('variations', []))
image_filename = f"{normalized_name}_{variation_num:03d}.png"
dataset_image_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/images/{image_filename}"
piclet_data = {
"typeId": canonical_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": dataset_image_url,
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"objectName": object_name,
"attributes": attributes,
"discoveryStatus": discovery_status,
"scanCount": scan_count,
"createdAt": datetime.now().isoformat()
}
messages = {
'new': f"Congratulations! You discovered the first {object_name} Piclet!",
'variation': f"You found a new variation of {object_name}!",
'existing': f"You encountered a known {object_name} Piclet."
}
return {
"success": True,
"piclet": piclet_data,
"discoveryStatus": discovery_status,
"canonicalId": canonical_id,
"message": messages.get(discovery_status, "Piclet generated!")
}
except Exception as e:
print(f"Failed to generate Piclet: {e}")
import traceback
traceback.print_exc()
return {
"success": False,
"error": str(e)
}
def get_object_details(object_name: str) -> dict:
"""
Get complete details for an object (canonical + all variations)
Args:
object_name: The object name (e.g., "pillow", "macbook")
Returns:
{
"success": bool,
"objectName": str,
"canonical": {canonical data},
"variations": [list of variations],
"totalScans": int
}
"""
try:
# Load the object data
piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if not piclet_data:
return {
"success": False,
"error": f"No piclet found for object '{object_name}'",
"objectName": object_name
}
# Calculate total scans across canonical and variations
total_scans = piclet_data['canonical'].get('scanCount', 0)
for variation in piclet_data.get('variations', []):
total_scans += variation.get('scanCount', 0)
return {
"success": True,
"objectName": object_name,
"canonical": piclet_data['canonical'],
"variations": piclet_data.get('variations', []),
"totalScans": total_scans,
"variationCount": len(piclet_data.get('variations', []))
}
except Exception as e:
print(f"Failed to get object details: {e}")
return {
"success": False,
"error": str(e),
"objectName": object_name
}
def get_user_piclets(hf_token: str) -> dict:
"""
Get all Piclets discovered by a specific user
Args:
hf_token: User's HuggingFace OAuth token
Returns:
{
"success": bool,
"piclets": [list of piclet discoveries],
"stats": {user stats}
}
"""
try:
# Verify token and get user info
user_info = verify_hf_token(hf_token)
if not user_info:
return {
"success": False,
"error": "Invalid HuggingFace token",
"piclets": []
}
# Load user profile
user_profile = PicletDiscoveryService.load_user_data(user_info['sub'])
# Get list of discoveries
discoveries = user_profile.get('discoveries', [])
piclets = []
# Load each discovered piclet
for type_id in discoveries:
# Extract object name from type_id (e.g., "pillow_canonical" -> "pillow")
object_name = type_id.rsplit('_', 1)[0]
# Load the piclet data
piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if piclet_data:
# Check if it's canonical or variation
if piclet_data['canonical']['typeId'] == type_id:
piclets.append({
'type': 'canonical',
'typeId': type_id,
'objectName': object_name,
'discoveredAt': piclet_data['canonical']['discoveredAt'],
'scanCount': piclet_data['canonical'].get('scanCount', 1),
'picletData': piclet_data['canonical'].get('picletData', {})
})
else:
# Find matching variation
for variation in piclet_data.get('variations', []):
if variation['typeId'] == type_id:
piclets.append({
'type': 'variation',
'typeId': type_id,
'objectName': object_name,
'attributes': variation.get('attributes', []),
'discoveredAt': variation['discoveredAt'],
'scanCount': variation.get('scanCount', 1),
'picletData': variation.get('picletData', {})
})
break
# Sort by discovery date (most recent first)
piclets.sort(key=lambda x: x.get('discoveredAt', ''), reverse=True)
return {
"success": True,
"piclets": piclets,
"stats": {
"username": user_info.get('preferred_username'),
"name": user_info.get('name'),
"picture": user_info.get('picture'),
"totalFinds": user_profile.get('totalFinds', 0),
"uniqueFinds": user_profile.get('uniqueFinds', 0),
"rarityScore": user_profile.get('rarityScore', 0),
"joinedAt": user_profile.get('joinedAt')
}
}
except Exception as e:
print(f"Failed to get user piclets: {e}")
return {
"success": False,
"error": str(e),
"piclets": []
}
def get_recent_activity(limit: int = 20) -> dict:
"""
Get recent discoveries across all users
"""
try:
activities = []
# List all piclet files
try:
files = list_repo_files(
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
token=HF_TOKEN
)
piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")]
except:
piclet_files = []
# Load recent piclets (simplified - in production, maintain a separate activity log)
for file_path in piclet_files[-limit:]:
try:
object_name = file_path.replace("piclets/", "").replace(".json", "")
data = PicletDiscoveryService.load_piclet_data(object_name)
if data:
# Add canonical discovery
canonical = data["canonical"]
activities.append({
"type": "discovery",
"objectName": object_name,
"typeId": canonical["typeId"],
"discoveredBy": canonical["discoveredBy"],
"discoveredAt": canonical["discoveredAt"],
"scanCount": canonical.get("scanCount", 1)
})
# Add recent variations
for variation in data.get("variations", [])[-5:]:
activities.append({
"type": "variation",
"objectName": object_name,
"typeId": variation["typeId"],
"attributes": variation["attributes"],
"discoveredBy": variation["discoveredBy"],
"discoveredAt": variation["discoveredAt"],
"scanCount": variation.get("scanCount", 1)
})
except:
continue
# Sort by discovery date
activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True)
return {
"success": True,
"activities": activities[:limit]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"activities": []
}
def get_leaderboard(limit: int = 10) -> dict:
"""
Get top discoverers
"""
try:
leaderboard = []
# List all user files
try:
files = list_repo_files(
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
token=HF_TOKEN
)
user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")]
except:
user_files = []
# Load user data
for file_path in user_files:
try:
username = file_path.replace("users/", "").replace(".json", "")
user_data = PicletDiscoveryService.load_user_data(username)
leaderboard.append({
"username": username,
"totalFinds": user_data.get("totalFinds", 0),
"uniqueFinds": user_data.get("uniqueFinds", 0),
"rarityScore": user_data.get("rarityScore", 0)
})
except:
continue
# Sort by rarity score
leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True)
# Add ranks
for i, entry in enumerate(leaderboard[:limit]):
entry["rank"] = i + 1
return {
"success": True,
"leaderboard": leaderboard[:limit]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"leaderboard": []
}
# Create Gradio interface
with gr.Blocks(title="Piclets Discovery Server") as app:
gr.Markdown("""
# ๐Ÿ” Piclets Discovery Server
Backend service for the Piclets discovery game. Each real-world object has ONE canonical Piclet!
""")
with gr.Tab("Generate Piclet"):
gr.Markdown("""
## ๐ŸŽฎ Complete Piclet Generator
Upload an image and provide your HuggingFace token to generate a complete Piclet.
This endpoint handles the entire workflow: captioning, concept generation, image creation, and dataset storage.
""")
with gr.Row():
with gr.Column():
gen_image = gr.Image(label="Upload Image", type="filepath")
gen_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
gen_btn = gr.Button("Generate Piclet", variant="primary")
with gr.Column():
gen_result = gr.JSON(label="Generated Piclet Result")
gen_btn.click(
fn=generate_piclet,
inputs=[gen_image, gen_token],
outputs=gen_result
)
with gr.Tab("My Piclets"):
gr.Markdown("""
## ๐Ÿ“š Your Discovery Collection
View all Piclets you've discovered (includes your stats).
""")
with gr.Row():
with gr.Column():
my_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
my_btn = gr.Button("Get My Piclets", variant="primary")
with gr.Column():
my_result = gr.JSON(label="My Piclets")
my_btn.click(
fn=get_user_piclets,
inputs=my_token,
outputs=my_result
)
with gr.Tab("Object Details"):
gr.Markdown("""
## ๐Ÿ” View Object Details
Get complete information about an object (canonical + all variations).
""")
with gr.Row():
with gr.Column():
obj_name = gr.Textbox(label="Object Name", placeholder="e.g., pillow, macbook")
obj_btn = gr.Button("Get Details", variant="primary")
with gr.Column():
obj_result = gr.JSON(label="Object Details")
obj_btn.click(
fn=get_object_details,
inputs=obj_name,
outputs=obj_result
)
with gr.Tab("Recent Activity"):
activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities")
activity_btn = gr.Button("Get Recent Activity")
activity_result = gr.JSON(label="Recent Discoveries")
activity_btn.click(
fn=get_recent_activity,
inputs=activity_limit,
outputs=activity_result
)
with gr.Tab("Leaderboard"):
leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers")
leader_btn = gr.Button("Get Leaderboard")
leader_result = gr.JSON(label="Top Discoverers")
leader_btn.click(
fn=get_leaderboard,
inputs=leader_limit,
outputs=leader_result
)
# API Documentation
gr.Markdown("""
## ๐Ÿ”Œ Public API Endpoints
All endpoints return JSON responses. The frontend only needs these 5 endpoints:
### 1. **generate_piclet** (Scanner)
Complete Piclet generation workflow.
- Input: `image` (File), `hf_token` (string)
- Output: Generated Piclet with discovery status
### 2. **get_user_piclets** (User Collection)
Get user's discovered Piclets and stats.
- Input: `hf_token` (string)
- Output: List of Piclets + user stats (total/unique finds, rarity score)
### 3. **get_object_details** (Object Data)
Get complete object info (canonical + all variations).
- Input: `object_name` (string)
- Output: Canonical + variations + total scans
### 4. **get_recent_activity** (Activity Feed)
Recent discoveries across all users.
- Input: `limit` (int, default 20)
- Output: Recent discoveries with timestamps
### 5. **get_leaderboard** (Top Users)
Top discoverers by rarity score.
- Input: `limit` (int, default 10)
- Output: Ranked users with stats
---
*Note: Internal helper functions (search_piclet, create_canonical, etc.) are used by generate_piclet but not exposed to frontend.*
""")
if __name__ == "__main__":
# Protect web UI with authentication while allowing API access
admin_password = os.getenv("ADMIN_PASSWORD", "changeme")
# Configure for HuggingFace Space environment
app.launch()