# ---- Flags ----
run_api = False
SSD_1B = False

# ---- Standard imports ----
import os
import subprocess

import numpy as np
from IPython.display import clear_output

# ---- Minimal, deterministic env bootstrap (optional) ----
# Prefer pinning in requirements.txt instead of installing here.
def check_environment():
    try:
        import torch  # noqa: F401
        print("Environment is already installed.")
    except ImportError:
        print("Environment not found. Installing pinned dependencies...")
        # Strongly prefer doing this via requirements.txt at build time.
        os.system("pip install --upgrade pip")
        # Quote ">=" specifiers so the shell does not treat ">" as a redirection.
        os.system(
            'pip install diffusers==0.30.0 "transformers>=4.41.0" "accelerate>=0.31.0" '
            '"huggingface_hub>=0.23.4" "safetensors>=0.4.2" gradio==4.37.1 python-dotenv'
        )
        clear_output()
        print("Environment installed successfully.")

check_environment()

# ---- App imports (safe after environment check) ----
import torch
import gradio as gr
from PIL import Image
from diffusers import UNet2DConditionModel, DiffusionPipeline, LCMScheduler
# Optional: only imported if SSD_1B=True
# from diffusers import AutoPipelineForText2Image

# ---- Config / constants ----
current_dir = os.getcwd()
cache_path = os.path.join(current_dir, "cache")
os.makedirs(cache_path, exist_ok=True)

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "1024"))
SECRET_TOKEN = os.getenv("SECRET_TOKEN", "default_secret")

# ---- GPU / NVML detection (robust) ----
def print_nvidia_smi():
    try:
        proc = subprocess.run(["nvidia-smi"], capture_output=True, text=True)
        if proc.returncode == 0:
            print(proc.stdout)
        else:
            # Show the stderr to aid debugging, but don't trust it for logic
            print(proc.stderr or "nvidia-smi returned a non-zero exit code.")
    except FileNotFoundError:
        print("nvidia-smi not found on PATH.")

print_nvidia_smi()

is_gpu = torch.cuda.is_available()
print(f"CUDA available: {is_gpu}")

# dtype & device
dtype = torch.float16 if is_gpu else torch.float32
device = torch.device("cuda") if is_gpu else torch.device("cpu")

# Optional: fewer surprises when CUDA is flaky
if not is_gpu:
    # Avoid CUDA-related env flags when no GPU is present
    os.environ.pop("CUDA_LAUNCH_BLOCKING", None)

# ---- Pipeline setup ----
if not SSD_1B:
    # SDXL base + LCM UNet
    unet = UNet2DConditionModel.from_pretrained(
        "latent-consistency/lcm-sdxl",
        torch_dtype=dtype,
        variant="fp16" if is_gpu else None,
        cache_dir=cache_path,
    )
    pipe = DiffusionPipeline.from_pretrained(
        "stabilityai/stable-diffusion-xl-base-1.0",
        unet=unet,
        torch_dtype=dtype,
        variant="fp16" if is_gpu else None,
        cache_dir=cache_path,
    )
    pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
    pipe.to(device)
else:
    # SSD-1B + LCM LoRA
    from diffusers import AutoPipelineForText2Image  # local import

    pipe = AutoPipelineForText2Image.from_pretrained(
        "segmind/SSD-1B",
        torch_dtype=dtype,
        variant="fp16" if is_gpu else None,
        cache_dir=cache_path,
    )
    pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
    pipe.to(device)
    pipe.load_lora_weights("latent-consistency/lcm-lora-ssd-1b")
    pipe.fuse_lora()

# ---- Core generate function ----
def generate(
    prompt: str,
    negative_prompt: str = "",
    seed: int = 0,
    width: int = 1024,
    height: int = 1024,
    guidance_scale: float = 0.0,
    num_inference_steps: int = 4,
    secret_token: str = "",
) -> Image.Image:
    if secret_token != SECRET_TOKEN:
        raise gr.Error(
            "Invalid secret token. "
            "Set SECRET_TOKEN on the server or pass the correct token."
        )

    # Make sure sizes are sane on CPU
    width = int(np.clip(width, 256, MAX_IMAGE_SIZE))
    height = int(np.clip(height, 256, MAX_IMAGE_SIZE))

    generator = torch.Generator(device=device)
    if seed is not None:
        generator = generator.manual_seed(int(seed))

    out = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        width=width,
        height=height,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        generator=generator,
        output_type="pil",
    )
    return out.images[0]

clear_output()

# ---- Optional notebook helper ----
def generate_image(prompt="A scenic watercolor landscape, mountains at dawn"):
    img = generate(
        prompt=prompt,
        negative_prompt="",
        seed=0,
        width=1024,
        height=1024,
        guidance_scale=0.0,
        num_inference_steps=4,
        secret_token=SECRET_TOKEN,
    )
    from IPython.display import display
    display(img)

# ---- UI ----
if not run_api:
    secret_token = gr.Textbox(
        label="Secret Token",
        placeholder="Enter your secret token",
        type="password",
    )
    prompt = gr.Textbox(
        label="Prompt",
        show_label=True,
        max_lines=2,
        placeholder="Enter your prompt",
    )
    negative_prompt = gr.Textbox(
        label="Negative prompt",
        max_lines=2,
        placeholder="Enter a negative prompt (optional)",
    )
    seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0)
    width = gr.Slider(label="Width", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
    height = gr.Slider(label="Height", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
    guidance_scale = gr.Slider(label="Guidance scale", minimum=0, maximum=2, step=0.1, value=0.0)
    num_inference_steps = gr.Slider(label="Inference steps", minimum=1, maximum=8, step=1, value=4)

    iface = gr.Interface(
        fn=generate,
        inputs=[prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps, secret_token],
        outputs=gr.Image(label="Result"),
        title="Image Generator (LCM)",
        description="Fast SDXL/SSD-1B image generation with LCM. Uses CPU if CUDA is unavailable.",
    )
    iface.launch()

if run_api:
    with gr.Blocks() as demo:
        gr.Markdown(
            "### REST API for LCM Text-to-Image\n"
            "Use the `/run` endpoint programmatically with your secret."
        )
        secret_token = gr.Textbox(label="Secret Token", type="password")
        prompt = gr.Textbox(label="Prompt")
        negative_prompt = gr.Textbox(label="Negative prompt")
        seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0)
        width = gr.Slider(label="Width", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
        height = gr.Slider(label="Height", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
        guidance_scale = gr.Slider(label="Guidance scale", minimum=0, maximum=2, step=0.1, value=0.0)
        num_inference_steps = gr.Slider(label="Inference steps", minimum=1, maximum=8, step=1, value=4)

        inputs = [prompt, negative_prompt, seed, width, height, guidance_scale, num_inference_steps, secret_token]
        prompt.submit(fn=generate, inputs=inputs, outputs=gr.Image(), api_name="run")

    demo.queue(max_size=32).launch(debug=False)
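
# ---- Example: calling the API endpoint from another process (sketch) ----
# A minimal sketch of how a client could call the endpoint registered with
# api_name="run" in the Blocks app above. It assumes the server is already
# running and that the `gradio_client` package is installed on the client
# side; the URL, token value, and the helper name `call_run_endpoint` are
# illustrative assumptions, not part of the app itself. Positional arguments
# must follow the order of the `inputs` list passed to `prompt.submit(...)`.
def call_run_endpoint(app_url="http://127.0.0.1:7860", token="default_secret"):
    from gradio_client import Client  # client-side dependency (assumption)

    client = Client(app_url)
    # Argument order: prompt, negative_prompt, seed, width, height,
    # guidance_scale, num_inference_steps, secret_token
    result = client.predict(
        "A scenic watercolor landscape, mountains at dawn",  # prompt
        "",      # negative_prompt
        0,       # seed
        1024,    # width
        1024,    # height
        0.0,     # guidance_scale
        4,       # num_inference_steps
        token,   # secret_token
        api_name="/run",
    )
    # `result` is typically a path to the generated image file; the exact
    # return shape depends on the installed gradio_client version.
    return result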