Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # Copyright (c) Alibaba, Inc. and its affiliates. | |
| import math | |
| import re, io | |
| import numpy as np | |
| import random, torch | |
| from PIL import Image | |
| import torchvision.transforms as T | |
| from collections import defaultdict | |
| from scepter.modules.data.dataset.registry import DATASETS | |
| from scepter.modules.data.dataset.base_dataset import BaseDataset | |
| from scepter.modules.transform.io import pillow_convert | |
| from scepter.modules.utils.directory import osp_path | |
| from scepter.modules.utils.file_system import FS | |
| from torchvision.transforms import InterpolationMode | |
| def load_image(prefix, img_path, cvt_type=None): | |
| if img_path is None or img_path == '': | |
| return None | |
| img_path = osp_path(prefix, img_path) | |
| with FS.get_object(img_path) as image_bytes: | |
| image = Image.open(io.BytesIO(image_bytes)) | |
| if cvt_type is not None: | |
| image = pillow_convert(image, cvt_type) | |
| return image | |
| def transform_image(image, std = 0.5, mean = 0.5): | |
| return (image.permute(2, 0, 1)/255. - mean)/std | |
| def transform_mask(mask): | |
| return mask.unsqueeze(0)/255. | |
| def ensure_src_align_target_h_mode(src_image, size, image_id, interpolation=InterpolationMode.BILINEAR): | |
| # padding mode | |
| H, W = size | |
| ret_image = [] | |
| for one_id in image_id: | |
| edit_image = src_image[one_id] | |
| _, eH, eW = edit_image.shape | |
| scale = H/eH | |
| tH, tW = H, int(eW * scale) | |
| ret_image.append(T.Resize((tH, tW), interpolation=interpolation, antialias=True)(edit_image)) | |
| return ret_image | |
| def ensure_src_align_target_padding_mode(src_image, size, image_id, size_h = [], interpolation=InterpolationMode.BILINEAR): | |
| # padding mode | |
| H, W = size | |
| ret_data = [] | |
| ret_h = [] | |
| for idx, one_id in enumerate(image_id): | |
| if len(size_h) < 1: | |
| rH = random.randint(int(H / 3), int(H)) | |
| else: | |
| rH = size_h[idx] | |
| ret_h.append(rH) | |
| edit_image = src_image[one_id] | |
| _, eH, eW = edit_image.shape | |
| scale = rH/eH | |
| tH, tW = rH, int(eW * scale) | |
| edit_image = T.Resize((tH, tW), interpolation=interpolation, antialias=True)(edit_image) | |
| # padding | |
| delta_w = 0 | |
| delta_h = H - tH | |
| padding = (delta_w // 2, delta_h // 2, delta_w - (delta_w // 2), delta_h - (delta_h // 2)) | |
| ret_data.append(T.Pad(padding, fill=0, padding_mode="constant")(edit_image).float()) | |
| return ret_data, ret_h | |
| def ensure_limit_sequence(image, max_seq_len = 4096, d = 16, interpolation=InterpolationMode.BILINEAR): | |
| # resize image for max_seq_len, while keep the aspect ratio | |
| H, W = image.shape[-2:] | |
| scale = min(1.0, math.sqrt(max_seq_len / ((H / d) * (W / d)))) | |
| rH = int(H * scale) // d * d # ensure divisible by self.d | |
| rW = int(W * scale) // d * d | |
| # print(f"{H} {W} -> {rH} {rW}") | |
| image = T.Resize((rH, rW), interpolation=interpolation, antialias=True)(image) | |
| return image | |
| class ACEPlusDataset(BaseDataset): | |
| para_dict = { | |
| "DELIMITER": { | |
| "value": "#;#", | |
| "description": "The delimiter for records of data list." | |
| }, | |
| "FIELDS": { | |
| "value": ["data_type", "edit_image", "edit_mask", "ref_image", "target_image", "prompt"], | |
| "description": "The fields for every record." | |
| }, | |
| "PATH_PREFIX": { | |
| "value": "", | |
| "description": "The path prefix for every input image." | |
| }, | |
| "EDIT_TYPE_LIST": { | |
| "value": [], | |
| "description": "The edit type list to be trained for data list." | |
| }, | |
| "MAX_SEQ_LEN": { | |
| "value": 4096, | |
| "description": "The max sequence length for input image." | |
| }, | |
| "D": { | |
| "value": 16, | |
| "description": "Patch size for resized image." | |
| } | |
| } | |
| para_dict.update(BaseDataset.para_dict) | |
| def __init__(self, cfg, logger=None): | |
| super().__init__(cfg, logger=logger) | |
| delimiter = cfg.get("DELIMITER", "#;#") | |
| fields = cfg.get("FIELDS", []) | |
| prefix = cfg.get("PATH_PREFIX", "") | |
| edit_type_list = cfg.get("EDIT_TYPE_LIST", []) | |
| self.modify_mode = cfg.get("MODIFY_MODE", True) | |
| self.max_seq_len = cfg.get("MAX_SEQ_LEN", 4096) | |
| self.repaiting_scale = cfg.get("REPAINTING_SCALE", 0.5) | |
| self.d = cfg.get("D", 16) | |
| prompt_file = cfg.DATA_LIST | |
| self.items = self.read_data_list(delimiter, | |
| fields, | |
| prefix, | |
| edit_type_list, | |
| prompt_file) | |
| random.shuffle(self.items) | |
| use_num = int(cfg.get('USE_NUM', -1)) | |
| if use_num > 0: | |
| self.items = self.items[:use_num] | |
| def read_data_list(self, delimiter, | |
| fields, | |
| prefix, | |
| edit_type_list, | |
| prompt_file): | |
| with FS.get_object(prompt_file) as local_data: | |
| rows = local_data.decode('utf-8').strip().split('\n') | |
| items = list() | |
| dtype_level_num = {} | |
| for i, row in enumerate(rows): | |
| item = {"prefix": prefix} | |
| for key, val in zip(fields, row.split(delimiter)): | |
| item[key] = val | |
| edit_type = item["data_type"] | |
| if len(edit_type_list) > 0: | |
| for re_pattern in edit_type_list: | |
| if re.match(re_pattern, edit_type): | |
| items.append(item) | |
| if edit_type not in dtype_level_num: | |
| dtype_level_num[edit_type] = 0 | |
| dtype_level_num[edit_type] += 1 | |
| break | |
| else: | |
| items.append(item) | |
| if edit_type not in dtype_level_num: | |
| dtype_level_num[edit_type] = 0 | |
| dtype_level_num[edit_type] += 1 | |
| for edit_type in dtype_level_num: | |
| self.logger.info(f"{edit_type} has {dtype_level_num[edit_type]} samples.") | |
| return items | |
| def __len__(self): | |
| return len(self.items) | |
| def __getitem__(self, index): | |
| item = self._get(index) | |
| return self.pipeline(item) | |
| def _get(self, index): | |
| # normalize | |
| sample_id = index%len(self) | |
| index = self.items[index%len(self)] | |
| prefix = index.get("prefix", "") | |
| edit_image = index.get("edit_image", "") | |
| edit_mask = index.get("edit_mask", "") | |
| ref_image = index.get("ref_image", "") | |
| target_image = index.get("target_image", "") | |
| prompt = index.get("prompt", "") | |
| edit_image = load_image(prefix, edit_image, cvt_type="RGB") if edit_image != "" else None | |
| edit_mask = load_image(prefix, edit_mask, cvt_type="L") if edit_mask != "" else None | |
| ref_image = load_image(prefix, ref_image, cvt_type="RGB") if ref_image != "" else None | |
| target_image = load_image(prefix, target_image, cvt_type="RGB") if target_image != "" else None | |
| assert target_image is not None | |
| edit_id, ref_id, src_image_list, src_mask_list = [], [], [], [] | |
| # parse editing image | |
| if edit_image is None: | |
| edit_image = Image.new("RGB", target_image.size, (255, 255, 255)) | |
| edit_mask = Image.new("L", edit_image.size, 255) | |
| elif edit_mask is None: | |
| edit_mask = Image.new("L", edit_image.size, 255) | |
| src_image_list.append(edit_image) | |
| edit_id.append(0) | |
| src_mask_list.append(edit_mask) | |
| # parse reference image | |
| if ref_image is not None: | |
| src_image_list.append(ref_image) | |
| ref_id.append(1) | |
| src_mask_list.append(Image.new("L", ref_image.size, 0)) | |
| image = transform_image(torch.tensor(np.array(target_image).astype(np.float32))) | |
| if edit_mask is not None: | |
| image_mask = transform_mask(torch.tensor(np.array(edit_mask).astype(np.float32))) | |
| else: | |
| image_mask = Image.new("L", target_image.size, 255) | |
| image_mask = transform_mask(torch.tensor(np.array(image_mask).astype(np.float32))) | |
| src_image_list = [transform_image(torch.tensor(np.array(im).astype(np.float32))) for im in src_image_list] | |
| src_mask_list = [transform_mask(torch.tensor(np.array(im).astype(np.float32))) for im in src_mask_list] | |
| # decide the repainting scale for the editing task | |
| if len(ref_id) > 0: | |
| repainting_scale = 1.0 | |
| else: | |
| repainting_scale = self.repaiting_scale | |
| for e_i in edit_id: | |
| src_image_list[e_i] = src_image_list[e_i] * (1 - repainting_scale * src_mask_list[e_i]) | |
| size = image.shape[1:] | |
| ref_image_list, ret_h = ensure_src_align_target_padding_mode(src_image_list, size, | |
| image_id=ref_id, | |
| interpolation=InterpolationMode.NEAREST_EXACT) | |
| ref_mask_list, ret_h = ensure_src_align_target_padding_mode(src_mask_list, size, | |
| size_h=ret_h, | |
| image_id=ref_id, | |
| interpolation=InterpolationMode.NEAREST_EXACT) | |
| edit_image_list = ensure_src_align_target_h_mode(src_image_list, size, | |
| image_id=edit_id, | |
| interpolation=InterpolationMode.NEAREST_EXACT) | |
| edit_mask_list = ensure_src_align_target_h_mode(src_mask_list, size, | |
| image_id=edit_id, | |
| interpolation=InterpolationMode.NEAREST_EXACT) | |
| src_image_list = [torch.cat(ref_image_list + edit_image_list, dim=-1)] | |
| src_mask_list = [torch.cat(ref_mask_list + edit_mask_list, dim=-1)] | |
| image = torch.cat(ref_image_list + [image], dim=-1) | |
| image_mask = torch.cat(ref_mask_list + [image_mask], dim=-1) | |
| # limit max sequence length | |
| image = ensure_limit_sequence(image, max_seq_len = self.max_seq_len, | |
| d = self.d, interpolation=InterpolationMode.BILINEAR) | |
| image_mask = ensure_limit_sequence(image_mask, max_seq_len = self.max_seq_len, | |
| d = self.d, interpolation=InterpolationMode.NEAREST_EXACT) | |
| src_image_list = [ensure_limit_sequence(i, max_seq_len = self.max_seq_len, | |
| d = self.d, interpolation=InterpolationMode.BILINEAR) for i in src_image_list] | |
| src_mask_list = [ensure_limit_sequence(i, max_seq_len = self.max_seq_len, | |
| d = self.d, interpolation=InterpolationMode.NEAREST_EXACT) for i in src_mask_list] | |
| if self.modify_mode: | |
| # To be modified regions according to mask | |
| modify_image_list = [ii * im for ii, im in zip(src_image_list, src_mask_list)] | |
| # To be edited regions according to mask | |
| src_image_list = [ii * (1 - im) for ii, im in zip(src_image_list, src_mask_list)] | |
| else: | |
| src_image_list = src_image_list | |
| modify_image_list = src_image_list | |
| item = { | |
| "src_image_list": src_image_list, | |
| "src_mask_list": src_mask_list, | |
| "modify_image_list": modify_image_list, | |
| "image": image, | |
| "image_mask": image_mask, | |
| "edit_id": edit_id, | |
| "ref_id": ref_id, | |
| "prompt": prompt, | |
| "edit_key": index["edit_key"] if "edit_key" in index else "", | |
| "sample_id": sample_id | |
| } | |
| return item | |
| def collate_fn(batch): | |
| collect = defaultdict(list) | |
| for sample in batch: | |
| for k, v in sample.items(): | |
| collect[k].append(v) | |
| new_batch = dict() | |
| for k, v in collect.items(): | |
| if all([i is None for i in v]): | |
| new_batch[k] = None | |
| else: | |
| new_batch[k] = v | |
| return new_batch | |