import os

import clip
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F

from rewards.base_reward import BaseRewardLoss
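
# BaseRewardLoss (imported from this repo) is assumed to provide the interface
# used below: freeze_parameters() to disable gradients on the reward models,
# process_features() for embedding post-processing, and an __init__(name, weighting)
# that registers the loss name and its weight.
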
class AestheticLoss(BaseRewardLoss):
    """Aesthetic reward loss: scores CLIP ViT-L/14 image embeddings with a frozen MLP."""

    def __init__(
        self,
        weighting: float,
        dtype: torch.dtype,
        device: torch.device,
        cache_dir: str,
        memsave: bool = False,
    ):
        self.memsave = memsave
        self.clip_model, self.preprocess_fn = clip.load(
            "ViT-L/14", device=device, download_root=cache_dir
        )
        self.clip_model = self.clip_model.to(device, dtype=dtype)
        self.mlp = MLP(768).to(device, dtype=dtype)
        # Load the model you trained previously or the model available in this repo.
        state = torch.load(f"{os.getcwd()}/ckpts/aesthetic-model.pth")
        self.mlp.load_state_dict(state)
        self.clip_model.eval()
        if memsave:
            import memsave_torch.nn

            self.mlp = memsave_torch.nn.convert_to_memory_saving(self.mlp)
            self.clip_model = memsave_torch.nn.convert_to_memory_saving(
                self.clip_model
            ).to(device, dtype=dtype)
        # Reward models are frozen; only the generated image is optimized.
        self.freeze_parameters(self.clip_model.parameters())
        self.freeze_parameters(self.mlp.parameters())
        super().__init__("Aesthetic", weighting)
    def get_image_features(self, image: torch.Tensor) -> torch.Tensor:
        with torch.autocast("cuda"):
            clip_img_features = self.clip_model.encode_image(image)
        # L2-normalize the CLIP embedding, guarding against division by zero.
        l2 = torch.norm(clip_img_features, p=2, dim=-1, keepdim=True)
        l2 = torch.where(
            l2 == 0,
            torch.tensor(
                1.0, device=clip_img_features.device, dtype=clip_img_features.dtype
            ),
            l2,
        )
        clip_img_features = clip_img_features / l2
        return clip_img_features
    def get_text_features(self, prompt: str) -> torch.Tensor:
        # The aesthetic reward is prompt-free, so no text features are needed.
        return None

    def compute_loss(
        self, image_features: torch.Tensor, text_features: torch.Tensor
    ) -> torch.Tensor:
        # Unused: the full loss is computed directly in __call__.
        return None
    def __call__(self, image: torch.Tensor, prompt: str) -> torch.Tensor:
        if self.memsave:
            image = image.to(torch.float32)
        image_features = self.get_image_features(image)
        image_features_normed = self.process_features(image_features.to(torch.float16))
        # Aesthetic scores lie roughly in [0, 10]; subtracting the predicted
        # score from 10 turns score maximization into a loss to minimize.
        aesthetic_loss = 10.0 - self.mlp(image_features_normed).mean()
        return aesthetic_loss
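
# The MLP below appears to match the LAION aesthetic predictor (score
# regression on CLIP ViT-L/14 embeddings), whose checkpoint is loaded above;
# see https://github.com/christophschuhmann/improved-aesthetic-predictor.
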
class MLP(pl.LightningModule):
    def __init__(self, input_size, xcol="emb", ycol="avg_rating"):
        super().__init__()
        self.input_size = input_size
        self.xcol = xcol
        self.ycol = ycol
        self.layers = nn.Sequential(
            nn.Linear(self.input_size, 1024),
            # nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 128),
            # nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            # nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(64, 16),
            # nn.ReLU(),
            nn.Linear(16, 1),
        )
    def forward(self, x):
        return self.layers(x)

    def training_step(self, batch, batch_idx):
        x = batch[self.xcol]
        y = batch[self.ycol].reshape(-1, 1)
        x_hat = self.layers(x)
        loss = F.mse_loss(x_hat, y)
        return loss

    def validation_step(self, batch, batch_idx):
        x = batch[self.xcol]
        y = batch[self.ycol].reshape(-1, 1)
        x_hat = self.layers(x)
        loss = F.mse_loss(x_hat, y)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer
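

# A minimal usage sketch (not part of the module): construct the loss and score
# a batch of images. The cache directory and the random input batch are
# illustrative assumptions; a real run needs ckpts/aesthetic-model.pth in the
# working directory and CLIP-preprocessed images of shape (B, 3, 224, 224).
if __name__ == "__main__":
    device = torch.device("cuda")
    loss_fn = AestheticLoss(
        weighting=1.0,
        dtype=torch.float16,
        device=device,
        cache_dir="~/.cache/clip",  # hypothetical CLIP download location
    )
    images = torch.randn(4, 3, 224, 224, device=device, dtype=torch.float16)
    loss = loss_fn(images, prompt="")  # the prompt is unused by this reward
    print(loss.item())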