Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # Copyright (c) Alibaba, Inc. and its affiliates. | |
| import torch | |
| import torch.nn.functional as F | |
| import copy | |
| import math | |
| import random | |
| from contextlib import nullcontext | |
| from einops import rearrange | |
| from scepter.modules.model.network.ldm import LatentDiffusion | |
| from scepter.modules.model.registry import MODELS, DIFFUSIONS, BACKBONES, LOSSES, TOKENIZERS, EMBEDDERS | |
| from scepter.modules.model.utils.basic_utils import check_list_of_list, to_device, pack_imagelist_into_tensor, \ | |
| limit_batch_data, unpack_tensor_into_imagelist, count_params, disabled_train | |
| from scepter.modules.utils.config import dict_to_yaml | |
| from scepter.modules.utils.distribute import we | |
| class LatentDiffusionACEPlus(LatentDiffusion): | |
| para_dict = LatentDiffusion.para_dict | |
| def __init__(self, cfg, logger=None): | |
| super().__init__(cfg, logger=logger) | |
| self.guide_scale = cfg.get('GUIDE_SCALE', 1.0) | |
| def init_params(self): | |
| self.parameterization = self.cfg.get('PARAMETERIZATION', 'rf') | |
| assert self.parameterization in [ | |
| 'eps', 'x0', 'v', 'rf' | |
| ], 'currently only supporting "eps" and "x0" and "v" and "rf"' | |
| diffusion_cfg = self.cfg.get("DIFFUSION", None) | |
| assert diffusion_cfg is not None | |
| if self.cfg.have("WORK_DIR"): | |
| diffusion_cfg.WORK_DIR = self.cfg.WORK_DIR | |
| self.diffusion = DIFFUSIONS.build(diffusion_cfg, logger=self.logger) | |
| self.pretrained_model = self.cfg.get('PRETRAINED_MODEL', None) | |
| self.ignore_keys = self.cfg.get('IGNORE_KEYS', []) | |
| self.model_config = self.cfg.DIFFUSION_MODEL | |
| self.first_stage_config = self.cfg.FIRST_STAGE_MODEL | |
| self.cond_stage_config = self.cfg.COND_STAGE_MODEL | |
| self.tokenizer_config = self.cfg.get('TOKENIZER', None) | |
| self.loss_config = self.cfg.get('LOSS', None) | |
| self.scale_factor = self.cfg.get('SCALE_FACTOR', 0.18215) | |
| self.size_factor = self.cfg.get('SIZE_FACTOR', 16) | |
| self.default_n_prompt = self.cfg.get('DEFAULT_N_PROMPT', '') | |
| self.default_n_prompt = '' if self.default_n_prompt is None else self.default_n_prompt | |
| self.p_zero = self.cfg.get('P_ZERO', 0.0) | |
| self.train_n_prompt = self.cfg.get('TRAIN_N_PROMPT', '') | |
| if self.default_n_prompt is None: | |
| self.default_n_prompt = '' | |
| if self.train_n_prompt is None: | |
| self.train_n_prompt = '' | |
| self.use_ema = self.cfg.get('USE_EMA', False) | |
| self.model_ema_config = self.cfg.get('DIFFUSION_MODEL_EMA', None) | |
| def construct_network(self): | |
| # embedding_context = torch.device("meta") if self.model_config.get("PRETRAINED_MODEL", None) else nullcontext() | |
| # with embedding_context: | |
| self.model = BACKBONES.build(self.model_config, logger=self.logger).to(torch.bfloat16) | |
| self.logger.info('all parameters:{}'.format(count_params(self.model))) | |
| if self.use_ema: | |
| if self.model_ema_config: | |
| self.model_ema = BACKBONES.build(self.model_ema_config, | |
| logger=self.logger) | |
| else: | |
| self.model_ema = copy.deepcopy(self.model) | |
| self.model_ema = self.model_ema.eval() | |
| for param in self.model_ema.parameters(): | |
| param.requires_grad = False | |
| if self.loss_config: | |
| self.loss = LOSSES.build(self.loss_config, logger=self.logger) | |
| if self.tokenizer_config is not None: | |
| self.tokenizer = TOKENIZERS.build(self.tokenizer_config, | |
| logger=self.logger) | |
| if self.first_stage_config: | |
| self.first_stage_model = MODELS.build(self.first_stage_config, | |
| logger=self.logger) | |
| self.first_stage_model = self.first_stage_model.eval() | |
| self.first_stage_model.train = disabled_train | |
| for param in self.first_stage_model.parameters(): | |
| param.requires_grad = False | |
| else: | |
| self.first_stage_model = None | |
| if self.tokenizer_config is not None: | |
| self.cond_stage_config.KWARGS = { | |
| 'vocab_size': self.tokenizer.vocab_size | |
| } | |
| if self.cond_stage_config == '__is_unconditional__': | |
| print( | |
| f'Training {self.__class__.__name__} as an unconditional model.' | |
| ) | |
| self.cond_stage_model = None | |
| else: | |
| model = EMBEDDERS.build(self.cond_stage_config, logger=self.logger) | |
| self.cond_stage_model = model.eval().requires_grad_(False) | |
| self.cond_stage_model.train = disabled_train | |
| def encode_first_stage(self, x, **kwargs): | |
| def run_one_image(u): | |
| zu = self.first_stage_model.encode(u) | |
| if isinstance(zu, (tuple, list)): | |
| zu = zu[0] | |
| return zu | |
| z = [run_one_image(u.unsqueeze(0) if u.dim() == 3 else u) for u in x] | |
| return z | |
| def decode_first_stage(self, z): | |
| return [self.first_stage_model.decode(zu) for zu in z] | |
| def noise_sample(self, num_samples, h, w, seed, dtype=torch.bfloat16): | |
| noise = torch.randn( | |
| num_samples, | |
| 16, | |
| # allow for packing | |
| 2 * math.ceil(h / 16), | |
| 2 * math.ceil(w / 16), | |
| device=we.device_id, | |
| dtype=dtype, | |
| generator=torch.Generator(device=we.device_id).manual_seed(seed), | |
| ) | |
| return noise | |
| def resize_func(self, x, size): | |
| if x is None: return x | |
| return F.interpolate(x.unsqueeze(0), size = size, mode='nearest-exact') | |
| def parse_ref_and_edit(self, src_image, | |
| modify_image, | |
| src_image_mask, | |
| text_embedding, | |
| #text_mask, | |
| edit_id): | |
| edit_image = [] | |
| modi_image = [] | |
| edit_mask = [] | |
| ref_image = [] | |
| ref_mask = [] | |
| ref_context = [] | |
| ref_y = [] | |
| ref_id = [] | |
| txt = [] | |
| txt_y = [] | |
| for sample_id, (one_src, | |
| one_modify, | |
| one_src_mask, | |
| one_text_embedding, | |
| one_text_y, | |
| # one_text_mask, | |
| one_edit_id) in enumerate(zip(src_image, | |
| modify_image, | |
| src_image_mask, | |
| text_embedding["context"], | |
| text_embedding["y"], | |
| #text_mask, | |
| edit_id) | |
| ): | |
| ref_id.append([i for i in range(len(one_src))]) | |
| if hasattr(self, "ref_cond_stage_model") and self.ref_cond_stage_model: | |
| ref_image.append(self.ref_cond_stage_model.encode_list([((i + 1.0) / 2.0 * 255).type(torch.uint8) for i in one_src])) | |
| else: | |
| ref_image.append(one_src) | |
| ref_mask.append(one_src_mask) | |
| # process edit image & edit image mask | |
| current_edit_image = to_device([one_src[i] for i in one_edit_id], strict=False) | |
| current_edit_image = [v.squeeze(0) for v in self.encode_first_stage(current_edit_image)] | |
| # process modi image | |
| current_modify_image = to_device([one_modify[i] for i in one_edit_id], | |
| strict=False) | |
| current_modify_image = [ | |
| v.squeeze(0) | |
| for v in self.encode_first_stage(current_modify_image) | |
| ] | |
| current_edit_image_mask = to_device( | |
| [one_src_mask[i] for i in one_edit_id], strict=False) | |
| current_edit_image_mask = [ | |
| self.reshape_func(m).squeeze(0) | |
| for m in current_edit_image_mask | |
| ] | |
| edit_image.append(current_edit_image) | |
| modi_image.append(current_modify_image) | |
| edit_mask.append(current_edit_image_mask) | |
| ref_context.append(one_text_embedding[:len(ref_id[-1])]) | |
| ref_y.append(one_text_y[:len(ref_id[-1])]) | |
| if not sum(len(src_) for src_ in src_image) > 0: | |
| ref_image = None | |
| ref_context = None | |
| ref_y = None | |
| for sample_id, (one_text_embedding, one_text_y) in enumerate(zip(text_embedding["context"], | |
| text_embedding["y"])): | |
| txt.append(one_text_embedding[-1].squeeze(0)) | |
| txt_y.append(one_text_y[-1]) | |
| return { | |
| "edit": edit_image, | |
| 'modify': modi_image, | |
| "edit_mask": edit_mask, | |
| "edit_id": edit_id, | |
| "ref_context": ref_context, | |
| "ref_y": ref_y, | |
| "context": txt, | |
| "y": txt_y, | |
| "ref_x": ref_image, | |
| "ref_mask": ref_mask, | |
| "ref_id": ref_id | |
| } | |
| def reshape_func(self, mask): | |
| mask = mask.to(torch.bfloat16) | |
| mask = mask.view((-1, mask.shape[-2], mask.shape[-1])) | |
| mask = rearrange( | |
| mask, | |
| "c (h ph) (w pw) -> c (ph pw) h w", | |
| ph=8, | |
| pw=8, | |
| ) | |
| return mask | |
| def forward_train(self, | |
| src_image_list=[], | |
| modify_image_list=[], | |
| src_mask_list=[], | |
| edit_id=[], | |
| image=None, | |
| image_mask=None, | |
| noise=None, | |
| prompt=[], | |
| **kwargs): | |
| ''' | |
| Args: | |
| src_image: list of list of src_image | |
| src_image_mask: list of list of src_image_mask | |
| image: target image | |
| image_mask: target image mask | |
| noise: default is None, generate automaticly | |
| ref_prompt: list of list of text | |
| prompt: list of text | |
| **kwargs: | |
| Returns: | |
| ''' | |
| assert check_list_of_list(src_image_list) and check_list_of_list( | |
| src_mask_list) | |
| assert self.cond_stage_model is not None | |
| gc_seg = kwargs.pop("gc_seg", []) | |
| gc_seg = int(gc_seg[0]) if len(gc_seg) > 0 else 0 | |
| align = kwargs.pop("align", []) | |
| prompt_ = [[pp] if isinstance(pp, str) else pp for pp in prompt] | |
| if len(align) < 1: align = [0] * len(prompt_) | |
| context = getattr(self.cond_stage_model, 'encode_list_of_list')(prompt_) | |
| guide_scale = self.guide_scale | |
| if guide_scale is not None: | |
| guide_scale = torch.full((len(prompt_),), guide_scale, device=we.device_id) | |
| else: | |
| guide_scale = None | |
| # image and image_mask | |
| # print("is list of list", check_list_of_list(image)) | |
| if check_list_of_list(image): | |
| image = [to_device(ix) for ix in image] | |
| x_start = [self.encode_first_stage(ix, **kwargs) for ix in image] | |
| noise = [[torch.randn_like(ii) for ii in ix] for ix in x_start] | |
| x_start = [torch.cat(ix, dim=-1) for ix in x_start] | |
| noise = [torch.cat(ix, dim=-1) for ix in noise] | |
| noise, _ = pack_imagelist_into_tensor(noise) | |
| image_mask = [to_device(im, strict=False) for im in image_mask] | |
| x_mask = [[self.reshape_func(i).squeeze(0) for i in im] if im is not None else [None] * len(ix) for ix, im in zip(image, image_mask)] | |
| x_mask = [torch.cat(im, dim=-1) for im in x_mask] | |
| else: | |
| image = to_device(image) | |
| x_start = self.encode_first_stage(image, **kwargs) | |
| image_mask = to_device(image_mask, strict=False) | |
| x_mask = [self.reshape_func(i).squeeze(0) for i in image_mask] if image_mask is not None else [None] * len( | |
| image) | |
| loss_mask, _ = pack_imagelist_into_tensor( | |
| tuple(torch.ones_like(ix, dtype=torch.bool, device=ix.device) for ix in x_start)) | |
| x_start, x_shapes = pack_imagelist_into_tensor(x_start) | |
| context['x_shapes'] = x_shapes | |
| context['align'] = align | |
| # process image mask | |
| context['x_mask'] = x_mask | |
| ref_edit_context = self.parse_ref_and_edit(src_image_list, modify_image_list, src_mask_list, context, edit_id) | |
| context.update(ref_edit_context) | |
| teacher_context = copy.deepcopy(context) | |
| teacher_context["context"] = torch.cat(teacher_context["context"], dim=0) | |
| teacher_context["y"] = torch.cat(teacher_context["y"], dim=0) | |
| loss = self.diffusion.loss(x_0=x_start, | |
| model=self.model, | |
| model_kwargs={"cond": context, | |
| "gc_seg": gc_seg, | |
| "guidance": guide_scale}, | |
| noise=noise, | |
| reduction='none', | |
| **kwargs) | |
| loss = loss[loss_mask].mean() | |
| ret = {'loss': loss, 'probe_data': {'prompt': prompt}} | |
| return ret | |
| def forward_test(self, | |
| src_image_list=[], | |
| modify_image_list=[], | |
| src_mask_list=[], | |
| edit_id=[], | |
| image=None, | |
| image_mask=None, | |
| prompt=[], | |
| sampler='flow_euler', | |
| sample_steps=20, | |
| seed=2023, | |
| guide_scale=3.5, | |
| guide_rescale=0.0, | |
| show_process=False, | |
| log_num=-1, | |
| **kwargs): | |
| outputs = self.forward_editing( | |
| src_image_list=src_image_list, | |
| src_mask_list=src_mask_list, | |
| modify_image_list=modify_image_list, | |
| edit_id=edit_id, | |
| image=image, | |
| image_mask=image_mask, | |
| prompt=prompt, | |
| sampler=sampler, | |
| sample_steps=sample_steps, | |
| seed=seed, | |
| guide_scale=guide_scale, | |
| guide_rescale=guide_rescale, | |
| show_process=show_process, | |
| log_num=log_num, | |
| **kwargs | |
| ) | |
| return outputs | |
| def forward_editing(self, | |
| src_image_list=[], | |
| modify_image_list=None, | |
| src_mask_list=[], | |
| edit_id=[], | |
| image=None, | |
| image_mask=None, | |
| prompt=[], | |
| sampler='flow_euler', | |
| sample_steps=20, | |
| seed=2023, | |
| guide_scale=3.5, | |
| log_num=-1, | |
| **kwargs | |
| ): | |
| # gc_seg is unused | |
| prompt, image, image_mask, src_image, modify_image, src_image_mask, edit_id = limit_batch_data( | |
| [prompt, image, image_mask, src_image_list, modify_image_list, src_mask_list, edit_id], log_num) | |
| assert check_list_of_list(src_image) and check_list_of_list(src_image_mask) | |
| assert self.cond_stage_model is not None | |
| align = kwargs.pop("align", []) | |
| prompt_ = [[pp] if isinstance(pp, str) else pp for pp in prompt] | |
| if len(align) < 1: align = [0] * len(prompt_) | |
| context = getattr(self.cond_stage_model, 'encode_list_of_list')(prompt_) | |
| guide_scale = guide_scale or self.guide_scale | |
| if guide_scale is not None: | |
| guide_scale = torch.full((len(prompt),), guide_scale, device=we.device_id) | |
| else: | |
| guide_scale = None | |
| # image and image_mask | |
| seed = seed if seed >= 0 else random.randint(0, 2 ** 32 - 1) | |
| if image is not None: | |
| if check_list_of_list(image): | |
| image = [torch.cat(ix, dim=-1) for ix in image] | |
| image_mask = [torch.cat(im, dim=-1) for im in image_mask] | |
| noise = [self.noise_sample(1, ix.shape[1], ix.shape[2], seed) for ix in image] | |
| else: | |
| height, width = kwargs.pop("height"), kwargs.pop("width") | |
| noise = [self.noise_sample(1, height, width, seed) for _ in prompt] | |
| noise, x_shapes = pack_imagelist_into_tensor(noise) | |
| context['x_shapes'] = x_shapes | |
| context['align'] = align | |
| # process image mask | |
| image_mask = to_device(image_mask, strict=False) | |
| x_mask = [self.reshape_func(i).squeeze(0) for i in image_mask] | |
| context['x_mask'] = x_mask | |
| ref_edit_context = self.parse_ref_and_edit(src_image, modify_image, src_image_mask, context, edit_id) | |
| context.update(ref_edit_context) | |
| # UNet use input n_prompt | |
| # model = self.model_ema if self.use_ema and self.eval_ema else self.model | |
| # import pdb;pdb.set_trace() | |
| model = self.model | |
| embedding_context = model.no_sync if isinstance(model, torch.distributed.fsdp.FullyShardedDataParallel) \ | |
| else nullcontext | |
| with embedding_context(): | |
| samples = self.diffusion.sample( | |
| noise=noise, | |
| sampler=sampler, | |
| model=self.model, | |
| model_kwargs={"cond": context, "guidance": guide_scale, "gc_seg": -1 | |
| }, | |
| steps=sample_steps, | |
| show_progress=True, | |
| guide_scale=guide_scale, | |
| return_intermediate=None, | |
| **kwargs).float() | |
| samples = unpack_tensor_into_imagelist(samples, x_shapes) | |
| with torch.autocast(device_type="cuda", dtype=torch.bfloat16): | |
| x_samples = self.decode_first_stage(samples) | |
| outputs = list() | |
| for i in range(len(prompt)): | |
| rec_img = torch.clamp((x_samples[i].float() + 1.0) / 2.0, min=0.0, max=1.0) | |
| rec_img = rec_img.squeeze(0) | |
| edit_imgs, modify_imgs, edit_img_masks = [], [], [] | |
| if src_image is not None and src_image[i] is not None: | |
| if src_image_mask[i] is None: | |
| src_image_mask[i] = [None] * len(src_image[i]) | |
| for edit_img, modify_img, edit_mask in zip(src_image[i], modify_image_list[i], src_image_mask[i]): | |
| edit_img = torch.clamp((edit_img.float() + 1.0) / 2.0, min=0.0, max=1.0) | |
| edit_imgs.append(edit_img.squeeze(0)) | |
| modify_img = torch.clamp((modify_img.float() + 1.0) / 2.0, | |
| min=0.0, | |
| max=1.0) | |
| modify_imgs.append(modify_img.squeeze(0)) | |
| if edit_mask is None: | |
| edit_mask = torch.ones_like(edit_img[[0], :, :]) | |
| edit_img_masks.append(edit_mask) | |
| one_tup = { | |
| 'reconstruct_image': rec_img, | |
| 'instruction': prompt[i], | |
| 'edit_image': edit_imgs if len(edit_imgs) > 0 else None, | |
| 'modify_image': modify_imgs if len(modify_imgs) > 0 else None, | |
| 'edit_mask': edit_img_masks if len(edit_imgs) > 0 else None | |
| } | |
| if image is not None: | |
| if image_mask is None: | |
| image_mask = [None] * len(image) | |
| ori_img = torch.clamp((image[i] + 1.0) / 2.0, min=0.0, max=1.0) | |
| one_tup['target_image'] = ori_img.squeeze(0) | |
| one_tup['target_mask'] = image_mask[i] if image_mask[i] is not None else torch.ones_like( | |
| ori_img[[0], :, :]) | |
| outputs.append(one_tup) | |
| return outputs | |
| def get_config_template(): | |
| return dict_to_yaml('MODEL', | |
| __class__.__name__, | |
| LatentDiffusionACEPlus.para_dict, | |
| set_name=True) | |