Spaces: Running on Zero
# ---- Flags ----
run_api = False
SSD_1B = False

# ---- Standard imports ----
import os
import subprocess
import numpy as np
from IPython.display import clear_output

# ---- Minimal, deterministic env bootstrap (optional) ----
# Prefer pinning in requirements.txt instead of installing here.
def check_environment():
    try:
        import torch  # noqa: F401
        print("Environment is already installed.")
    except ImportError:
        print("Environment not found. Installing pinned dependencies...")
        # Strongly prefer doing this via requirements.txt at build time.
        os.system("pip install --upgrade pip")
        # Quote the ">=" specifiers so the shell does not treat them as redirections.
        os.system(
            'pip install diffusers==0.30.0 "transformers>=4.41.0" "accelerate>=0.31.0" '
            '"huggingface_hub>=0.23.4" "safetensors>=0.4.2" gradio==4.37.1 python-dotenv'
        )
        clear_output()
        print("Environment installed successfully.")

check_environment()
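# A sketch of the equivalent requirements.txt (same pins as the install line above);
# keeping these in the repo lets the Space resolve them at build time instead:
#   diffusers==0.30.0
#   transformers>=4.41.0
#   accelerate>=0.31.0
#   huggingface_hub>=0.23.4
#   safetensors>=0.4.2
#   gradio==4.37.1
#   python-dotenv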
# ---- App imports (safe after environment check) ----
import torch
import gradio as gr
from PIL import Image
from diffusers import UNet2DConditionModel, DiffusionPipeline, LCMScheduler
# Optional: only imported if SSD_1B=True
# from diffusers import AutoPipelineForText2Image

# ---- Config / constants ----
current_dir = os.getcwd()
cache_path = os.path.join(current_dir, "cache")
os.makedirs(cache_path, exist_ok=True)

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "1024"))
SECRET_TOKEN = os.getenv("SECRET_TOKEN", "default_secret")
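# python-dotenv is installed above but not otherwise used; a minimal, optional
# sketch for picking SECRET_TOKEN up from a local .env file. The .env file and
# the re-read below are illustrative, not required by the app.
try:
    from dotenv import load_dotenv
    if load_dotenv():  # True if a .env file was found and loaded
        SECRET_TOKEN = os.getenv("SECRET_TOKEN", SECRET_TOKEN)
except ImportError:
    pass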
# ---- GPU / NVML detection (robust) ----
def print_nvidia_smi():
    try:
        proc = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
        if proc.returncode == 0:
            print(proc.stdout)
        else:
            # Show the stderr to aid debugging, but don't trust it for logic
            print(proc.stderr or "nvidia-smi returned a non-zero exit code.")
    except FileNotFoundError:
        print("nvidia-smi not found on PATH.")

print_nvidia_smi()

is_gpu = torch.cuda.is_available()
print(f"CUDA available: {is_gpu}")

# dtype & device
dtype = torch.float16 if is_gpu else torch.float32
device = torch.device("cuda") if is_gpu else torch.device("cpu")

# Optional: fewer surprises when CUDA is flaky
if not is_gpu:
    # Avoid CUDA-related env flags when no GPU is present
    os.environ.pop("CUDA_LAUNCH_BLOCKING", None)
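# Optional diagnostic (a sketch, not required by the app): report the detected
# GPU name and total memory via the standard torch.cuda helpers.
if is_gpu:
    props = torch.cuda.get_device_properties(0)
    print(
        f"GPU 0: {torch.cuda.get_device_name(0)}, "
        f"{props.total_memory / 1024**3:.1f} GiB total memory"
    )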
# ---- Pipeline setup ----
if not SSD_1B:
    # SDXL base + LCM UNet
    unet = UNet2DConditionModel.from_pretrained(
        "latent-consistency/lcm-sdxl",
        torch_dtype=dtype,
        variant="fp16" if is_gpu else None,
        cache_dir=cache_path,
    )
    pipe = DiffusionPipeline.from_pretrained(
        "stabilityai/stable-diffusion-xl-base-1.0",
        unet=unet,
        torch_dtype=dtype,
        variant="fp16" if is_gpu else None,
        cache_dir=cache_path,
    )
    pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
    pipe.to(device)
else:
    # SSD-1B + LCM LoRA
    from diffusers import AutoPipelineForText2Image  # local import

    pipe = AutoPipelineForText2Image.from_pretrained(
        "segmind/SSD-1B",
        torch_dtype=dtype,
        variant="fp16" if is_gpu else None,
        cache_dir=cache_path,
    )
    pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
    pipe.to(device)
    pipe.load_lora_weights("latent-consistency/lcm-lora-ssd-1b")
    pipe.fuse_lora()
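# Optional memory savers (illustrative; not enabled in the original app). Both are
# standard diffusers pipeline methods; uncomment if the Space runs close to its
# memory limit. Note that CPU offload is typically used instead of a plain .to(device).
# pipe.enable_attention_slicing()    # trades a little speed for lower peak VRAM
# pipe.enable_model_cpu_offload()    # requires accelerate; offloads idle submodules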
# ---- Core generate function ----
def generate(
    prompt: str,
    negative_prompt: str = "",
    seed: int = 0,
    width: int = 1024,
    height: int = 1024,
    guidance_scale: float = 0.0,
    num_inference_steps: int = 4,
    secret_token: str = "",
) -> Image.Image:
    if secret_token != SECRET_TOKEN:
        raise gr.Error("Invalid secret token. Set SECRET_TOKEN on the server or pass the correct token.")

    # Clamp sizes to a sane range (applies on both CPU and GPU)
    width = int(np.clip(width, 256, MAX_IMAGE_SIZE))
    height = int(np.clip(height, 256, MAX_IMAGE_SIZE))

    generator = torch.Generator(device=device)
    if seed is not None:
        generator = generator.manual_seed(int(seed))

    out = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=width,
        height=height,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        generator=generator,
        output_type="pil",
    )
    return out.images[0]

clear_output()
# ---- Optional notebook helper ----
def generate_image(prompt="A scenic watercolor landscape, mountains at dawn"):
    img = generate(
        prompt=prompt,
        negative_prompt="",
        seed=0,
        width=1024,
        height=1024,
        guidance_scale=0.0,
        num_inference_steps=4,
        secret_token=SECRET_TOKEN,
    )
    from IPython.display import display
    display(img)
# ---- UI ----
if not run_api:
    secret_token = gr.Textbox(
        label="Secret Token",
        placeholder="Enter your secret token",
        type="password",
    )
    prompt = gr.Textbox(
        label="Prompt",
        show_label=True,
        max_lines=2,
        placeholder="Enter your prompt",
    )
    negative_prompt = gr.Textbox(
        label="Negative prompt",
        max_lines=2,
        placeholder="Enter a negative prompt (optional)",
    )
    seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0)
    width = gr.Slider(label="Width", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
    height = gr.Slider(label="Height", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
    guidance_scale = gr.Slider(label="Guidance scale", minimum=0, maximum=2, step=0.1, value=0.0)
    num_inference_steps = gr.Slider(label="Inference steps", minimum=1, maximum=8, step=1, value=4)

    iface = gr.Interface(
        fn=generate,
        inputs=[prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps, secret_token],
        outputs=gr.Image(label="Result"),
        title="Image Generator (LCM)",
        description="Fast SDXL/SSD-1B image generation with LCM. Uses CPU if CUDA is unavailable.",
    )
    iface.launch()
if run_api:
    with gr.Blocks() as demo:
        gr.Markdown(
            "### REST API for LCM Text-to-Image\n"
            "Use the `/run` endpoint programmatically with your secret."
        )
        secret_token = gr.Textbox(label="Secret Token", type="password")
        prompt = gr.Textbox(label="Prompt")
        negative_prompt = gr.Textbox(label="Negative prompt")
        seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0)
        width = gr.Slider(label="Width", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
        height = gr.Slider(label="Height", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
        guidance_scale = gr.Slider(label="Guidance scale", minimum=0, maximum=2, step=0.1, value=0.0)
        num_inference_steps = gr.Slider(label="Inference steps", minimum=1, maximum=8, step=1, value=4)

        inputs = [prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps, secret_token]
        prompt.submit(fn=generate, inputs=inputs, outputs=gr.Image(), api_name="run")

    demo.queue(max_size=32).launch(debug=False)
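# ---- Optional: calling the API from another process (illustrative sketch) ----
# Only relevant when run_api=True. The URL and token below are placeholders; the
# helper name is an example and is never called by this script. The argument
# order mirrors the `inputs` list wired to api_name="run" above.
def example_remote_call(space_url: str = "http://localhost:7860", token: str = "default_secret"):
    from gradio_client import Client  # pip install gradio_client
    client = Client(space_url)
    result = client.predict(
        "A scenic watercolor landscape, mountains at dawn",  # prompt
        "",      # negative prompt
        0,       # seed
        1024,    # width
        1024,    # height
        0.0,     # guidance scale
        4,       # inference steps
        token,   # secret token
        api_name="/run",
    )
    return result  # path to the generated image file on the client side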