"""Gradio app that generates videos with Sora-2 through the Hugging Face Inference API.

The UI is gated behind Hugging Face OAuth: only PRO users (or members of an
enterprise organisation) get to see and use the generation controls.
"""

import os
import shutil
import tempfile
import time
from pathlib import Path
from typing import Optional, Tuple, Union

import gradio as gr
from huggingface_hub import InferenceClient, whoami

# Inference settings shared by every client this module creates.
_PROVIDER = "fal-ai"
_BILL_TO = "huggingface"
_TEXT_TO_VIDEO_MODEL = "akhaliq/sora-2"
_IMAGE_TO_VIDEO_MODEL = "akhaliq/sora-2-image-to-video"

# Generated videos get a dedicated file-name prefix so that cleanup only ever
# touches files written by this app, never unrelated .mp4 files that happen to
# live in the shared system temp directory.
_TEMP_VIDEO_PREFIX = "sora2_"
_TEMP_VIDEO_SUFFIX = ".mp4"
_TEMP_VIDEO_MAX_AGE_SECONDS = 300

_ACCESS_DENIED_MESSAGE = (
    "Access Denied. This app is exclusively for Hugging Face PRO users."
)
_MISSING_TOKEN_MESSAGE = "❌ Please set HF_TOKEN environment variable."
_MISSING_PROMPT_MESSAGE = "❌ Please enter a prompt"

_CSS = (
    " .logo-dark{display: none}"
    " .dark .logo-dark{display: block !important}"
    " .dark .logo-light{display: none}"
    " #sub_title{margin-top: -20px !important}"
    " .pro-badge{"
    " background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);"
    " color: white;"
    " padding: 4px 12px;"
    " border-radius: 20px;"
    " font-size: 0.9em;"
    " font-weight: bold;"
    " display: inline-block;"
    " margin-left: 8px;"
    " } "
)

# NOTE(review): _CSS targets .logo-dark/.logo-light/#sub_title/.pro-badge, but
# the fragments below contain no markup at all -- the HTML tags may have been
# lost at some point. Confirm against the deployed app before relying on them.
_HEADER_HTML = """

🎬 Sora-2 Text-to-Video Generator PRO

Generate stunning videos using OpenAI's Sora-2 model

Exclusive access for Hugging Face PRO users. Subscribe to PRO →

Built with anycoder

"""

_WELCOME_HTML = """

✨ Welcome PRO User! You have full access to Sora-2.

"""

_IMAGE_SECTION_HTML = """

🖼️ ➜ 🎬 Image → Video (beta)

Turn a single image into a short video with a guiding prompt.

"""

_FOOTER_HTML = """

Thank you for being a PRO user! 🤗

"""


def _build_client(api_key: Optional[str]) -> InferenceClient:
    """Create an InferenceClient for the fal-ai provider using *api_key*."""
    return InferenceClient(provider=_PROVIDER, api_key=api_key, bill_to=_BILL_TO)


# Default client, authenticated with the HF_TOKEN environment variable.
client = _build_client(os.environ.get("HF_TOKEN"))


def _resolve_client(api_key: Optional[str]) -> Optional[InferenceClient]:
    """Pick the client for a request, or None when no credentials exist.

    An explicit *api_key* gets its own client; otherwise the module-level
    ``client`` is used, provided HF_TOKEN is set in the environment.
    """
    if api_key:
        return _build_client(api_key)
    if os.environ.get("HF_TOKEN"):
        return client
    return None


def _save_video(video_bytes: bytes) -> str:
    """Write *video_bytes* to a new temp .mp4 file and return its path.

    The file is created with ``delete=False`` so it outlives this call (the
    caller hands the path to Gradio). If the write fails, the partial file is
    removed before the exception propagates.
    """
    handle = tempfile.NamedTemporaryFile(
        prefix=_TEMP_VIDEO_PREFIX, suffix=_TEMP_VIDEO_SUFFIX, delete=False
    )
    try:
        with handle:
            handle.write(video_bytes)
    except Exception:
        Path(handle.name).unlink(missing_ok=True)
        raise
    return handle.name


def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool:
    """Return True if the token belongs to a PRO user or an enterprise org member.

    Args:
        token: A Gradio OAuth token object or a raw token string.

    Returns:
        True when ``whoami`` reports ``isPro`` for the user or ``isEnterprise``
        for at least one of the user's orgs. False for a missing token, an
        unsupported token type, or when the lookup fails.
    """
    if not token:
        return False

    if isinstance(token, gr.OAuthToken):
        token_str = token.token
    elif isinstance(token, str):
        token_str = token
    else:
        return False

    try:
        user_info = whoami(token=token_str)
        # "orgs" may be present but null; treat that like an empty list.
        orgs = user_info.get("orgs") or []
        return bool(
            user_info.get("isPro", False)
            or any(org.get("isEnterprise", False) for org in orgs)
        )
    except Exception as e:
        # Any failure to verify is treated as "not PRO" rather than crashing.
        print(f"Could not verify user's PRO/Enterprise status: {e}")
        return False


def cleanup_temp_files():
    """Delete this app's generated videos that are older than five minutes.

    Only files in the system temp directory matching the app's own prefix and
    the ``.mp4`` suffix are considered. Cleanup is best-effort: errors are
    reported on stdout and never raised.
    """
    try:
        cutoff = time.time() - _TEMP_VIDEO_MAX_AGE_SECONDS
        pattern = f"{_TEMP_VIDEO_PREFIX}*{_TEMP_VIDEO_SUFFIX}"
        for file_path in Path(tempfile.gettempdir()).glob(pattern):
            try:
                if file_path.stat().st_mtime < cutoff:
                    file_path.unlink(missing_ok=True)
            except OSError:
                # The file vanished or is not removable; skip it and carry on.
                continue
    except Exception as e:
        print(f"Cleanup error: {e}")


def generate_video(
    prompt: str,
    duration: int = 8,
    size: str = "1280x720",
    api_key: Optional[str] = None
) -> Tuple[Optional[str], str]:
    """Generate a video from a text prompt with the Sora-2 text-to-video model.

    Args:
        prompt: Text description of the video.
        duration: Accepted for interface compatibility; currently not
            forwarded to the inference call.
        size: Accepted for interface compatibility; currently not forwarded
            to the inference call.
        api_key: Optional token to use instead of the HF_TOKEN environment
            variable.

    Returns:
        ``(video_path, status_message)``. ``video_path`` is None whenever
        generation did not succeed; the message then explains why.
    """
    cleanup_temp_files()

    if not prompt or not prompt.strip():
        return None, _MISSING_PROMPT_MESSAGE

    try:
        active_client = _resolve_client(api_key)
        if active_client is None:
            return None, _MISSING_TOKEN_MESSAGE

        video_bytes = active_client.text_to_video(
            prompt,
            model=_TEXT_TO_VIDEO_MODEL,
        )
        return _save_video(video_bytes), "✅ Video generated successfully!"
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"❌ Error generating video: {e}"


def generate_video_from_image(
    image: Union[str, bytes],
    prompt: str,
    api_key: Optional[str] = None
) -> Tuple[Optional[str], str]:
    """Generate a video from one input image plus a guiding prompt.

    Args:
        image: Path to an image file, or the raw image bytes.
        prompt: Text describing how the scene should evolve.
        api_key: Optional token to use instead of the HF_TOKEN environment
            variable.

    Returns:
        ``(video_path, status_message)``. ``video_path`` is None whenever
        generation did not succeed; the message then explains why.
    """
    cleanup_temp_files()

    if not prompt or not prompt.strip():
        return None, _MISSING_PROMPT_MESSAGE

    try:
        active_client = _resolve_client(api_key)
        if active_client is None:
            return None, _MISSING_TOKEN_MESSAGE

        if isinstance(image, str):
            input_image = Path(image).read_bytes()
        elif isinstance(image, (bytes, bytearray)):
            input_image = image
        else:
            return None, "❌ Invalid image input. Please upload an image."

        video_bytes = active_client.image_to_video(
            input_image,
            prompt=prompt,
            model=_IMAGE_TO_VIDEO_MODEL,
        )
        return _save_video(video_bytes), "✅ Video generated from image successfully!"
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"❌ Error generating video from image: {e}"


def generate_with_pro_auth(
    prompt: str,
    oauth_token: Optional[gr.OAuthToken] = None
) -> Tuple[Optional[str], str]:
    """Check PRO status, then run text-to-video generation.

    Raises:
        gr.Error: If the caller is not a PRO user / enterprise org member.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error(_ACCESS_DENIED_MESSAGE)

    if not prompt or not prompt.strip():
        return None, _MISSING_PROMPT_MESSAGE

    return generate_video(
        prompt,
        duration=8,
        size="1280x720",
        api_key=None
    )


def generate_with_pro_auth_image(
    prompt: str,
    image_path: Optional[str] = None,
    oauth_token: Optional[gr.OAuthToken] = None
) -> Tuple[Optional[str], str]:
    """Check PRO status, then run image-to-video generation.

    Raises:
        gr.Error: If the caller is not a PRO user / enterprise org member.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error(_ACCESS_DENIED_MESSAGE)

    if not image_path:
        return None, "❌ Please upload an image"

    return generate_video_from_image(image=image_path, prompt=prompt, api_key=None)


def simple_generate(prompt: str) -> Optional[str]:
    """Generate a video for *prompt* and return only its path (None on failure)."""
    if not prompt or not prompt.strip():
        return None

    video_path, _ = generate_video(prompt, duration=8, size="1280x720", api_key=None)
    return video_path


def create_ui():
    """Build the Gradio Blocks app and return it (not yet launched).

    The generation controls live in a column that starts hidden; on page load
    ``control_access`` reveals it for PRO users, or shows an explanatory
    message to logged-in users without PRO access.
    """
    with gr.Blocks(
        title="Sora-2 Text-to-Video Generator",
        theme=gr.themes.Soft(),
        css=_CSS,
    ) as demo:
        gr.HTML(_HEADER_HTML)

        gr.LoginButton()
        pro_message = gr.Markdown(visible=False)
        main_interface = gr.Column(visible=False)

        with main_interface:
            gr.HTML(_WELCOME_HTML)

            # Text -> Video
            with gr.Row():
                with gr.Column(scale=1):
                    prompt_input = gr.Textbox(
                        label="Enter your prompt",
                        placeholder="Describe the video you want to create...",
                        lines=4
                    )
                    generate_btn = gr.Button(
                        "🎥 Generate Video", variant="primary", size="lg"
                    )
                with gr.Column(scale=1):
                    video_output = gr.Video(
                        label="Generated Video",
                        height=400,
                        interactive=False,
                        show_download_button=True,
                    )
                    status_output = gr.Textbox(
                        label="Status", interactive=False, visible=True
                    )

            # The OAuth token argument of the handler is not listed in
            # `inputs`; it is declared only through the handler's type hint.
            generate_btn.click(
                fn=generate_with_pro_auth,
                inputs=[prompt_input],
                outputs=[video_output, status_output],
                queue=False
            )

            # Image -> Video
            gr.HTML(_IMAGE_SECTION_HTML)
            with gr.Row():
                with gr.Column(scale=1):
                    img_prompt_input = gr.Textbox(
                        label="Describe how the scene should evolve",
                        placeholder="e.g., The cat starts to dance and spins playfully",
                        lines=3,
                    )
                    image_input = gr.Image(label="Upload an image", type="filepath")
                    generate_img_btn = gr.Button(
                        "🎥 Generate from Image", variant="primary"
                    )
                with gr.Column(scale=1):
                    video_output_img = gr.Video(
                        label="Generated Video (from Image)",
                        height=400,
                        interactive=False,
                        show_download_button=True,
                    )
                    status_output_img = gr.Textbox(
                        label="Status", interactive=False, visible=True
                    )

            generate_img_btn.click(
                fn=generate_with_pro_auth_image,
                inputs=[img_prompt_input, image_input],
                outputs=[video_output_img, status_output_img],
                queue=False
            )

            gr.HTML(_FOOTER_HTML)

        def control_access(
            profile: Optional[gr.OAuthProfile] = None,
            oauth_token: Optional[gr.OAuthToken] = None
        ):
            """Return visibility updates for (main_interface, pro_message)."""
            if not profile:
                # Not logged in: keep both the app and the PRO notice hidden.
                return gr.update(visible=False), gr.update(visible=False)
            if verify_pro_status(oauth_token):
                return gr.update(visible=True), gr.update(visible=False)
            message = "## ✨ Exclusive Access for PRO Users\n\nThis tool is available exclusively for Hugging Face **PRO** members."
            return gr.update(visible=False), gr.update(visible=True, value=message)

        demo.load(control_access, inputs=None, outputs=[main_interface, pro_message])

    return demo


if __name__ == "__main__":
    # Start from a clean slate: drop stale videos and any cached examples.
    # Both calls are best-effort and do not raise on failure.
    cleanup_temp_files()
    shutil.rmtree("gradio_cached_examples", ignore_errors=True)

    app = create_ui()
    app.launch(show_api=False, enable_monitoring=False, quiet=True, max_threads=10)