import gradio as gr
import os
import tempfile
import shutil
from typing import Optional, Tuple, Union
from huggingface_hub import InferenceClient, whoami
from pathlib import Path
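

# Shared Inference API client: requests go through the fal-ai provider and are
# billed to the "huggingface" organization. HF_TOKEN should be set in the
# environment (e.g. as a Space secret).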
client = InferenceClient(
    provider="fal-ai",
    api_key=os.environ.get("HF_TOKEN"),
    bill_to="huggingface",
)


def verify_pro_status(token: Optional[Union[gr.OAuthToken, str]]) -> bool:
    """Verifies if the user is a Hugging Face PRO user or part of an enterprise org."""
    if not token:
        return False

    if isinstance(token, gr.OAuthToken):
        token_str = token.token
    elif isinstance(token, str):
        token_str = token
    else:
        return False

    try:
        user_info = whoami(token=token_str)
        return (
            user_info.get("isPro", False) or
            any(org.get("isEnterprise", False) for org in user_info.get("orgs", []))
        )
    except Exception as e:
        print(f"Could not verify user's PRO/Enterprise status: {e}")
        return False


def cleanup_temp_files():
    """Clean up old temporary video files to prevent storage overflow."""
    try:
        import time

        temp_dir = tempfile.gettempdir()
        # Remove generated .mp4 files that are more than five minutes old.
        for file_path in Path(temp_dir).glob("*.mp4"):
            try:
                if file_path.stat().st_mtime < (time.time() - 300):
                    file_path.unlink(missing_ok=True)
            except Exception:
                pass
    except Exception as e:
        print(f"Cleanup error: {e}")
def generate_video(
    prompt: str,
    duration: int = 8,
    size: str = "1280x720",
    api_key: Optional[str] = None,
) -> Tuple[Optional[str], str]:
    """Generate a video with Sora-2 through the Hugging Face Inference API (fal-ai provider)."""
    cleanup_temp_files()
    try:
        if api_key:
            temp_client = InferenceClient(
                provider="fal-ai",
                api_key=api_key,
                bill_to="huggingface",
            )
        else:
            temp_client = client
        if not os.environ.get("HF_TOKEN") and not api_key:
            return None, "❌ Please set the HF_TOKEN environment variable."

        video_bytes = temp_client.text_to_video(
            prompt,
            model="akhaliq/sora-2",
        )

        # Write the returned bytes to a temporary .mp4 file that Gradio can serve.
        temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        try:
            temp_file.write(video_bytes)
            temp_file.flush()
            video_path = temp_file.name
        finally:
            temp_file.close()

        return video_path, "✅ Video generated successfully!"
    except Exception as e:
        return None, f"❌ Error generating video: {str(e)}"
def generate_video_from_image(
    image: Union[str, bytes],
    prompt: str,
    api_key: Optional[str] = None,
) -> Tuple[Optional[str], str]:
    """Generate a video from a single input image plus a prompt using Sora-2 image-to-video."""
    cleanup_temp_files()
    if not prompt or prompt.strip() == "":
        return None, "❌ Please enter a prompt"
    try:
        if api_key:
            temp_client = InferenceClient(
                provider="fal-ai",
                api_key=api_key,
                bill_to="huggingface",
            )
        else:
            temp_client = client
        if not os.environ.get("HF_TOKEN") and not api_key:
            return None, "❌ Please set the HF_TOKEN environment variable."

        # Accept either a file path or raw bytes for the input image.
        if isinstance(image, str):
            with open(image, "rb") as f:
                input_image = f.read()
        elif isinstance(image, (bytes, bytearray)):
            input_image = image
        else:
            return None, "❌ Invalid image input. Please upload an image."

        video_bytes = temp_client.image_to_video(
            input_image,
            prompt=prompt,
            model="akhaliq/sora-2-image-to-video",
        )

        temp_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        try:
            temp_file.write(video_bytes)
            temp_file.flush()
            video_path = temp_file.name
        finally:
            temp_file.close()

        return video_path, "✅ Video generated from image successfully!"
    except Exception as e:
        return None, f"❌ Error generating video from image: {str(e)}"
def generate_with_pro_auth(
    prompt: str,
    oauth_token: Optional[gr.OAuthToken] = None,
) -> Tuple[Optional[str], str]:
    """Wrapper that checks whether the user is PRO before generating a video."""
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied. This app is exclusively for Hugging Face PRO users.")

    if not prompt or prompt.strip() == "":
        return None, "❌ Please enter a prompt"

    return generate_video(
        prompt,
        duration=8,
        size="1280x720",
        api_key=None,
    )


def generate_with_pro_auth_image(
    prompt: str,
    image_path: Optional[str] = None,
    oauth_token: Optional[gr.OAuthToken] = None,
) -> Tuple[Optional[str], str]:
    """Check PRO status, then call the image-to-video generator."""
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied. This app is exclusively for Hugging Face PRO users.")
    if not image_path:
        return None, "❌ Please upload an image"
    return generate_video_from_image(image=image_path, prompt=prompt, api_key=None)


def simple_generate(prompt: str) -> Optional[str]:
    """Simplified wrapper for examples that returns only the video path."""
    if not prompt or prompt.strip() == "":
        return None
    video_path, _ = generate_video(prompt, duration=8, size="1280x720", api_key=None)
    return video_path
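

# UI definition: a login button plus a main column that stays hidden until
# control_access (run on page load) confirms the viewer is PRO or Enterprise.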
def create_ui():
    css = '''
    .logo-dark{display: none}
    .dark .logo-dark{display: block !important}
    .dark .logo-light{display: none}
    #sub_title{margin-top: -20px !important}
    .pro-badge{
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 4px 12px;
        border-radius: 20px;
        font-size: 0.9em;
        font-weight: bold;
        display: inline-block;
        margin-left: 8px;
    }
    '''

    with gr.Blocks(title="Sora-2 Text-to-Video Generator", theme=gr.themes.Soft(), css=css) as demo:
        gr.HTML("""
        <div style="text-align: center; max-width: 800px; margin: 0 auto;">
            <h1 style="font-size: 2.5em; margin-bottom: 0.5em;">
                🎬 Sora-2 Text-to-Video Generator
                <span class="pro-badge">PRO</span>
            </h1>
            <p style="font-size: 1.1em; color: #666; margin-bottom: 20px;">Generate stunning videos using OpenAI's Sora-2 model</p>
            <p id="sub_title" style="font-size: 1em; margin-top: 20px; margin-bottom: 15px;">
                <strong>Exclusive access for Hugging Face PRO users.</strong>
                <a href="http://huggingface.co/subscribe/pro?source=sora2_video" target="_blank" style="color: #667eea;">Subscribe to PRO →</a>
            </p>
            <p style="font-size: 0.9em; color: #999; margin-top: 15px;">
                Built with <a href="https://huggingface.co/spaces/akhaliq/anycoder" target="_blank" style="color: #667eea;">anycoder</a>
            </p>
        </div>
        """)

        gr.LoginButton()
        pro_message = gr.Markdown(visible=False)
        main_interface = gr.Column(visible=False)

        with main_interface:
            gr.HTML("""<div style="text-align: center; margin: 20px 0;">
                <p style="color: #28a745; font-weight: bold;">✨ Welcome PRO User! You have full access to Sora-2.</p>
            </div>""")

            with gr.Row():
                with gr.Column(scale=1):
                    prompt_input = gr.Textbox(
                        label="Enter your prompt",
                        placeholder="Describe the video you want to create...",
                        lines=4,
                    )
                    generate_btn = gr.Button("🎥 Generate Video", variant="primary", size="lg")
                with gr.Column(scale=1):
                    video_output = gr.Video(label="Generated Video", height=400, interactive=False, show_download_button=True)
                    status_output = gr.Textbox(label="Status", interactive=False, visible=True)

            generate_btn.click(
                fn=generate_with_pro_auth,
                inputs=[prompt_input],
                outputs=[video_output, status_output],
                queue=False,
            )

            gr.HTML("""
            <div style="text-align: center; margin: 40px 0 10px;">
                <h3 style="margin-bottom: 8px;">🖼️ → 🎬 Image → Video (beta)</h3>
                <p style="color:#666; margin:0;">Turn a single image into a short video with a guiding prompt.</p>
            </div>
            """)
            with gr.Row():
                with gr.Column(scale=1):
                    img_prompt_input = gr.Textbox(
                        label="Describe how the scene should evolve",
                        placeholder="e.g., The cat starts to dance and spins playfully",
                        lines=3,
                    )
                    image_input = gr.Image(label="Upload an image", type="filepath")
                    generate_img_btn = gr.Button("🎥 Generate from Image", variant="primary")
                with gr.Column(scale=1):
                    video_output_img = gr.Video(label="Generated Video (from Image)", height=400, interactive=False, show_download_button=True)
                    status_output_img = gr.Textbox(label="Status", interactive=False, visible=True)

            generate_img_btn.click(
                fn=generate_with_pro_auth_image,
                inputs=[img_prompt_input, image_input],
                outputs=[video_output_img, status_output_img],
                queue=False,
            )

            gr.HTML("""<div style="text-align: center; margin-top: 40px; padding: 20px; border-top: 1px solid #e0e0e0;">
                <h3 style="color: #667eea;">Thank you for being a PRO user! 🤗</h3>
            </div>""")
        def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None):
            if not profile:
                return gr.update(visible=False), gr.update(visible=False)
            if verify_pro_status(oauth_token):
                return gr.update(visible=True), gr.update(visible=False)
            else:
                message = "## ✨ Exclusive Access for PRO Users\n\nThis tool is available exclusively for Hugging Face **PRO** members."
                return gr.update(visible=False), gr.update(visible=True, value=message)

        demo.load(control_access, inputs=None, outputs=[main_interface, pro_message])

    return demo
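

# Entry point: remove stale temp videos and any cached Gradio examples from a
# previous run, then build and launch the app.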
if __name__ == "__main__":
    try:
        cleanup_temp_files()
        if os.path.exists("gradio_cached_examples"):
            shutil.rmtree("gradio_cached_examples", ignore_errors=True)
    except Exception as e:
        print(f"Initial cleanup error: {e}")

    app = create_ui()
    app.launch(show_api=False, enable_monitoring=False, quiet=True, max_threads=10)