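# Evaluation script for a CLIP-based image-text preference model (OpenCLIP ViT-H-14).
# Four evaluation modes are selected via --data-type:
#   ImageReward  - preference-score agreement on the ImageReward test set
#   test         - ranking accuracy on the held-out ranking test set
#   benchmark    - per-model, per-style average scores on benchmark image folders
#   drawbench    - per-model average scores on DrawBench prompts
# Example invocation (the file name is assumed here):
#   python evaluate.py --data-type test --data-path data --image-path images --checkpoint ckpt.pt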
import os
import json
import numpy as np
from tqdm import tqdm
from argparse import ArgumentParser
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader

from src.open_clip import create_model_and_transforms, get_tokenizer
from src.training.train import calc_ImageReward, inversion_score
from src.training.data import ImageRewardDataset, collate_rank, RankingDataset
parser = ArgumentParser()
parser.add_argument('--data-type', type=str, choices=['benchmark', 'test', 'ImageReward', 'drawbench'])
parser.add_argument('--data-path', type=str, help='path to dataset')
parser.add_argument('--image-path', type=str, help='path to image files')
parser.add_argument('--checkpoint', type=str, help='path to checkpoint')
parser.add_argument('--batch-size', type=int, default=20)
args = parser.parse_args()

batch_size = args.batch_size
args.model = "ViT-H-14"
args.precision = 'amp'
print(args.model)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
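# Build the OpenCLIP model and its train/val preprocessing transforms.
# ViT-H-14 is initialized from the laion2B-s32B-b79K weights before the
# fine-tuned checkpoint below is loaded on top.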
model, preprocess_train, preprocess_val = create_model_and_transforms(
    args.model,
    'laion2B-s32B-b79K',
    precision=args.precision,
    device=device,
    jit=False,
    force_quick_gelu=False,
    force_custom_text=False,
    force_patch_dropout=False,
    force_image_size=None,
    pretrained_image=False,
    image_mean=None,
    image_std=None,
    light_augmentation=True,
    aug_cfg={},
    output_dict=True,
    with_score_predictor=False,
    with_region_predictor=False
)
# map_location keeps checkpoint loading working on CPU-only machines.
checkpoint = torch.load(args.checkpoint, map_location=device)
model.load_state_dict(checkpoint['state_dict'])
tokenizer = get_tokenizer(args.model)
model.eval()
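
# Dataset for the benchmark/drawbench folders: images are named by their
# zero-padded index ({idx:05d}.jpg) and captions come from a JSON list whose
# order matches the image indices.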
class BenchmarkDataset(Dataset):
    def __init__(self, meta_file, image_folder, transforms, tokenizer):
        self.transforms = transforms
        self.image_folder = image_folder
        self.tokenizer = tokenizer
        self.open_image = Image.open
        with open(meta_file, 'r') as f:
            self.annotations = json.load(f)

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        try:
            img_path = os.path.join(self.image_folder, f'{idx:05d}.jpg')
            images = self.transforms(self.open_image(img_path))
            caption = self.tokenizer(self.annotations[idx])
            return images, caption
        except FileNotFoundError:
            # Skip missing files by falling through to the next index.
            print(f'file not found: {img_path}')
            return self.__getitem__((idx + 1) % len(self))
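
# ImageReward evaluation: score every candidate image for each prompt and
# pass the per-image logits together with the human labels to calc_ImageReward;
# the reported number is the mean over prompts.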
def evaluate_IR(data_path, image_folder, model):
    meta_file = data_path + '/ImageReward_test.json'
    dataset = ImageRewardDataset(meta_file, image_folder, preprocess_val, tokenizer)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4, collate_fn=collate_rank)

    score = 0
    total = len(dataset)
    with torch.no_grad():
        for batch in tqdm(dataloader):
            images, num_images, labels, texts = batch
            images = images.to(device=device, non_blocking=True)
            texts = texts.to(device=device, non_blocking=True)
            num_images = num_images.to(device=device, non_blocking=True)
            labels = labels.to(device=device, non_blocking=True)

            with torch.cuda.amp.autocast():
                outputs = model(images, texts)
                image_features, text_features, logit_scale = outputs["image_features"], outputs["text_features"], outputs["logit_scale"]
                logits_per_image = logit_scale * image_features @ text_features.T
                # Pair each prompt's images only with that prompt's text.
                paired_logits_list = [logit[:, i] for i, logit in enumerate(logits_per_image.split(num_images.tolist()))]
            labels = list(labels.split(num_images.tolist()))
            score += sum(calc_ImageReward(paired_logits_list[i].tolist(), labels[i]) for i in range(len(paired_logits_list)))
    print('ImageReward:', score / total)
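
# Ranking evaluation: derive the model-predicted ranking of each prompt's
# candidate images and measure agreement with the human ranking labels via
# inversion_score; predicted rankings are also dumped to logs/hps_rank.json.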
def evaluate_rank(data_path, image_folder, model):
    meta_file = data_path + '/test.json'
    dataset = RankingDataset(meta_file, image_folder, preprocess_val, tokenizer)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4, collate_fn=collate_rank)

    score = 0
    total = len(dataset)
    all_rankings = []
    with torch.no_grad():
        for batch in tqdm(dataloader):
            images, num_images, labels, texts = batch
            images = images.to(device=device, non_blocking=True)
            texts = texts.to(device=device, non_blocking=True)
            num_images = num_images.to(device=device, non_blocking=True)
            labels = labels.to(device=device, non_blocking=True)

            with torch.cuda.amp.autocast():
                outputs = model(images, texts)
                image_features, text_features, logit_scale = outputs["image_features"], outputs["text_features"], outputs["logit_scale"]
                logits_per_image = logit_scale * image_features @ text_features.T
                paired_logits_list = [logit[:, i] for i, logit in enumerate(logits_per_image.split(num_images.tolist()))]
            # Rank position of each image within its prompt group (0 = best).
            predicted = [torch.argsort(-k) for k in paired_logits_list]
            hps_ranking = [[predicted[i].tolist().index(j) for j in range(n)] for i, n in enumerate(num_images)]
            labels = list(labels.split(num_images.tolist()))
            all_rankings.extend(hps_ranking)
            score += sum(inversion_score(hps_ranking[i], labels[i]) for i in range(len(hps_ranking)))
    print('ranking_acc:', score / total)
    with open('logs/hps_rank.json', 'w') as f:
        json.dump(all_rankings, f)
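
# Simple collate for (image, tokenized caption) pairs; captions are already
# fixed-length token tensors, so they can be concatenated directly.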
def collate_eval(batch):
    images = torch.stack([sample[0] for sample in batch])
    captions = torch.cat([sample[1] for sample in batch])
    return images, captions
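
# Benchmark evaluation: root_dir is laid out as root_dir/<model_id>/<style>/,
# with one JSON prompt file per style in data_path. Per-image scores are
# averaged in chunks of 80 (10 chunks over the expected 800 images per style)
# and reported as the mean and standard deviation across chunks.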
def evaluate_benchmark(data_path, root_dir, model):
    meta_dir = data_path
    model_list = os.listdir(root_dir)
    style_list = os.listdir(os.path.join(root_dir, model_list[0]))

    score = {}
    for model_id in model_list:
        score[model_id] = {}
        for style in style_list:
            score[model_id][style] = []
            image_folder = os.path.join(root_dir, model_id, style)
            meta_file = os.path.join(meta_dir, f'{style}.json')
            dataset = BenchmarkDataset(meta_file, image_folder, preprocess_val, tokenizer)
            dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_eval)

            with torch.no_grad():
                for batch in dataloader:
                    images, texts = batch
                    images = images.to(device=device, non_blocking=True)
                    texts = texts.to(device=device, non_blocking=True)

                    with torch.cuda.amp.autocast():
                        outputs = model(images, texts)
                        image_features, text_features = outputs["image_features"], outputs["text_features"]
                        logits_per_image = image_features @ text_features.T
                        # The diagonal pairs each image with its own caption.
                        score[model_id][style].extend(torch.diagonal(logits_per_image).cpu().tolist())

    print('-----------benchmark score ---------------- ')
    for model_id, data in score.items():
        for style, res in data.items():
            avg_score = [np.mean(res[i:i + 80]) for i in range(0, 800, 80)]
            print(model_id, '\t', style, '\t', np.mean(avg_score), '\t', np.std(avg_score))
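
# DrawBench evaluation: one shared prompt file, one image folder per model;
# the reported score is the mean image-text similarity over the dataset.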
def evaluate_benchmark_DB(data_path, root_dir, model):
    meta_file = data_path + '/drawbench.json'
    model_list = os.listdir(root_dir)

    score = {}
    for model_id in model_list:
        image_folder = os.path.join(root_dir, model_id)
        dataset = BenchmarkDataset(meta_file, image_folder, preprocess_val, tokenizer)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=4, collate_fn=collate_eval)

        score[model_id] = 0
        with torch.no_grad():
            for batch in tqdm(dataloader):
                images, texts = batch
                images = images.to(device=device, non_blocking=True)
                texts = texts.to(device=device, non_blocking=True)

                with torch.cuda.amp.autocast():
                    outputs = model(images, texts)
                    image_features, text_features = outputs["image_features"], outputs["text_features"]
                    logits_per_image = image_features @ text_features.T
                    diag = torch.diagonal(logits_per_image)
                    score[model_id] += torch.sum(diag).cpu().item()
        score[model_id] = score[model_id] / len(dataset)

    # Optionally persist the scores:
    # with open('logs/benchmark_score_DB.json', 'w') as f:
    #     json.dump(score, f)
    print('-----------drawbench score ---------------- ')
    for model_id, avg in score.items():
        print(model_id, '\t', avg)
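
# Dispatch to the evaluation selected by --data-type.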
if args.data_type == 'ImageReward':
    evaluate_IR(args.data_path, args.image_path, model)
elif args.data_type == 'test':
    evaluate_rank(args.data_path, args.image_path, model)
elif args.data_type == 'benchmark':
    evaluate_benchmark(args.data_path, args.image_path, model)
elif args.data_type == 'drawbench':
    evaluate_benchmark_DB(args.data_path, args.image_path, model)
else:
    raise NotImplementedError