import gradio as gr import threading import os import shutil import tempfile import time from util import process_image_edit, process_local_image_edit, download_and_check_result_nsfw from nfsw import NSFWDetector # Configuration parameters TIP_TRY_N = 4 # Show like button tip after 12 tries FREE_TRY_N = 8 # Free phase: first 15 tries without restrictions SLOW_TRY_N = 15 # Slow phase start: 25 tries SLOW2_TRY_N = 20 # Slow phase start: 32 tries RATE_LIMIT_60 = 25 # Full restriction: blocked after 40 tries # Time window configuration (minutes) PHASE_1_WINDOW = 3 # 15-25 tries: 5 minutes PHASE_2_WINDOW = 10 # 25-32 tries: 10 minutes PHASE_3_WINDOW = 20 # 32-40 tries: 20 minutes MAX_IMAGES_PER_WINDOW = 2 # Max images per time window IP_Dict = {} # IP generation statistics and time window tracking IP_Generation_Count = {} # Record total generation count for each IP IP_Rate_Limit_Track = {} # Record generation count and timestamp in current time window for each IP IP_Country_Cache = {} # Cache IP country information to avoid repeated queries # Country usage statistics Country_Usage_Stats = {} # Track usage count by country Total_Request_Count = 0 # Total request counter for periodic printing PRINT_STATS_INTERVAL = 10 # Print stats every N requests # Restricted countries list (these countries have lower usage limits) RESTRICTED_COUNTRIES = ["印度", "巴基斯坦"] RESTRICTED_COUNTRY_LIMIT = 5 # Max usage for restricted countries def query_ip_country(client_ip): """ Query IP address geo information (only query once per IP) Returns: dict: {"country": str, "region": str, "city": str} """ # Check cache first - no API call for subsequent visits if client_ip in IP_Country_Cache: return IP_Country_Cache[client_ip] # First time visit - query API (silent query) try: import requests api_url = f"https://api.vore.top/api/IPdata?ip={client_ip}" response = requests.get(api_url, timeout=5) if response.status_code == 200: data = response.json() if data.get("code") == 200 and "ipdata" in data: ipdata = data["ipdata"] geo_info = { "country": ipdata.get("info1", "Unknown"), "region": ipdata.get("info2", "Unknown"), "city": ipdata.get("info3", "Unknown") } IP_Country_Cache[client_ip] = geo_info return geo_info # Query failed, cache as default default_geo = {"country": "Unknown", "region": "Unknown", "city": "Unknown"} IP_Country_Cache[client_ip] = default_geo return default_geo except Exception as e: # Exception occurred, cache as default default_geo = {"country": "Unknown", "region": "Unknown", "city": "Unknown"} IP_Country_Cache[client_ip] = default_geo return default_geo def is_restricted_country_ip(client_ip): """ Check if IP is from a restricted country Returns: bool: True if from restricted country """ geo_info = query_ip_country(client_ip) country = geo_info["country"] return country in RESTRICTED_COUNTRIES def get_ip_max_limit(client_ip): """ Get max usage limit for IP based on country Returns: int: Max usage limit """ if is_restricted_country_ip(client_ip): return RESTRICTED_COUNTRY_LIMIT else: return RATE_LIMIT_60 def get_ip_generation_count(client_ip): """ Get IP generation count """ if client_ip not in IP_Generation_Count: IP_Generation_Count[client_ip] = 0 return IP_Generation_Count[client_ip] def increment_ip_generation_count(client_ip): """ Increment IP generation count """ if client_ip not in IP_Generation_Count: IP_Generation_Count[client_ip] = 0 IP_Generation_Count[client_ip] += 1 return IP_Generation_Count[client_ip] def get_ip_phase(client_ip): """ Get current phase for IP Returns: str: 'free', 'rate_limit_1', 'rate_limit_2', 'rate_limit_3', 'blocked' """ count = get_ip_generation_count(client_ip) max_limit = get_ip_max_limit(client_ip) # For restricted countries, check if they've reached their limit if is_restricted_country_ip(client_ip): if count >= max_limit: return 'blocked' elif count >= max_limit - 2: # Last 2 attempts return 'rate_limit_3' elif count >= max_limit - 3: # 3rd attempt from end return 'rate_limit_2' elif count >= max_limit - 4: # 4th attempt from end return 'rate_limit_1' else: return 'free' # For normal countries, use standard limits if count < FREE_TRY_N: return 'free' elif count < SLOW_TRY_N: return 'rate_limit_1' # NSFW blur + 5 minutes 2 images elif count < SLOW2_TRY_N: return 'rate_limit_2' # NSFW blur + 10 minutes 2 images elif count < max_limit: return 'rate_limit_3' # NSFW blur + 20 minutes 2 images else: return 'blocked' # Generation blocked def check_rate_limit_for_phase(client_ip, phase): """ Check rate limit for specific phase Returns: tuple: (is_limited, wait_time_minutes, current_count) """ if phase not in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: return False, 0, 0 # Determine time window if phase == 'rate_limit_1': window_minutes = PHASE_1_WINDOW elif phase == 'rate_limit_2': window_minutes = PHASE_2_WINDOW else: # rate_limit_3 window_minutes = PHASE_3_WINDOW current_time = time.time() window_key = f"{client_ip}_{phase}" # Clean expired records if window_key in IP_Rate_Limit_Track: track_data = IP_Rate_Limit_Track[window_key] # Check if within current time window if current_time - track_data['start_time'] > window_minutes * 60: # Time window expired, reset IP_Rate_Limit_Track[window_key] = { 'count': 0, 'start_time': current_time, 'last_generation': current_time } else: # Initialize IP_Rate_Limit_Track[window_key] = { 'count': 0, 'start_time': current_time, 'last_generation': current_time } track_data = IP_Rate_Limit_Track[window_key] # Check if exceeded limit if track_data['count'] >= MAX_IMAGES_PER_WINDOW: # Calculate remaining wait time elapsed = current_time - track_data['start_time'] wait_time = (window_minutes * 60) - elapsed wait_minutes = max(0, wait_time / 60) return True, wait_minutes, track_data['count'] return False, 0, track_data['count'] def update_country_stats(client_ip): """ Update country usage statistics and print periodically """ global Total_Request_Count, Country_Usage_Stats # Get country info geo_info = IP_Country_Cache.get(client_ip, {"country": "Unknown", "region": "Unknown", "city": "Unknown"}) country = geo_info["country"] # Update country stats if country not in Country_Usage_Stats: Country_Usage_Stats[country] = 0 Country_Usage_Stats[country] += 1 # Increment total request counter Total_Request_Count += 1 # Print stats every N requests if Total_Request_Count % PRINT_STATS_INTERVAL == 0: print("\n" + "="*60) print(f"📊 国家使用统计 (总请求数: {Total_Request_Count})") print("="*60) # Sort by usage count (descending) sorted_stats = sorted(Country_Usage_Stats.items(), key=lambda x: x[1], reverse=True) for country_name, count in sorted_stats: percentage = (count / Total_Request_Count) * 100 print(f" {country_name}: {count} 次 ({percentage:.1f}%)") print("="*60 + "\n") def record_generation_attempt(client_ip, phase): """ Record generation attempt """ # Increment total count increment_ip_generation_count(client_ip) # Update country statistics update_country_stats(client_ip) # Record time window count if phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: window_key = f"{client_ip}_{phase}" current_time = time.time() if window_key in IP_Rate_Limit_Track: IP_Rate_Limit_Track[window_key]['count'] += 1 IP_Rate_Limit_Track[window_key]['last_generation'] = current_time else: IP_Rate_Limit_Track[window_key] = { 'count': 1, 'start_time': current_time, 'last_generation': current_time } def apply_gaussian_blur_to_image_url(image_url, blur_strength=50): """ Apply Gaussian blur to image URL Args: image_url (str): Original image URL blur_strength (int): Blur strength, default 50 (heavy blur) Returns: PIL.Image: Blurred PIL Image object """ try: import requests from PIL import Image, ImageFilter import io # Download image response = requests.get(image_url, timeout=30) if response.status_code != 200: return None # Convert to PIL Image image_data = io.BytesIO(response.content) image = Image.open(image_data) # Apply heavy Gaussian blur blurred_image = image.filter(ImageFilter.GaussianBlur(radius=blur_strength)) return blurred_image except Exception as e: print(f"⚠️ Failed to apply Gaussian blur: {e}") return None # Initialize NSFW detector (download from Hugging Face) try: nsfw_detector = NSFWDetector() # Auto download falconsai_yolov9_nsfw_model_quantized.pt from Hugging Face print("✅ NSFW detector initialized successfully") except Exception as e: print(f"❌ NSFW detector initialization failed: {e}") nsfw_detector = None def edit_image_interface(input_image, prompt, request: gr.Request, progress=gr.Progress()): """ Interface function for processing image editing with phase-based limitations """ try: # Extract user IP client_ip = request.client.host x_forwarded_for = dict(request.headers).get('x-forwarded-for') if x_forwarded_for: client_ip = x_forwarded_for if client_ip not in IP_Dict: IP_Dict[client_ip] = 0 IP_Dict[client_ip] += 1 if input_image is None: return None, "Please upload an image first", gr.update(visible=False) if not prompt or prompt.strip() == "": return None, "Please enter editing prompt", gr.update(visible=False) # Check if prompt length is greater than 3 characters if len(prompt.strip()) <= 3: return None, "❌ Editing prompt must be more than 3 characters", gr.update(visible=False) except Exception as e: print(f"⚠️ Request preprocessing error: {e}") return None, "❌ Request processing error", gr.update(visible=False) # Get user current phase current_phase = get_ip_phase(client_ip) current_count = get_ip_generation_count(client_ip) geo_info = IP_Country_Cache.get(client_ip, {"country": "Unknown", "region": "Unknown", "city": "Unknown"}) print(f"📊 User phase info - IP: {client_ip}, Location: {geo_info['country']}/{geo_info['region']}/{geo_info['city']}, Phase: {current_phase}, Count: {current_count}") # Check if user reached the like button tip threshold show_like_tip = (current_count >= TIP_TRY_N) # Check if completely blocked if current_phase == 'blocked': # Generate blocked limit button blocked_button_html = f"""
🚀 Unlimited Generation
""" # Use same message for all users to avoid discrimination perception blocked_message = f"❌ You have reached Hugging Face's free generation limit. Please visit https://omnicreator.net/#generator for unlimited generation" return None, blocked_message, gr.update(value=blocked_button_html, visible=True) # Check rate limit (applies to rate_limit phases) if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: is_limited, wait_minutes, window_count = check_rate_limit_for_phase(client_ip, current_phase) if is_limited: wait_minutes_int = int(wait_minutes) + 1 # Generate rate limit button rate_limit_button_html = f"""
⏰ Skip Wait - Unlimited Generation
""" return None, f"❌ You have reached Hugging Face's free generation limit. Please visit https://omnicreator.net/#generator for unlimited generation, or wait {wait_minutes_int} minutes before generating again", gr.update(value=rate_limit_button_html, visible=True) # Handle NSFW detection based on phase is_nsfw_task = False # Track if this task involves NSFW content # Skip NSFW detection in free phase if current_phase != 'free' and nsfw_detector is not None and input_image is not None: try: nsfw_result = nsfw_detector.predict_pil_label_only(input_image) if nsfw_result.lower() == "nsfw": is_nsfw_task = True print(f"🔍 Input NSFW detected in {current_phase} phase: ❌❌❌ {nsfw_result} - IP: {client_ip} (will blur result)") else: print(f"🔍 Input NSFW check passed: ✅✅✅ {nsfw_result} - IP: {client_ip}") except Exception as e: print(f"⚠️ Input NSFW detection failed: {e}") # Allow continuation when detection fails result_url = None status_message = "" def progress_callback(message): try: nonlocal status_message status_message = message # Add error handling to prevent progress update failure if progress is not None: # Enhanced progress display with better formatting if "Queue:" in message or "tasks ahead" in message: # Queue status - show with different progress value to indicate waiting progress(0.1, desc=message) elif "Processing" in message or "AI is processing" in message: # Processing status progress(0.7, desc=message) elif "Generating" in message or "Almost done" in message: # Generation status progress(0.9, desc=message) else: # Default status progress(0.5, desc=message) except Exception as e: print(f"⚠️ Progress update failed: {e}") try: # Record generation attempt (before actual generation to ensure correct count) record_generation_attempt(client_ip, current_phase) updated_count = get_ip_generation_count(client_ip) print(f"✅ Processing started - IP: {client_ip}, phase: {current_phase}, total count: {updated_count}, prompt: {prompt.strip()}", flush=True) # Call image editing processing function input_image_url, result_url, message, task_uuid = process_image_edit(input_image, prompt.strip(), None, progress_callback) if result_url: print(f"✅ Processing completed successfully - IP: {client_ip}, result_url: {result_url}, task_uuid: {task_uuid}", flush=True) # Detect result image NSFW content (only in rate limit phases) if nsfw_detector is not None and current_phase != 'free': try: if progress is not None: progress(0.9, desc="Checking result image...") is_nsfw, nsfw_error = download_and_check_result_nsfw(result_url, nsfw_detector) if nsfw_error: print(f"⚠️ Result image NSFW detection error - IP: {client_ip}, error: {nsfw_error}") elif is_nsfw: is_nsfw_task = True # Mark task as NSFW print(f"🔍 Result image NSFW detected in {current_phase} phase: ❌❌❌ - IP: {client_ip} (will blur result)") else: print(f"🔍 Result image NSFW check passed: ✅✅✅ - IP: {client_ip}") except Exception as e: print(f"⚠️ Result image NSFW detection exception - IP: {client_ip}, error: {str(e)}") # Apply blur if this is an NSFW task in rate limit phases should_blur = False if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3'] and is_nsfw_task: should_blur = True # Apply blur processing if should_blur: if progress is not None: progress(0.95, desc="Applying content filter...") blurred_image = apply_gaussian_blur_to_image_url(result_url) if blurred_image is not None: final_result = blurred_image # Return PIL Image object final_message = f"⚠️ Content filter applied due to Hugging Face community guidelines. For unlimited creative freedom, visit our official website https://omnicreator.net/#generator" print(f"🔒 Applied Gaussian blur for NSFW content - IP: {client_ip}") else: # Blur failed, return original URL with warning final_result = result_url final_message = f"⚠️ Content review required. Please visit https://omnicreator.net/#generator for better experience" # Generate NSFW button for blurred content nsfw_action_buttons_html = f"""
🔥 Unlimited Creative Generation
""" return final_result, final_message, gr.update(value=nsfw_action_buttons_html, visible=True) else: final_result = result_url final_message = "✅ " + message try: if progress is not None: progress(1.0, desc="Processing completed") except Exception as e: print(f"⚠️ Final progress update failed: {e}") # Generate action buttons HTML like Trump AI Voice action_buttons_html = "" if task_uuid: task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" # Create image-to-video URL with input image, end image, and prompt from urllib.parse import quote # Use result URL as end_image, original upload URL as input_image encoded_prompt = quote(prompt.strip()) image_to_video_url = f"https://omnicreator.net/image-to-video?input_image={input_image_url}&end_image={result_url}&prompt={encoded_prompt}" action_buttons_html = f"""
🖼 Download HD Image 🎥 Turn Image to Video
""" # Add popup script if needed (using different approach) if show_like_tip: action_buttons_html += """
👉 Click the ❤️ Like button to unlock more free trial attempts!
""" return final_result, final_message, gr.update(value=action_buttons_html, visible=True) else: print(f"❌ Processing failed - IP: {client_ip}, error: {message}", flush=True) return None, "❌ " + message, gr.update(visible=False) except Exception as e: print(f"❌ Processing exception - IP: {client_ip}, error: {str(e)}") return None, f"❌ Error occurred during processing: {str(e)}", gr.update(visible=False) def local_edit_interface(image_dict, prompt, reference_image, request: gr.Request, progress=gr.Progress()): """ Handle local editing requests (with phase-based limitations) """ try: # Extract user IP client_ip = request.client.host x_forwarded_for = dict(request.headers).get('x-forwarded-for') if x_forwarded_for: client_ip = x_forwarded_for if client_ip not in IP_Dict: IP_Dict[client_ip] = 0 IP_Dict[client_ip] += 1 if image_dict is None: return None, "Please upload an image and draw the area to edit", gr.update(visible=False) # Handle different input formats for ImageEditor if isinstance(image_dict, dict): # ImageEditor dict format if "background" not in image_dict or "layers" not in image_dict: return None, "Please draw the area to edit on the image", gr.update(visible=False) base_image = image_dict["background"] layers = image_dict["layers"] # Special handling: if background is None but composite exists, use composite if base_image is None and "composite" in image_dict and image_dict["composite"] is not None: print("🔧 Background is None, using composite instead") base_image = image_dict["composite"] else: # Simple case: Direct PIL Image (from example) base_image = image_dict layers = [] # Check for special example case - bypass mask requirement is_example_case = prompt and prompt.startswith("EXAMPLE_PANDA_CAT_") # Debug: check current state if is_example_case: print(f"🔍 Example case detected - base_image is None: {base_image is None}") # Special handling for example case: load image directly from file if is_example_case and base_image is None: try: from PIL import Image import os main_path = "datas/panda01.jpeg" print(f"🔍 Trying to load: {main_path}, exists: {os.path.exists(main_path)}") if os.path.exists(main_path): base_image = Image.open(main_path) print(f"✅ Successfully loaded example image: {base_image.size}") else: return None, f"❌ Example image not found: {main_path}", gr.update(visible=False) except Exception as e: return None, f"❌ Failed to load example image: {str(e)}", gr.update(visible=False) # Additional check for base_image if base_image is None: if is_example_case: print(f"❌ Example case but base_image still None!") return None, "❌ No image found. Please upload an image first.", gr.update(visible=False) if not layers and not is_example_case: return None, "Please draw the area to edit on the image", gr.update(visible=False) if not prompt or prompt.strip() == "": return None, "Please enter editing prompt", gr.update(visible=False) # Check prompt length if len(prompt.strip()) <= 3: return None, "❌ Editing prompt must be more than 3 characters", gr.update(visible=False) except Exception as e: print(f"⚠️ Local edit request preprocessing error: {e}") return None, "❌ Request processing error", gr.update(visible=False) # Get user current phase current_phase = get_ip_phase(client_ip) current_count = get_ip_generation_count(client_ip) geo_info = IP_Country_Cache.get(client_ip, {"country": "Unknown", "region": "Unknown", "city": "Unknown"}) print(f"📊 Local edit user phase info - IP: {client_ip}, Location: {geo_info['country']}/{geo_info['region']}/{geo_info['city']}, Phase: {current_phase}, Count: {current_count}") # Check if user reached the like button tip threshold show_like_tip = (current_count >= TIP_TRY_N) # Check if completely blocked if current_phase == 'blocked': # Generate blocked limit button blocked_button_html = f"""
🚀 Unlimited Generation
""" # Use same message for all users to avoid discrimination perception blocked_message = f"❌ You have reached Hugging Face's free generation limit. Please visit https://omnicreator.net/#generator for unlimited generation" return None, blocked_message, gr.update(value=blocked_button_html, visible=True) # Check rate limit (applies to rate_limit phases) if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3']: is_limited, wait_minutes, window_count = check_rate_limit_for_phase(client_ip, current_phase) if is_limited: wait_minutes_int = int(wait_minutes) + 1 # Generate rate limit button rate_limit_button_html = f"""
⏰ Skip Wait - Unlimited Generation
""" return None, f"❌ You have reached Hugging Face's free generation limit. Please visit https://omnicreator.net/#generator for unlimited generation, or wait {wait_minutes_int} minutes before generating again", gr.update(value=rate_limit_button_html, visible=True) # Handle NSFW detection based on phase is_nsfw_task = False # Track if this task involves NSFW content # Skip NSFW detection in free phase if current_phase != 'free' and nsfw_detector is not None and base_image is not None: try: nsfw_result = nsfw_detector.predict_pil_label_only(base_image) if nsfw_result.lower() == "nsfw": is_nsfw_task = True print(f"🔍 Local edit input NSFW detected in {current_phase} phase: ❌❌❌ {nsfw_result} - IP: {client_ip} (will blur result)") else: print(f"🔍 Local edit input NSFW check passed: ✅✅✅ {nsfw_result} - IP: {client_ip}") except Exception as e: print(f"⚠️ Local edit input NSFW detection failed: {e}") # Allow continuation when detection fails result_url = None status_message = "" def progress_callback(message): try: nonlocal status_message status_message = message # Add error handling to prevent progress update failure if progress is not None: # Enhanced progress display with better formatting for local editing if "Queue:" in message or "tasks ahead" in message: # Queue status - show with different progress value to indicate waiting progress(0.1, desc=message) elif "Processing" in message or "AI is processing" in message: # Processing status progress(0.7, desc=message) elif "Generating" in message or "Almost done" in message: # Generation status progress(0.9, desc=message) else: # Default status progress(0.5, desc=message) except Exception as e: print(f"⚠️ Local edit progress update failed: {e}") try: # Record generation attempt (before actual generation to ensure correct count) record_generation_attempt(client_ip, current_phase) updated_count = get_ip_generation_count(client_ip) print(f"✅ Local editing started - IP: {client_ip}, phase: {current_phase}, total count: {updated_count}, prompt: {prompt.strip()}", flush=True) # Clean prompt for API call clean_prompt = prompt.strip() if clean_prompt.startswith("EXAMPLE_PANDA_CAT_"): clean_prompt = clean_prompt[18:] # Remove the prefix # Call local image editing processing function if is_example_case: # For example case, pass special flag to use local mask file input_image_url, result_url, message, task_uuid = process_local_image_edit(base_image, layers, clean_prompt, reference_image, progress_callback, use_example_mask="datas/panda01m.jpeg") else: # Normal case input_image_url, result_url, message, task_uuid = process_local_image_edit(base_image, layers, clean_prompt, reference_image, progress_callback) if result_url: print(f"✅ Local editing completed successfully - IP: {client_ip}, result_url: {result_url}, task_uuid: {task_uuid}", flush=True) # Detect result image NSFW content (only in rate limit phases) if nsfw_detector is not None and current_phase != 'free': try: if progress is not None: progress(0.9, desc="Checking result image...") is_nsfw, nsfw_error = download_and_check_result_nsfw(result_url, nsfw_detector) if nsfw_error: print(f"⚠️ Local edit result image NSFW detection error - IP: {client_ip}, error: {nsfw_error}") elif is_nsfw: is_nsfw_task = True # Mark task as NSFW print(f"🔍 Local edit result image NSFW detected in {current_phase} phase: ❌❌❌ - IP: {client_ip} (will blur result)") else: print(f"🔍 Local edit result image NSFW check passed: ✅✅✅ - IP: {client_ip}") except Exception as e: print(f"⚠️ Local edit result image NSFW detection exception - IP: {client_ip}, error: {str(e)}") # Apply blur if this is an NSFW task in rate limit phases should_blur = False if current_phase in ['rate_limit_1', 'rate_limit_2', 'rate_limit_3'] and is_nsfw_task: should_blur = True # Apply blur processing if should_blur: if progress is not None: progress(0.95, desc="Applying content filter...") blurred_image = apply_gaussian_blur_to_image_url(result_url) if blurred_image is not None: final_result = blurred_image # Return PIL Image object final_message = f"⚠️ Content filter applied due to Hugging Face community guidelines. For unlimited creative freedom, visit our official website https://omnicreator.net/#generator" print(f"🔒 Local edit applied Gaussian blur for NSFW content - IP: {client_ip}") else: # Blur failed, return original URL with warning final_result = result_url final_message = f"⚠️ Content review required. Please visit https://omnicreator.net/#generator for better experience" # Generate NSFW button for blurred content nsfw_action_buttons_html = f"""
🔥 Unlimited Creative Generation
""" return final_result, final_message, gr.update(value=nsfw_action_buttons_html, visible=True) else: final_result = result_url final_message = "✅ " + message try: if progress is not None: progress(1.0, desc="Processing completed") except Exception as e: print(f"⚠️ Local edit final progress update failed: {e}") # Generate action buttons HTML like Trump AI Voice action_buttons_html = "" if task_uuid: task_detail_url = f"https://omnicreator.net/my-creations/task/{task_uuid}" # Create image-to-video URL with input image, end image, and prompt from urllib.parse import quote # Use result URL as end_image, original upload URL as input_image encoded_prompt = quote(clean_prompt) image_to_video_url = f"https://omnicreator.net/image-to-video?input_image={input_image_url}&end_image={result_url}&prompt={encoded_prompt}" action_buttons_html = f"""
🖼 Download HD Image 🎥 Turn Image to Video
""" # Add popup script if needed (using different approach) if show_like_tip: action_buttons_html += """
👉 Click the ❤️ Like button to unlock more free trial attempts!
""" return final_result, final_message, gr.update(value=action_buttons_html, visible=True) else: print(f"❌ Local editing processing failed - IP: {client_ip}, error: {message}", flush=True) return None, "❌ " + message, gr.update(visible=False) except Exception as e: print(f"❌ Local editing exception - IP: {client_ip}, error: {str(e)}") return None, f"❌ Error occurred during processing: {str(e)}", gr.update(visible=False) # Create Gradio interface def create_app(): with gr.Blocks( title="AI Image Editor", theme=gr.themes.Soft(), css=""" .main-container { max-width: 1200px; margin: 0 auto; } .upload-area { border: 2px dashed #ccc; border-radius: 10px; padding: 20px; text-align: center; } .result-area { margin-top: 20px; padding: 20px; border-radius: 10px; background-color: #f8f9fa; } .use-as-input-btn { margin-top: 10px; width: 100%; } """, # Improve concurrency performance configuration head=""" """ ) as app: # Main title - styled like Trump AI Voice gr.HTML("""

🎨 AI Image Editor

""", padding=False) # 🌟 NEW: Multi-Image Editing Announcement Banner with breathing effect gr.HTML("""
🚀 NEWS: World's First Multi-Image Editing Tool →
""", padding=False) with gr.Tabs(): with gr.Tab("🌍 Global Editor"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📸 Upload Image") input_image = gr.Image( label="Select image to edit", type="pil", height=512, elem_classes=["upload-area"] ) gr.Markdown("### ✍️ Editing Instructions") prompt_input = gr.Textbox( label="Enter editing prompt", placeholder="For example: change background to beach, add rainbow, remove background, etc...", lines=3, max_lines=5 ) edit_button = gr.Button( "🚀 Start Editing", variant="primary", size="lg" ) with gr.Column(scale=1): gr.Markdown("### 🎯 Editing Result") output_image = gr.Image( label="Edited image", height=320, elem_classes=["result-area"] ) use_as_input_btn = gr.Button( "🔄 Use as Input", variant="secondary", size="sm", elem_classes=["use-as-input-btn"] ) status_output = gr.Textbox( label="Processing status", lines=2, max_lines=3, interactive=False ) action_buttons = gr.HTML(visible=False) gr.Markdown("### 💡 Prompt Examples") with gr.Row(): example_prompts = [ "Set the background to a grand opera stage with red curtains", "Change the outfit into a traditional Chinese hanfu with flowing sleeves", "Give the character blue dragon-like eyes with glowing pupils", "Change lighting to soft dreamy pastel glow", "Change pose to sitting cross-legged on the ground" ] for prompt in example_prompts: gr.Button( prompt, size="sm" ).click( lambda p=prompt: p, outputs=prompt_input ) edit_button.click( fn=edit_image_interface, inputs=[input_image, prompt_input], outputs=[output_image, status_output, action_buttons], show_progress=True, concurrency_limit=10, api_name="global_edit" ) def simple_use_as_input(output_img): if output_img is not None: return output_img return None use_as_input_btn.click( fn=simple_use_as_input, inputs=[output_image], outputs=[input_image] ) with gr.Tab("🖌️ Local Inpaint"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📸 Upload Image and Draw Mask") local_input_image = gr.ImageEditor( label="Upload image and draw mask", type="pil", height=512, brush=gr.Brush(colors=["#ff0000"], default_size=180), elem_classes=["upload-area"] ) gr.Markdown("### 🖼️ Reference Image(Optional)") local_reference_image = gr.Image( label="Upload reference image (optional)", type="pil", height=256 ) gr.Markdown("### ✍️ Editing Instructions") local_prompt_input = gr.Textbox( label="Enter local editing prompt", placeholder="For example: change selected area hair to golden, add patterns to selected object, change selected area color, etc...", lines=3, max_lines=5 ) local_edit_button = gr.Button( "🎯 Start Local Editing", variant="primary", size="lg" ) with gr.Column(scale=1): gr.Markdown("### 🎯 Editing Result") local_output_image = gr.Image( label="Local edited image", height=320, elem_classes=["result-area"] ) local_use_as_input_btn = gr.Button( "🔄 Use as Input", variant="secondary", size="sm", elem_classes=["use-as-input-btn"] ) local_status_output = gr.Textbox( label="Processing status", lines=2, max_lines=3, interactive=False ) local_action_buttons = gr.HTML(visible=False) local_edit_button.click( fn=local_edit_interface, inputs=[local_input_image, local_prompt_input, local_reference_image], outputs=[local_output_image, local_status_output, local_action_buttons], show_progress=True, concurrency_limit=8, api_name="local_edit" ) def simple_local_use_as_input(output_img): if output_img is not None: return { "background": output_img, "layers": [], "composite": output_img } return None local_use_as_input_btn.click( fn=simple_local_use_as_input, inputs=[local_output_image], outputs=[local_input_image] ) # Local inpaint example gr.Markdown("### 💡 Local Inpaint Example") def load_local_example(): """Load panda to cat transformation example - simplified, mask handled in backend""" try: from PIL import Image import os # Check file paths main_path = "datas/panda01.jpeg" ref_path = "datas/cat01.webp" # Load main image if not os.path.exists(main_path): return None, None, "EXAMPLE_PANDA_CAT_let the cat ride on the panda" main_img = Image.open(main_path) # Load reference image if not os.path.exists(ref_path): ref_img = None else: ref_img = Image.open(ref_path) # ImageEditor format editor_data = { "background": main_img, "layers": [], "composite": main_img } # Special prompt to indicate this is the example case prompt = "EXAMPLE_PANDA_CAT_let the cat ride on the panda" # Return just the PIL image instead of dict format to avoid UI state issues return main_img, ref_img, prompt except Exception as e: return None, None, "EXAMPLE_PANDA_CAT_Transform the panda head into a cute cat head, keeping the body" # Example display gr.Markdown("#### 🐼➡️🐱 Example: Panda to Cat Transformation") with gr.Row(): with gr.Column(scale=2): # Preview images for local example with gr.Row(): try: gr.Image("datas/panda01.jpeg", label="Main Image", height=120, width=120, show_label=True, interactive=False) gr.Image("datas/panda01m.jpeg", label="Mask", height=120, width=120, show_label=True, interactive=False) gr.Image("datas/cat01.webp", label="Reference", height=120, width=120, show_label=True, interactive=False) except: gr.Markdown("*Preview images not available*") gr.Markdown("**Prompt**: let the cat ride on the panda \n**Note**: Mask will be automatically applied when you submit this example") with gr.Column(scale=1): gr.Button( "🎨 Load Panda Example", size="lg", variant="secondary" ).click( fn=load_local_example, outputs=[local_input_image, local_reference_image, local_prompt_input] ) # Add a refresh button to fix UI state issues gr.Button( "🔄 Refresh Image Editor", size="sm", variant="secondary" ).click( fn=lambda: gr.update(), outputs=[local_input_image] ) # SEO Content Section gr.HTML("""

🎨 Unlimited AI Image Generation & Editing

Experience the ultimate freedom in AI image creation! Generate and edit unlimited images without restrictions, with complete creative freedom and no limits on your imagination with our premium AI image editing platform.

🚀 Get Unlimited Access Now

Join thousands of creators who trust Omni Creator for unrestricted AI image generation!

⭐ Professional AI Image Editor - No Restrictions

Transform your creative vision into reality with our advanced AI image editing platform. Whether you're creating art, editing photos, designing content, or working with any type of imagery - our powerful AI removes all limitations and gives you complete creative freedom.

🎯 Unlimited Generation

Premium users enjoy unlimited image generation without daily limits, rate restrictions, or content barriers. Create as many images as you need, whenever you need them.

🔓 Creative Freedom

Generate and edit any type of content with complete creative freedom and no limits on your imagination. Unlimited possibilities for artists, designers, and content creators.

⚡ Lightning Fast Processing

Advanced AI infrastructure delivers high-quality results in seconds. No waiting in queues, no processing delays - just instant, professional-grade image editing.

🎨 Advanced Editing Tools

Global transformations, precision local editing, style transfer, object removal, background replacement, and dozens of other professional editing capabilities.

💎 Premium Quality

State-of-the-art AI models trained on millions of images deliver exceptional quality and realism. Professional results suitable for commercial use and high-end projects.

🌍 Multi-Modal Support

Support for all image formats, styles, and use cases. From photorealistic portraits to artistic creations, product photography to digital art - we handle everything.

💎 Why Choose Omni Creator Premium?

🚫 No Rate Limits

Generate unlimited images without waiting periods or daily restrictions

🎭 Unlimited Imagination

Create any type of content with complete creative freedom and no limits

⚡ Priority Processing

Skip queues and get instant results with dedicated processing power

🎨 Advanced Features

Access to latest AI models and cutting-edge editing capabilities

⭐ Start Creating Now

💡 Pro Tips for Best Results

📝 Clear Descriptions:

Use detailed, specific prompts for better results. Describe colors, styles, lighting, and composition clearly.

🎯 Local Editing:

Use precise brush strokes to select areas for local editing. Smaller, focused edits often yield better results.

⚡ Iterative Process:

Use "Use as Input" feature to refine results. Multiple iterations can achieve complex transformations.

🖼 Image Quality:

Higher resolution input images (up to 10MB) generally produce better editing results and finer details.

🚀 Perfect For Every Creative Need

🎨 Digital Art

  • Character design
  • Concept art
  • Style transfer
  • Artistic effects

📸 Photography

  • Background replacement
  • Object removal
  • Lighting adjustment
  • Portrait enhancement

🛍️ E-commerce

  • Product photography
  • Lifestyle shots
  • Color variations
  • Context placement

📱 Social Media

  • Content creation
  • Meme generation
  • Brand visuals
  • Viral content
🎯 Start Your Project Now

Powered by Omni Creator

The ultimate AI image generation and editing platform • Unlimited creativity, zero restrictions

""", padding=False) return app if __name__ == "__main__": app = create_app() # Improve queue configuration to handle high concurrency and prevent SSE connection issues app.queue( default_concurrency_limit=20, # Default concurrency limit max_size=50, # Maximum queue size api_open=False # Close API access to reduce resource consumption ) app.launch( server_name="0.0.0.0", show_error=True, # Show detailed error information quiet=False, # Keep log output max_threads=40, # Increase thread pool size height=800, favicon_path=None # Reduce resource loading )