# -*- coding: utf-8 -*-
"""Copy of compose_glide.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/19xx6Nu4FeiGj-TzTUFxBf-15IkeuFx_F
"""
# from PIL import Image
# from IPython.display import display
import torch as th
import numpy as np

from glide_text2im.download import load_checkpoint
from glide_text2im.model_creation import (
    create_model_and_diffusion,
    model_and_diffusion_defaults,
    model_and_diffusion_defaults_upsampler
)

from composable_diffusion.download import download_model
from composable_diffusion.model_creation import create_model_and_diffusion as create_model_and_diffusion_for_clevr
from composable_diffusion.model_creation import model_and_diffusion_defaults as model_and_diffusion_defaults_for_clevr

from PIL import Image, ImageDraw, ImageFont

from torch import autocast
from diffusers import StableDiffusionPipeline

# This notebook supports both CPU and GPU.
# On CPU, generating one sample may take on the order of 20 minutes.
# On a GPU, it should be under a minute.
has_cuda = False  # fp16 stays disabled: models are kept in fp32 even when a GPU is available
device = th.device('cpu' if not th.cuda.is_available() else 'cuda')
cpu = th.device('cpu')

# initialize stable diffusion model
pipe = StableDiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    use_auth_token='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'
).to(cpu)
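# The Stable Diffusion pipeline starts on the CPU; compose() below moves whichever
# model is selected (GLIDE, Stable Diffusion, or CLEVR) onto `device` and parks the
# others on the CPU, so only one model occupies GPU memory at a time.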

# Create base model.
timestep_respacing = 100  # @param{type: 'number'}
options = model_and_diffusion_defaults()
options['use_fp16'] = has_cuda
options['timestep_respacing'] = str(timestep_respacing)  # use 100 diffusion steps for fast sampling
model, diffusion = create_model_and_diffusion(**options)
model.eval()
if has_cuda:
    model.convert_to_fp16()
model.to(cpu)
model.load_state_dict(load_checkpoint('base', cpu))
print('total base parameters', sum(x.numel() for x in model.parameters()))

# Create upsampler model.
options_up = model_and_diffusion_defaults_upsampler()
options_up['use_fp16'] = has_cuda
options_up['timestep_respacing'] = 'fast27'  # use 27 diffusion steps for very fast sampling
model_up, diffusion_up = create_model_and_diffusion(**options_up)
model_up.eval()
if has_cuda:
    model_up.convert_to_fp16()
model_up.to(cpu)
model_up.load_state_dict(load_checkpoint('upsample', cpu))
print('total upsampler parameters', sum(x.numel() for x in model_up.parameters()))

def show_images(batch: th.Tensor):
    """Display a batch of images inline."""
    scaled = ((batch + 1) * 127.5).round().clamp(0, 255).to(th.uint8).cpu()
    reshaped = scaled.permute(2, 0, 3, 1).reshape([batch.shape[2], -1, 3])
    display(Image.fromarray(reshaped.numpy()))
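# Note: show_images() is only useful when running the original notebook interactively;
# it relies on IPython's display() (import commented out above) and is never called
# by the Gradio demo below.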

def compose_language_descriptions(prompt, guidance_scale, steps):
    options['timestep_respacing'] = str(steps)
    _, diffusion = create_model_and_diffusion(**options)

    # @markdown `prompt`: when composing multiple sentences, use `|` as the delimiter.
    prompts = [x.strip() for x in prompt.split('|')]

    batch_size = 1

    # Tune this parameter to control the sharpness of 256x256 images.
    # A value of 1.0 is sharper, but sometimes results in grainy artifacts.
    upsample_temp = 0.980  # @param{type: 'number'}

    masks = [True] * len(prompts) + [False]
    # coefficients = th.tensor([0.5, 0.5], device=device).reshape(-1, 1, 1, 1)
    masks = th.tensor(masks, dtype=th.bool, device=device)

    # sampling function
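    # The batch passed to model_fn has one row per prompt plus a final unconditional
    # row. The composed (conjunction) noise prediction is
    #     eps = eps_uncond + guidance_scale * (mean_i eps_cond_i - eps_uncond),
    # i.e. classifier-free guidance whose conditional term averages the per-prompt
    # predictions selected by `masks`.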
    def model_fn(x_t, ts, **kwargs):
        half = x_t[:1]
        combined = th.cat([half] * x_t.size(0), dim=0)
        model_out = model(combined, ts, **kwargs)
        eps, rest = model_out[:, :3], model_out[:, 3:]
        cond_eps = eps[masks].mean(dim=0, keepdim=True)
        # cond_eps = (coefficients * eps[masks]).sum(dim=0)[None]
        uncond_eps = eps[~masks].mean(dim=0, keepdim=True)
        half_eps = uncond_eps + guidance_scale * (cond_eps - uncond_eps)
        eps = th.cat([half_eps] * x_t.size(0), dim=0)
        return th.cat([eps, rest], dim=1)

    ##############################
    # Sample from the base model #
    ##############################

    # Create the text tokens to feed to the model.
    def sample_64(prompts):
        tokens_list = [model.tokenizer.encode(prompt) for prompt in prompts]
        outputs = [model.tokenizer.padded_tokens_and_mask(
            tokens, options['text_ctx']
        ) for tokens in tokens_list]

        cond_tokens, cond_masks = zip(*outputs)
        cond_tokens, cond_masks = list(cond_tokens), list(cond_masks)

        full_batch_size = batch_size * (len(prompts) + 1)
        uncond_tokens, uncond_mask = model.tokenizer.padded_tokens_and_mask(
            [], options['text_ctx']
        )

        # Pack the tokens together into model kwargs.
        model_kwargs = dict(
            tokens=th.tensor(
                cond_tokens + [uncond_tokens], device=device
            ),
            mask=th.tensor(
                cond_masks + [uncond_mask],
                dtype=th.bool,
                device=device,
            ),
        )

        # Sample from the base model.
        model.del_cache()
        samples = diffusion.p_sample_loop(
            model_fn,
            (full_batch_size, 3, options["image_size"], options["image_size"]),
            device=device,
            clip_denoised=True,
            progress=True,
            model_kwargs=model_kwargs,
            cond_fn=None,
        )[:batch_size]
        model.del_cache()

        # Return the 64x64 samples.
        return samples

    ##############################
    # Upsample the 64x64 samples #
    ##############################

    def upsampling_256(prompts, samples):
        # Join the prompts with spaces so the upsampler conditions on all of them.
        tokens = model_up.tokenizer.encode(" ".join(prompts))
        tokens, mask = model_up.tokenizer.padded_tokens_and_mask(
            tokens, options_up['text_ctx']
        )

        # Create the model conditioning dict.
        model_kwargs = dict(
            # Low-res image to upsample.
            low_res=((samples + 1) * 127.5).round() / 127.5 - 1,
            # Text tokens
            tokens=th.tensor(
                [tokens] * batch_size, device=device
            ),
            mask=th.tensor(
                [mask] * batch_size,
                dtype=th.bool,
                device=device,
            ),
        )

        # Sample from the upsampler model.
        model_up.del_cache()
        up_shape = (batch_size, 3, options_up["image_size"], options_up["image_size"])
        up_samples = diffusion_up.ddim_sample_loop(
            model_up,
            up_shape,
            noise=th.randn(up_shape, device=device) * upsample_temp,
            device=device,
            clip_denoised=True,
            progress=True,
            model_kwargs=model_kwargs,
            cond_fn=None,
        )[:batch_size]
        model_up.del_cache()

        # Return the 256x256 samples.
        return up_samples

    # sampling 64x64 images
    samples = sample_64(prompts)
    # show_images(samples)

    # upsample from 64x64 to 256x256
    upsamples = upsampling_256(prompts, samples)
    # show_images(upsamples)

    # convert the first sample to an HxWx3 uint8 array for Gradio
    out_img = upsamples[0].permute(1, 2, 0)
    out_img = (out_img + 1) / 2
    out_img = (out_img.detach().cpu() * 255.).to(th.uint8)
    out_img = out_img.numpy()
    return out_img
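
# Illustrative usage (values taken from the GLIDE examples below):
#   out = compose_language_descriptions('a camel | a forest', guidance_scale=15, steps=100)
#   Image.fromarray(out).save('composed.png')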

# create model for CLEVR Objects
clevr_options = model_and_diffusion_defaults_for_clevr()

flags = {
    "image_size": 128,
    "num_channels": 192,
    "num_res_blocks": 2,
    "learn_sigma": True,
    "use_scale_shift_norm": False,
    "raw_unet": True,
    "noise_schedule": "squaredcos_cap_v2",
    "rescale_learned_sigmas": False,
    "rescale_timesteps": False,
    "num_classes": '2',
    "dataset": "clevr_pos",
    "use_fp16": has_cuda,
    "timestep_respacing": '100'
}

for key, val in flags.items():
    clevr_options[key] = val

clevr_model, clevr_diffusion = create_model_and_diffusion_for_clevr(**clevr_options)
clevr_model.eval()
if has_cuda:
    clevr_model.convert_to_fp16()
clevr_model.to(th.device('cpu'))
clevr_model.load_state_dict(th.load(download_model('clevr_pos'), th.device('cpu')))
print('total clevr_pos parameters', sum(x.numel() for x in clevr_model.parameters()))

def compose_clevr_objects(prompt, guidance_scale, steps):
    coordinates = [[float(x.split(',')[0].strip()), float(x.split(',')[1].strip())]
                   for x in prompt.split('|')]
    coordinates += [[-1, -1]]  # add unconditional score label
    batch_size = 1
    clevr_options['timestep_respacing'] = str(int(steps))
    _, clevr_diffusion = create_model_and_diffusion_for_clevr(**clevr_options)
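
    # `y` (built in sample() below) holds one (x, y) position per object plus the
    # trailing (-1, -1) unconditional label; `masks` marks the conditional rows.
    # The noise predictions are combined with the same averaged classifier-free
    # guidance used in compose_language_descriptions above.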
    def model_fn(x_t, ts, **kwargs):
        half = x_t[:1]
        combined = th.cat([half] * kwargs['y'].size(0), dim=0)
        model_out = clevr_model(combined, ts, **kwargs)
        eps, rest = model_out[:, :3], model_out[:, 3:]
        masks = kwargs.get('masks')
        cond_eps = eps[masks].mean(dim=0, keepdim=True)
        uncond_eps = eps[~masks].mean(dim=0, keepdim=True)
        half_eps = uncond_eps + guidance_scale * (cond_eps - uncond_eps)
        eps = th.cat([half_eps] * x_t.size(0), dim=0)
        return th.cat([eps, rest], dim=1)

    def sample(coordinates):
        masks = [True] * (len(coordinates) - 1) + [False]
        model_kwargs = dict(
            y=th.tensor(coordinates, dtype=th.float, device=device),
            masks=th.tensor(masks, dtype=th.bool, device=device)
        )
        samples = clevr_diffusion.p_sample_loop(
            model_fn,
            (len(coordinates), 3, clevr_options["image_size"], clevr_options["image_size"]),
            device=device,
            clip_denoised=True,
            progress=True,
            model_kwargs=model_kwargs,
            cond_fn=None,
        )[:batch_size]
        return samples

    samples = sample(coordinates)

    out_img = samples[0].permute(1, 2, 0)
    out_img = (out_img + 1) / 2
    out_img = (out_img.detach().cpu() * 255.).to(th.uint8)
    out_img = out_img.numpy()
    return out_img
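
# Illustrative usage (values taken from the CLEVR Objects example below):
#   out = compose_clevr_objects('0.1, 0.5 | 0.3, 0.5 | 0.5, 0.5 | 0.7, 0.5 | 0.9, 0.5',
#                               guidance_scale=10, steps=100)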

def stable_diffusion_compose(prompt, scale, steps):
    with autocast('cpu' if not th.cuda.is_available() else 'cuda'):
        # Older diffusers releases return a dict with a "sample" key; newer
        # releases expose the generated PIL images as `.images` instead.
        image = pipe(prompt, guidance_scale=scale, num_inference_steps=steps)["sample"][0]
    return image
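
# Note: for Stable Diffusion the composed prompt is passed to the pipeline as a single
# string, e.g. stable_diffusion_compose('a white church | lightning in the background', 15, 50)
# from the examples below; the '|' separators are not split out here.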

def compose(prompt, version, guidance_scale, steps):
    try:
        with th.no_grad():
            if version == 'GLIDE':
                clevr_model.to(cpu)
                pipe.to(cpu)
                model.to(device)
                model_up.to(device)
                return compose_language_descriptions(prompt, guidance_scale, steps)
            elif version == 'Stable_Diffusion_1v_4':
                clevr_model.to(cpu)
                model.to(cpu)
                model_up.to(cpu)
                pipe.to(device)
                return stable_diffusion_compose(prompt, guidance_scale, steps)
            else:
                pipe.to(cpu)
                model.to(cpu)
                model_up.to(cpu)
                clevr_model.to(device)
                # simple check: CLEVR prompts must contain numeric coordinates
                is_text = True
                for char in prompt:
                    if char.isdigit():
                        is_text = False
                        break
                if is_text:
                    img = Image.new('RGB', (512, 512), color=(255, 255, 255))
                    d = ImageDraw.Draw(img)
                    font = ImageFont.load_default()
                    d.text((0, 256), "Input should follow the CLEVR example and use 2D coordinates.", fill=(0, 0, 0), font=font)
                    return img
                else:
                    return compose_clevr_objects(prompt, guidance_scale, steps)
    except Exception as e:
        print(e)
        return None

examples_1 = 'a camel | a forest'
examples_2 = 'A blue sky | A mountain in the horizon | Cherry Blossoms in front of the mountain'
examples_3 = '0.1, 0.5 | 0.3, 0.5 | 0.5, 0.5 | 0.7, 0.5 | 0.9, 0.5'
examples_4 = 'a blue house | a desert'
examples_5 = 'a white church | lightning in the background'
examples_6 = 'a camel | arctic'
examples_7 = 'A lake | A mountain | Cherry Blossoms next to the lake'

examples = [
    [examples_7, 'Stable_Diffusion_1v_4', 15, 50],
    [examples_5, 'Stable_Diffusion_1v_4', 15, 50],
    [examples_4, 'Stable_Diffusion_1v_4', 15, 50],
    [examples_6, 'Stable_Diffusion_1v_4', 15, 50],
    [examples_1, 'GLIDE', 15, 100],
    [examples_2, 'GLIDE', 15, 100],
    [examples_3, 'CLEVR Objects', 10, 100]
]

import gradio as gr

title = 'Compositional Visual Generation with Composable Diffusion Models'
description = '<p>Demo for Composable Diffusion<ul><li>~30s per GLIDE/Stable-Diffusion example</li><li>~10s per CLEVR Objects example</li>(<b>Note</b>: times vary depending on the GPU used.)</ul></p><p>See more information on our <a href="https://energy-based-model.github.io/Compositional-Visual-Generation-with-Composable-Diffusion-Models/">Project Page</a>.</p><ul><li>One version is based on the released <a href="https://github.com/openai/glide-text2im">GLIDE</a> and <a href="https://github.com/CompVis/stable-diffusion/">Stable Diffusion</a> for composing natural-language descriptions.</li><li>Another is based on our pre-trained CLEVR Object Model for composing objects. <br>(<b>Note</b>: we recommend using <b><i>x</i></b> in the range <b><i>[0.1, 0.9]</i></b> and <b><i>y</i></b> in the range <b><i>[0.25, 0.7]</i></b>, since the training labels fall within these ranges.)</li></ul><p>When composing multiple sentences, use `|` as the delimiter, as in the examples below.</p><p><b>Note</b>: with Stable Diffusion, a black image is returned if the given prompt is detected as problematic.</p>'

iface = gr.Interface(
    compose,
    inputs=[
        "text",
        gr.Radio(['Stable_Diffusion_1v_4', 'GLIDE', 'CLEVR Objects'], type="value", label='version'),
        gr.Slider(2, 30, label='guidance scale'),
        gr.Slider(10, 200, label='steps')
    ],
    outputs='image', cache_examples=False,
    title=title, description=description, examples=examples)

iface.launch(enable_queue=True, show_error=True)
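
# Running this script launches the Gradio demo (assuming torch, diffusers, gradio,
# glide_text2im, and composable_diffusion are installed, as imported above).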