Spaces:
Running
on
A10G
Running
on
A10G
| import logging | |
| from typing import List, Tuple, Dict | |
| import streamlit as st | |
| import torch | |
| import gc | |
| import time | |
| import numpy as np | |
| from PIL import Image | |
| from time import perf_counter | |
| from contextlib import contextmanager | |
| from scipy.signal import fftconvolve | |
| from PIL import ImageFilter | |
| from diffusers import ControlNetModel, UniPCMultistepScheduler | |
| from diffusers import StableDiffusionInpaintPipeline | |
| from config import WIDTH, HEIGHT | |
| from stable_diffusion_controlnet_inpaint_img2img import StableDiffusionControlNetInpaintImg2ImgPipeline | |
| from helpers import flush | |
| LOGGING = logging.getLogger(__name__) | |
class ControlNetPipeline:
    """Ticket-ordered wrapper around the ControlNet inpaint/img2img pipeline.

    The constructor loads the ``BertChristiaens/controlnet-seg-room``
    ControlNet and the ``runwayml/stable-diffusion-inpainting`` weights in
    float16, swaps in a ``UniPCMultistepScheduler``, enables xformers
    attention and moves the pipeline to ``"cuda"``.

    Calls are served one at a time: every call takes the next ticket number,
    polls until that number is at the head of ``waiting_queue``, runs the
    pipeline and then leaves the queue.

    NOTE(review): ticket issuance (``self.count += 1``) is not guarded by a
    lock; if calls can arrive from several threads at the same instant this
    may need one -- confirm against how the app invokes this object.
    """

    def __init__(self):
        # Set here but not read anywhere in this class.
        self.in_use = False
        self.controlnet = ControlNetModel.from_pretrained(
            "BertChristiaens/controlnet-seg-room", torch_dtype=torch.float16)
        self.pipe = StableDiffusionControlNetInpaintImg2ImgPipeline.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            controlnet=self.controlnet,
            safety_checker=None,
            torch_dtype=torch.float16
        )
        self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.enable_xformers_memory_efficient_attention()
        self.pipe = self.pipe.to("cuda")
        # Ticket numbers of callers that are waiting or currently running.
        self.waiting_queue = []
        # Last ticket number handed out.
        self.count = 0

    def queue_size(self) -> int:
        """Return the number of tickets currently in the queue."""
        return len(self.waiting_queue)

    def __call__(self, **kwargs):
        """Wait for this call's turn, then run the wrapped pipeline.

        Args:
            **kwargs: forwarded unchanged to ``self.pipe``.

        Returns:
            Whatever ``self.pipe(**kwargs)`` returns.

        Raises:
            Any exception raised by the pipeline. The ticket is removed from
            the queue before the exception propagates, so later callers are
            not blocked by a failed run.
        """
        self.count += 1
        number = self.count
        self.waiting_queue.append(number)
        try:
            # Poll until this ticket reaches the head of the queue.
            while self.waiting_queue[0] != number:
                print(f"Wait for your turn {number} in queue {self.waiting_queue}")
                time.sleep(0.5)
            # Report this caller's own ticket; self.count may already have
            # been advanced by callers that queued up afterwards.
            print("It's the turn of", number)
            try:
                return self.pipe(**kwargs)
            finally:
                # Project helper; presumably releases cached memory -- confirm.
                flush()
        finally:
            # Always give up the slot, even when the pipeline (or the wait)
            # failed; otherwise every later caller would spin forever.
            self.waiting_queue.remove(number)
class SDPipeline:
    """Ticket-ordered wrapper around the Stable Diffusion 2 inpainting pipeline.

    The constructor loads ``stabilityai/stable-diffusion-2-inpainting`` in
    float16 without a safety checker, enables xformers attention and moves
    the pipeline to ``"cuda"``.

    Calls are served one at a time: every call takes the next ticket number,
    polls until that number is at the head of ``waiting_queue``, runs the
    pipeline and then leaves the queue.

    NOTE(review): ticket issuance (``self.count += 1``) is not guarded by a
    lock; if calls can arrive from several threads at the same instant this
    may need one -- confirm against how the app invokes this object.
    """

    def __init__(self):
        self.pipe = StableDiffusionInpaintPipeline.from_pretrained(
            "stabilityai/stable-diffusion-2-inpainting",
            torch_dtype=torch.float16,
            safety_checker=None,
        )
        self.pipe.enable_xformers_memory_efficient_attention()
        self.pipe = self.pipe.to("cuda")
        # Ticket numbers of callers that are waiting or currently running.
        self.waiting_queue = []
        # Last ticket number handed out.
        self.count = 0

    def queue_size(self) -> int:
        """Return the number of tickets currently in the queue."""
        return len(self.waiting_queue)

    def __call__(self, **kwargs):
        """Wait for this call's turn, then run the wrapped pipeline.

        Args:
            **kwargs: forwarded unchanged to ``self.pipe``.

        Returns:
            Whatever ``self.pipe(**kwargs)`` returns.

        Raises:
            Any exception raised by the pipeline. The ticket is removed from
            the queue before the exception propagates, so later callers are
            not blocked by a failed run.
        """
        self.count += 1
        number = self.count
        self.waiting_queue.append(number)
        try:
            # Poll until this ticket reaches the head of the queue.
            while self.waiting_queue[0] != number:
                print(f"Wait for your turn {number} in queue {self.waiting_queue}")
                time.sleep(0.5)
            # Report this caller's own ticket; self.count may already have
            # been advanced by callers that queued up afterwards.
            print("It's the turn of", number)
            try:
                return self.pipe(**kwargs)
            finally:
                # Project helper; presumably releases cached memory -- confirm.
                flush()
        finally:
            # Always give up the slot, even when the pipeline (or the wait)
            # failed; otherwise every later caller would spin forever.
            self.waiting_queue.remove(number)
def get_controlnet():
    """Construct the queued ControlNet pipeline wrapper.

    Returns:
        ControlNetPipeline: a newly constructed wrapper instance
    """
    return ControlNetPipeline()
def get_inpainting_pipeline():
    """Construct the queued Stable Diffusion inpainting wrapper.

    Returns:
        SDPipeline: a newly constructed wrapper instance
    """
    return SDPipeline()