Spaces:
Running
on
Zero
Running
on
Zero
| import importlib | |
| import math | |
| import os | |
| import random | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| import torch.nn.functional as F | |
| import torchsde | |
| from torchvision.utils import make_grid | |
| from tqdm.auto import trange | |
| from transformers import PretrainedConfig | |
def seed_everything(seed):
    """Seed every random number generator this module relies on.

    Exports the seed through the ``PL_GLOBAL_SEED`` environment variable,
    seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    and switches cuDNN to deterministic mode with benchmarking disabled.

    Args:
        seed (int): The seed value handed to every generator.
    """
    os.environ["PL_GLOBAL_SEED"] = str(seed)

    # Same seed for every backend, applied in a fixed order.
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
def is_torch2_available():
    """Tell whether ``torch.nn.functional`` offers ``scaled_dot_product_attention``.

    Returns:
        bool: True when the attribute exists on ``F``, False otherwise.
    """
    try:
        getattr(F, "scaled_dot_product_attention")
    except AttributeError:
        return False
    return True
def instantiate_from_config(config):
    """Build an object from a ``{"target": ..., "params": ...}`` style config.

    Args:
        config: A mapping with a dotted import path under ``"target"`` and
            optional keyword arguments under ``"params"``. The two marker
            strings ``'__is_first_stage__'`` and ``"__is_unconditional__"``
            are also accepted.

    Returns:
        The result of calling the resolved target with ``params`` as keyword
        arguments, or ``None`` for either marker string.

    Raises:
        KeyError: If ``config`` has no ``"target"`` entry and is not one of
            the marker strings.
    """
    if "target" in config:
        factory = get_obj_from_str(config["target"])
        kwargs = config.get("params", {})
        return factory(**kwargs)

    # No target: only the two marker values are tolerated.
    if config == '__is_first_stage__' or config == "__is_unconditional__":
        return None
    raise KeyError("Expected key `target` to instantiate.")
def get_obj_from_str(string, reload=False):
    """Resolve a dotted path such as ``"package.module.Name"`` to an object.

    Args:
        string (str): Dotted path; everything before the last dot is imported
            as a module and the final component is looked up on it.
        reload (bool): When true, the module is reloaded before the lookup.

    Returns:
        The attribute named by the last path component.
    """
    module_path, attr_name = string.rsplit(".", 1)
    if reload:
        importlib.reload(importlib.import_module(module_path))
    module_obj = importlib.import_module(module_path, package=None)
    return getattr(module_obj, attr_name)
def drop_seq_token(seq, drop_rate=0.5):
    """Keep a random subset of positions along dimension 1 of ``seq``.

    A random permutation of the positions is drawn and its first
    ``int(n * (1 - drop_rate))`` entries are kept, so the surviving
    positions come back in shuffled order.

    Args:
        seq (Tensor): Input indexed as ``seq[:, positions]``.
        drop_rate (float): Fraction of positions to discard.

    Returns:
        Tensor: ``seq`` restricted to the kept positions.
    """
    shuffled = torch.randperm(seq.size(1))
    keep_count = int(len(shuffled) * (1 - drop_rate))
    return seq[:, shuffled[:keep_count]]
def import_model_class_from_model_name_or_path(
    pretrained_model_name_or_path: str, revision: str, subfolder: str = "text_encoder"
):
    """Pick the CLIP text-encoder class named by a pretrained config.

    The config is loaded with ``PretrainedConfig.from_pretrained`` and the
    first entry of its ``architectures`` list selects the class.

    Args:
        pretrained_model_name_or_path (str): Forwarded to ``from_pretrained``.
        revision (str): Forwarded to ``from_pretrained``.
        subfolder (str): Forwarded to ``from_pretrained``.

    Returns:
        ``CLIPTextModel`` or ``CLIPTextModelWithProjection`` (the class, not
        an instance).

    Raises:
        ValueError: If the architecture name is neither of the two above.
    """
    config = PretrainedConfig.from_pretrained(
        pretrained_model_name_or_path, subfolder=subfolder, revision=revision
    )
    model_class = config.architectures[0]

    # The transformers classes are imported lazily, only for the branch taken.
    if model_class == "CLIPTextModel":
        from transformers import CLIPTextModel

        return CLIPTextModel
    if model_class == "CLIPTextModelWithProjection":
        from transformers import CLIPTextModelWithProjection

        return CLIPTextModelWithProjection
    raise ValueError(f"{model_class} is not supported.")
def resize_numpy_image_long(image, resize_long_edge=768):
    """Shrink an image so its longer side is at most ``resize_long_edge``.

    Images whose longer side already fits are returned unchanged (the same
    object). Otherwise both sides are scaled by the same factor, truncated to
    integers, and the image is resampled with Lanczos interpolation.

    Args:
        image (ndarray): Image whose first two dimensions are height and width.
        resize_long_edge (int): Upper bound for the longer side.

    Returns:
        ndarray: The original or the resized image.
    """
    height, width = image.shape[:2]
    long_edge = max(height, width)
    if long_edge <= resize_long_edge:
        return image

    scale = resize_long_edge / long_edge
    new_height = int(height * scale)
    new_width = int(width * scale)
    # cv2.resize expects the target size as (width, height).
    return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4)
| # from basicsr | |
def img2tensor(imgs, bgr2rgb=True, float32=True):
    """Convert HWC numpy image(s) to CHW torch tensor(s).

    Args:
        imgs (list[ndarray] | ndarray): Input image or list of images, each
            indexed as (H, W, C).
        bgr2rgb (bool): Whether to swap BGR to RGB; only applied to images
            with exactly three channels.
        float32 (bool): Whether to cast the resulting tensor to float32.

    Returns:
        list[tensor] | tensor: A list of tensors for list input, otherwise a
            single tensor.
    """

    def _convert(image):
        if image.shape[2] == 3 and bgr2rgb:
            # float64 input is narrowed to float32 before the color conversion.
            if image.dtype == 'float64':
                image = image.astype('float32')
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        chw = torch.from_numpy(image.transpose(2, 0, 1))
        return chw.float() if float32 else chw

    if not isinstance(imgs, list):
        return _convert(imgs)
    return [_convert(image) for image in imgs]
def tensor2img(tensor, rgb2bgr=True, out_type=np.uint8, min_max=(0, 1)):
    """Convert torch Tensors into image numpy arrays.

    Values are clamped to [min, max] and then normalized to [0, 1]. The
    input tensor is never modified.

    Args:
        tensor (Tensor or list[Tensor]): Accept shapes:
            1) 4D mini-batch Tensor of shape (B x 3/1 x H x W);
            2) 3D Tensor of shape (3/1 x H x W);
            3) 2D Tensor of shape (H x W).
            Tensor channel should be in RGB order. A leading dimension of
            size 1 is squeezed away before the shape is inspected.
        rgb2bgr (bool): Whether to change rgb to bgr.
        out_type (numpy type): output types. If ``np.uint8``, transform outputs
            to uint8 type with range [0, 255]; otherwise, float type with
            range [0, 1]. Default: ``np.uint8``.
        min_max (tuple[int]): min and max values for clamp.

    Returns:
        (ndarray or list[ndarray]): 3D ndarray of shape (H x W x C) OR 2D
        ndarray of shape (H x W). With ``rgb2bgr`` the channel order is BGR.
        A single array is returned when there is exactly one result,
        otherwise a list.

    Raises:
        TypeError: If the input is neither a tensor nor a list of tensors, or
            if a tensor is not 2D, 3D or 4D after squeezing.
    """
    if not (torch.is_tensor(tensor) or (isinstance(tensor, list) and all(torch.is_tensor(t) for t in tensor))):
        raise TypeError(f'tensor or list of tensors expected, got {type(tensor)}')

    if torch.is_tensor(tensor):
        tensor = [tensor]
    result = []
    for _tensor in tensor:
        # squeeze/float/detach/cpu may all return views of the caller's
        # storage, so clamp out of place to avoid overwriting the input.
        _tensor = _tensor.squeeze(0).float().detach().cpu().clamp(*min_max)
        _tensor = (_tensor - min_max[0]) / (min_max[1] - min_max[0])

        n_dim = _tensor.dim()
        if n_dim == 4:
            # Tile the batch into a roughly square grid image.
            img_np = make_grid(_tensor, nrow=int(math.sqrt(_tensor.size(0))), normalize=False).numpy()
            img_np = img_np.transpose(1, 2, 0)
            if rgb2bgr:
                img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        elif n_dim == 3:
            img_np = _tensor.numpy()
            img_np = img_np.transpose(1, 2, 0)
            if img_np.shape[2] == 1:  # gray image
                img_np = np.squeeze(img_np, axis=2)
            elif rgb2bgr:
                img_np = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
        elif n_dim == 2:
            img_np = _tensor.numpy()
        else:
            raise TypeError(f'Only support 4D, 3D or 2D tensor. But received with dimension: {n_dim}')
        if out_type == np.uint8:
            # Unlike MATLAB, numpy.uint8() WILL NOT round by default.
            img_np = (img_np * 255.0).round()
        img_np = img_np.astype(out_type)
        result.append(img_np)
    if len(result) == 1:
        result = result[0]
    return result
| # We didn't find a correct configuration to make the diffusers scheduler align with dpm++2m (karras) in ComfyUI, | |
| # so we copied the ComfyUI code directly. | |
def append_dims(x, target_dims):
    """Pad ``x`` with trailing singleton axes until it has ``target_dims`` dimensions.

    Args:
        x (Tensor): Tensor to expand.
        target_dims (int): Desired number of dimensions.

    Returns:
        Tensor: ``x`` indexed with extra trailing ``None`` axes. On ``mps``
        devices a detached clone is returned instead.

    Raises:
        ValueError: If ``x`` already has more than ``target_dims`` dimensions.
    """
    missing = target_dims - x.ndim
    if missing < 0:
        raise ValueError(f'input has {x.ndim} dims but target_dims is {target_dims}, which is less')

    index = (Ellipsis, *([None] * missing))
    padded = x[index]
    # MPS will get inf values if it tries to index into the new axes, but detaching fixes this.
    # https://github.com/pytorch/pytorch/issues/84364
    if padded.device.type == 'mps':
        return padded.detach().clone()
    return padded
def to_d(x, sigma, denoised):
    """Turn a denoiser prediction into a Karras ODE derivative.

    Computes ``(x - denoised) / sigma`` with ``sigma`` given trailing
    singleton axes (via ``append_dims``) so it has as many dimensions as ``x``.

    Args:
        x (Tensor): Current noisy sample.
        sigma (Tensor): Noise level(s) for ``x``.
        denoised (Tensor): Denoiser output for ``x``.

    Returns:
        Tensor: The derivative.
    """
    residual = x - denoised
    sigma_expanded = append_dims(sigma, x.ndim)
    return residual / sigma_expanded
def get_ancestral_step(sigma_from, sigma_to, eta=1.0):
    """Split an ancestral sampling step into a deterministic and a noisy part.

    Args:
        sigma_from: Noise level the step starts at.
        sigma_to: Noise level the step ends at.
        eta: Scale of the added noise; a falsy value disables it entirely.

    Returns:
        tuple: ``(sigma_down, sigma_up)`` where ``sigma_down`` is the noise
        level to step down to and ``sigma_up`` is the amount of noise to add
        afterwards. ``sigma_up`` never exceeds ``sigma_to``.
    """
    if not eta:
        return sigma_to, 0.0

    variance = sigma_to**2 * (sigma_from**2 - sigma_to**2) / sigma_from**2
    sigma_up = min(sigma_to, eta * variance ** 0.5)
    sigma_down = (sigma_to**2 - sigma_up**2) ** 0.5
    return sigma_down, sigma_up
class BatchedBrownianTree:
    """A wrapper around torchsde.BrownianTree that enables batches of entropy.

    One ``torchsde.BrownianTree`` is built per seed. When ``seed`` is a
    sequence, its length must equal ``x.shape[0]`` and calls return one
    sample per seed stacked along a new leading axis; with a scalar (or
    missing) seed a single tree is used and calls return its sample directly.

    Args:
        x (Tensor): Template for the default initial value ``w0``
            (``torch.zeros_like(x)``).
        t0 (Tensor): One end of the time interval.
        t1 (Tensor): The other end of the time interval; the two ends may be
            given in either order.
        seed (int or sequence of int, optional): Entropy for the tree(s). A
            random seed is drawn when omitted.
        **kwargs: ``cpu`` (bool, default True) keeps the trees on the CPU;
            ``w0`` overrides the initial value; everything else is forwarded
            to ``torchsde.BrownianTree``.

    Raises:
        ValueError: If ``seed`` is a sequence whose length differs from
            ``x.shape[0]``.
    """

    def __init__(self, x, t0, t1, seed=None, **kwargs):
        self.cpu_tree = kwargs.pop("cpu", True)
        t0, t1, self.sign = self.sort(t0, t1)
        # Pop rather than get: w0 is passed positionally below, so leaving it
        # in kwargs would hand it to BrownianTree twice.
        w0 = kwargs.pop('w0', None)
        if w0 is None:
            w0 = torch.zeros_like(x)
        if seed is None:
            seed = torch.randint(0, 2**63 - 1, []).item()
        try:
            num_seeds = len(seed)
        except TypeError:
            # Scalar seed: a single, unbatched tree.
            seed = [seed]
            self.batched = False
        else:
            if num_seeds != x.shape[0]:
                raise ValueError(
                    f'expected one seed per batch item ({x.shape[0]}), got {num_seeds}'
                )
            # Each per-item tree starts from a single item's initial value.
            w0 = w0[0]
            self.batched = True
        if self.cpu_tree:
            t0, w0, t1 = t0.cpu(), w0.cpu(), t1.cpu()
        self.trees = [torchsde.BrownianTree(t0, w0, t1, entropy=s, **kwargs) for s in seed]

    @staticmethod
    def sort(a, b):
        """Return ``(low, high, sign)`` with sign 1 if ``a < b``, else -1."""
        return (a, b, 1) if a < b else (b, a, -1)

    def __call__(self, t0, t1):
        """Sample the Brownian increment between ``t0`` and ``t1``.

        The result is multiplied by the sign of the requested direction and
        by the sign recorded at construction.
        """
        t0, t1, sign = self.sort(t0, t1)
        if self.cpu_tree:
            # Query on the CPU in float32, then move back to the caller's
            # dtype and device.
            w = torch.stack(
                [tree(t0.cpu().float(), t1.cpu().float()).to(t0.dtype).to(t0.device) for tree in self.trees]
            ) * (self.sign * sign)
        else:
            w = torch.stack([tree(t0, t1) for tree in self.trees]) * (self.sign * sign)

        return w if self.batched else w[0]
class BrownianTreeNoiseSampler:
    """Noise sampler that draws its samples from a ``BatchedBrownianTree``.

    Args:
        x (Tensor): Reference tensor handed to ``BatchedBrownianTree``.
        sigma_min (float): The low end of the valid interval.
        sigma_max (float): The high end of the valid interval.
        seed (int or List[int]): The random seed. If a list of seeds is
            supplied instead of a single integer, then the noise sampler will
            use one BrownianTree per batch item, each with its own seed.
        transform (callable): A function that maps sigma to the sampler's
            internal timestep.
        cpu (bool): Forwarded to ``BatchedBrownianTree`` as its ``cpu`` option.
    """

    def __init__(self, x, sigma_min, sigma_max, seed=None, transform=lambda x: x, cpu=False):
        self.transform = transform
        t_start = self.transform(torch.as_tensor(sigma_min))
        t_end = self.transform(torch.as_tensor(sigma_max))
        self.tree = BatchedBrownianTree(x, t_start, t_end, seed, cpu=cpu)

    def __call__(self, sigma, sigma_next):
        """Return the tree sample for the step, divided by ``sqrt(|t1 - t0|)``."""
        t_from = self.transform(torch.as_tensor(sigma))
        t_to = self.transform(torch.as_tensor(sigma_next))
        increment = self.tree(t_from, t_to)
        span = (t_to - t_from).abs()
        return increment / span.sqrt()
def sample_dpmpp_2m(model, x, sigmas, extra_args=None, callback=None, disable=None):
    """DPM-Solver++(2M) sampling loop.

    Args:
        model (callable): Denoiser, called as
            ``model(x, sigma * ones(batch), **extra_args)``.
        x (Tensor): Starting sample; its first dimension is the batch size.
        sigmas (Tensor): Noise levels; one step is taken per consecutive pair.
        extra_args (dict, optional): Extra keyword arguments for ``model``.
        callback (callable, optional): Called after every model evaluation
            with a dict holding ``x``, ``i``, ``sigma``, ``sigma_hat`` and
            ``denoised``.
        disable (bool, optional): Forwarded to ``trange`` to hide the
            progress bar.

    Returns:
        Tensor: The sample after the last step.
    """
    if extra_args is None:
        extra_args = {}
    s_in = x.new_ones([x.shape[0]])

    def sigma_fn(t):
        return t.neg().exp()

    def t_fn(sigma):
        return sigma.log().neg()

    prev_denoised = None
    for step in trange(len(sigmas) - 1, disable=disable):
        denoised = model(x, sigmas[step] * s_in, **extra_args)
        if callback is not None:
            callback({'x': x, 'i': step, 'sigma': sigmas[step], 'sigma_hat': sigmas[step], 'denoised': denoised})
        t, t_next = t_fn(sigmas[step]), t_fn(sigmas[step + 1])
        h = t_next - t
        if prev_denoised is None or sigmas[step + 1] == 0:
            # First step, or final step down to sigma == 0: use the current
            # prediction on its own.
            target = denoised
        else:
            # Combine the current prediction with the previous one, weighted
            # by the ratio of the last two step sizes.
            h_prev = t - t_fn(sigmas[step - 1])
            ratio = h_prev / h
            target = (1 + 1 / (2 * ratio)) * denoised - (1 / (2 * ratio)) * prev_denoised
        x = (sigma_fn(t_next) / sigma_fn(t)) * x - (-h).expm1() * target
        prev_denoised = denoised
    return x
def sample_dpmpp_sde(
    model, x, sigmas, extra_args=None, callback=None, disable=None, eta=1.0, s_noise=1.0, noise_sampler=None, r=1 / 2
):
    """DPM-Solver++ (stochastic).

    Args:
        model (callable): Denoiser, called as
            ``model(x, sigma * ones(batch), **extra_args)``.
        x (Tensor): Starting sample; its first dimension is the batch size.
        sigmas (Tensor): Noise levels; one step is taken per consecutive pair.
        extra_args (dict, optional): Extra keyword arguments for ``model``.
            Its ``"seed"`` entry, if present, also seeds the default noise
            sampler.
        callback (callable, optional): Called after the first model
            evaluation of every step with a dict holding ``x``, ``i``,
            ``sigma``, ``sigma_hat`` and ``denoised``.
        disable (bool, optional): Forwarded to ``trange`` to hide the
            progress bar.
        eta (float): Passed to ``get_ancestral_step`` to scale the noise.
        s_noise (float): Multiplier applied to every injected noise sample.
        noise_sampler (callable, optional): Called as
            ``noise_sampler(sigma, sigma_next)``. Defaults to a
            ``BrownianTreeNoiseSampler`` spanning the positive sigmas.
        r (float): Fraction of the log-sigma step at which the intermediate
            model evaluation is taken.

    Returns:
        Tensor: The sample after the last step; ``x`` itself when ``sigmas``
        holds fewer than two entries.
    """
    # With fewer than two noise levels there is no step to take.
    if len(sigmas) <= 1:
        return x

    # Normalize before first use: the seed lookup below needs a dict.
    extra_args = {} if extra_args is None else extra_args
    if noise_sampler is None:
        sigma_min, sigma_max = sigmas[sigmas > 0].min(), sigmas.max()
        seed = extra_args.get("seed", None)
        noise_sampler = BrownianTreeNoiseSampler(x, sigma_min, sigma_max, seed=seed, cpu=False)
    s_in = x.new_ones([x.shape[0]])

    def sigma_fn(t):
        return t.neg().exp()

    def t_fn(sigma):
        return sigma.log().neg()

    for i in trange(len(sigmas) - 1, disable=disable):
        denoised = model(x, sigmas[i] * s_in, **extra_args)
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised})
        if sigmas[i + 1] == 0:
            # Euler method
            d = to_d(x, sigmas[i], denoised)
            dt = sigmas[i + 1] - sigmas[i]
            x = x + d * dt
        else:
            # DPM-Solver++
            t, t_next = t_fn(sigmas[i]), t_fn(sigmas[i + 1])
            h = t_next - t
            s = t + h * r
            fac = 1 / (2 * r)

            # Step 1
            sd, su = get_ancestral_step(sigma_fn(t), sigma_fn(s), eta)
            s_ = t_fn(sd)
            x_2 = (sigma_fn(s_) / sigma_fn(t)) * x - (t - s_).expm1() * denoised
            x_2 = x_2 + noise_sampler(sigma_fn(t), sigma_fn(s)) * s_noise * su
            denoised_2 = model(x_2, sigma_fn(s) * s_in, **extra_args)

            # Step 2
            sd, su = get_ancestral_step(sigma_fn(t), sigma_fn(t_next), eta)
            t_next_ = t_fn(sd)
            denoised_d = (1 - fac) * denoised + fac * denoised_2
            x = (sigma_fn(t_next_) / sigma_fn(t)) * x - (t - t_next_).expm1() * denoised_d
            x = x + noise_sampler(sigma_fn(t), sigma_fn(t_next)) * s_noise * su
    return x