from __future__ import print_function, division

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data import TensorDataset, DataLoader
from PIL import Image
import matplotlib.pyplot as plt
from dataloader import imgDataset
import time
import os
import copy
from transformers import BlipProcessor, BlipForConditionalGeneration
from transformers import AutoImageProcessor, ResNetModel
from translate import Translator

PATH = './images/'
class CUPredictor_v2(nn.Module):
    """Size classifier + height regressor on top of a Hugging Face ResNet-50 backbone."""
    def __init__(self, num_class=2):
        super(CUPredictor_v2, self).__init__()
        self.base = ResNetModel.from_pretrained("microsoft/resnet-50")
        num_ftrs = 2048  # pooled feature size of ResNet-50
        #self.base.fc = nn.Linear(num_ftrs, num_ftrs//2)
        self.classifier = nn.Linear(num_ftrs, num_class)
        self.height_regressor = nn.Linear(num_ftrs, 1)
        self.relu = nn.ReLU()

    def forward(self, input_img):
        # input_img is an image-processor output dict; squeeze(1) drops the
        # extra axis left by stacking per-sample processor outputs in a batch
        output = self.base(input_img['pixel_values'].squeeze(1)).pooler_output.squeeze()
        predict_cls = self.classifier(output)
        predict_height = self.relu(self.height_regressor(output))  # ReLU keeps height non-negative
        return predict_cls, predict_height
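
# Note: unlike CUPredictor below, CUPredictor_v2 takes the dict produced by a
# Hugging Face image processor rather than a raw tensor. A minimal sketch of
# how it would be fed (not exercised anywhere in this file; `pil_image` is a
# placeholder for any RGB PIL image):
#   processor = AutoImageProcessor.from_pretrained("microsoft/resnet-50")
#   inputs = processor(images=pil_image, return_tensors="pt")
#   cls_logits, height = CUPredictor_v2()(inputs)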
class CUPredictor(nn.Module):
    """Frozen torchvision ResNet-50 backbone with a class head and four
    body-measurement regression heads (height, bust, waist, hips)."""
    def __init__(self, num_class=2):
        super(CUPredictor, self).__init__()
        # weights= replaces the deprecated pretrained=True flag
        self.base = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V1)
        for param in self.base.parameters():
            param.requires_grad = False  # freeze the backbone; only the new layers below train
        num_ftrs = self.base.fc.in_features
        self.base.fc = nn.Sequential(
            nn.Linear(num_ftrs, num_ftrs // 4),
            nn.ReLU(),
            nn.Linear(num_ftrs // 4, num_ftrs // 8),
            nn.ReLU()
        )
        self.classifier = nn.Linear(num_ftrs // 8, num_class)
        self.regressor_h = nn.Linear(num_ftrs // 8, 1)   # height
        self.regressor_b = nn.Linear(num_ftrs // 8, 1)   # bust
        self.regressor_w = nn.Linear(num_ftrs // 8, 1)   # waist
        self.regressor_hi = nn.Linear(num_ftrs // 8, 1)  # hips
        self.relu = nn.ReLU()

    def forward(self, input_img):
        output = self.base(input_img)
        predict_cls = self.classifier(output)
        # ReLU keeps the regressed measurements non-negative
        predict_h = self.relu(self.regressor_h(output))
        predict_b = self.relu(self.regressor_b(output))
        predict_w = self.relu(self.regressor_w(output))
        predict_hi = self.relu(self.regressor_hi(output))
        return predict_cls, predict_h, predict_b, predict_w, predict_hi
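
# Quick shape check (a sketch, not part of the pipeline; the batch size of 2
# is arbitrary and the weights download on first use):
#   m = CUPredictor()
#   cls_out, h, b, w, hi = m(torch.randn(2, 3, 224, 224))
#   # cls_out: torch.Size([2, 2]); each regression head: torch.Size([2, 1])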
def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean  # undo ImageNet normalization
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated
    plt.savefig('images/preds/prediction.png')
def train_model(model, device, dataloaders, dataset_sizes, num_epochs=25):
    since = time.time()
    ce = nn.CrossEntropyLoss()
    mse = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.0008)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_ce_loss = 0.0
            running_rmse_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels, heights, bust, waist, hips in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                heights = heights.to(device)
                bust = bust.to(device)
                waist, hips = waist.to(device), hips.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; track history only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs_c, outputs_h, outputs_b, outputs_w, outputs_hi = model(inputs)
                    _, preds = torch.max(outputs_c, 1)
                    ce_loss = ce(outputs_c, labels)
                    rmse_loss_h = torch.sqrt(mse(outputs_h, heights.unsqueeze(-1)))
                    rmse_loss_b = torch.sqrt(mse(outputs_b, bust.unsqueeze(-1)))
                    rmse_loss_w = torch.sqrt(mse(outputs_w, waist.unsqueeze(-1)))
                    rmse_loss_hi = torch.sqrt(mse(outputs_hi, hips.unsqueeze(-1)))
                    # weight height highest, then bust, then waist and hips
                    rmse_loss = rmse_loss_h * 4 + rmse_loss_b * 2 + rmse_loss_w + rmse_loss_hi
                    loss = ce_loss + rmse_loss

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                        optimizer.step()

                # statistics
                running_ce_loss += ce_loss.item() * inputs.size(0)
                running_rmse_loss += rmse_loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_ce_loss = running_ce_loss / dataset_sizes[phase]
            epoch_rmse_loss = running_rmse_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} CE_Loss: {epoch_ce_loss:.4f} RMSE_Loss: {epoch_rmse_loss:.4f} Acc: {epoch_acc:.4f}')

            # deep copy the model weights whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            #if epoch % 2 == 0 and phase == 'val': print(outputs_c, outputs_h)

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
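
# Note: lr_scheduler is imported above but never used. If learning-rate decay
# is wanted, a step scheduler could be wired in like this (a sketch; the
# step_size/gamma values are assumptions, not tuned for this model):
#   scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
#   # ...and scheduler.step() called once per epoch after the 'train' phase.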
def visualize_model(model, device, dataloaders, class_names, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        # the val loader yields (inputs, labels, heights, bust, waist, hips);
        # only the first two are needed here
        for i, (inputs, labels, *_) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs_c = model(inputs)[0]  # class logits only
            _, preds = torch.max(outputs_c, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'pred: {class_names[preds[j]]}|tar: {class_names[labels[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)
def evaluation(model, epoch, device, dataloaders):
    model.load_state_dict(torch.load(f'models/model_{epoch}.pt'))
    model.eval()
    with torch.no_grad():
        for i, (inputs, labels, *_) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs_c = model(inputs)[0]  # class logits only
            _, preds = torch.max(outputs_c, 1)
            print(preds)
def inference(inp_img, classes=['big', 'small'], epoch=6):
    device = torch.device("cpu")
    translator = Translator(to_lang="zh-TW")
    model = CUPredictor()
    model.load_state_dict(torch.load(f'models/model_{epoch}.pt', map_location=torch.device('cpu')))
    # load image-to-text model
    processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    model_blip = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
    model.eval()

    # accept either a PIL image or a path to one (the __main__ block passes a path)
    if isinstance(inp_img, str):
        inp_img = Image.open(inp_img).convert('RGB')

    trans = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    image_tensor = trans(inp_img)
    image_tensor = image_tensor.unsqueeze(0)  # add batch dimension

    with torch.no_grad():
        inputs = image_tensor.to(device)
        outputs_c, outputs_h, outputs_b, outputs_w, outputs_hi = model(inputs)
        _, preds = torch.max(outputs_c, 1)
        idx = preds.numpy()[0]

        # unconditional image captioning
        inputs = processor(inp_img, return_tensors="pt")
        out = model_blip.generate(**inputs)
        description = processor.decode(out[0], skip_special_tokens=True)
        description_tw = translator.translate(description)

    return (outputs_c, classes[idx],
            f"{outputs_h.numpy()[0][0]:.2f}", f"{outputs_b.numpy()[0][0]:.2f}",
            f"{outputs_w.numpy()[0][0]:.2f}", f"{outputs_hi.numpy()[0][0]:.2f}",
            [description, description_tw])
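
# Example usage (a sketch; assumes models/model_6.pt exists and the image is a
# regular RGB photo — note the BLIP weights are re-downloaded on first call):
#   logits, size_cls, h, b, w, hi, (desc_en, desc_tw) = inference('images/test/lin.png')
#   print(size_cls, h, b, w, hi, desc_en)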
def main(epoch=15, mode='val'):  # note: mode is currently unused
    cudnn.benchmark = True
    plt.ion()  # interactive mode

    model = CUPredictor()
    train_dataset = imgDataset('labels.txt', mode='train', use_processor=False)
    test_dataset = imgDataset('labels.txt', mode='val', use_processor=False)
    dataloaders = {
        "train": DataLoader(train_dataset, batch_size=64, shuffle=True),
        "val": DataLoader(test_dataset, batch_size=64, shuffle=False)
    }
    dataset_sizes = {
        "train": len(train_dataset),
        "val": len(test_dataset)
    }

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    #device = torch.device("cpu")
    model = model.to(device)

    model_conv = train_model(model, device, dataloaders, dataset_sizes, num_epochs=epoch)
    torch.save(model_conv.state_dict(), f'models/model_{epoch}.pt')
def divide_class_dir(path):
    """Move each image into a per-class subfolder named after the 4th
    '-'-separated token of its filename."""
    file_list = os.listdir(path)
    for img_name in file_list:
        dest_path = os.path.join(path, img_name.split('-')[3])
        if not os.path.exists(dest_path):
            os.mkdir(dest_path)  # create the class folder
        os.replace(os.path.join(path, img_name), os.path.join(dest_path, img_name))
def get_label(types):
    """Write labels.txt with one line per image: split, filename, class, and
    the measurement token parsed from the filename."""
    with open('labels.txt', 'w', encoding='utf-8') as f:
        for f_type in types:
            for img_type in CLASS:
                path = os.path.join('images', f_type, img_type)
                file_list = os.listdir(path)
                for file_name in file_list:
                    file_name_list = file_name.split('-')
                    f.write(" ".join([f_type, file_name, img_type, file_name_list[4].split('_')[0], '\n']))
if __name__ == "__main__":
    CLASS = ['big', 'small']
    mode = 'train'
    get_label(['train', 'val'])
    epoch = 7
    #main(epoch, mode=mode)
    outputs, preds, heights, bust, waist, hips, description = inference('images/test/lin.png', CLASS, epoch=epoch)
    print(outputs, preds, heights, bust, waist, hips)
    #print(CUPredictor())
    #divide_class_dir('./images/train_all')
    #divide_class_dir('./images/val_all')
| '''''' |