import torch
from collections import OrderedDict
import numpy as np
from pytorch_grad_cam.utils.svd_on_activations import get_2d_projection


class AblationLayer(torch.nn.Module):
    def __init__(self):
        super(AblationLayer, self).__init__()

    def objectiveness_mask_from_svd(self, activations, threshold=0.01):
        """ Experimental method to get a binary mask to compare if the activation is worth ablating.
            The idea is to apply the EigenCAM method by doing PCA on the activations.
            Then we create a binary mask by comparing to a low threshold.
            Areas that are masked out are probably not interesting anyway.
        """
        projection = get_2d_projection(activations[None, :])[0, :]
        projection = np.abs(projection)
        projection = projection - projection.min()
        projection = projection / projection.max()
        projection = projection > threshold
        return projection

    def activations_to_be_ablated(
            self,
            activations,
            ratio_channels_to_ablate=1.0):
        """ Experimental method to select which channels are worth ablating.
            Create a binary CAM mask with objectiveness_mask_from_svd.
            Score each activation channel by how much of its (normalized) mass
            falls inside the mask, then keep the top channels.
        """
        if ratio_channels_to_ablate == 1.0:
            self.indices = np.int32(range(activations.shape[0]))
            return self.indices

        projection = self.objectiveness_mask_from_svd(activations)

        scores = []
        for channel in activations:
            normalized = np.abs(channel)
            normalized = normalized - normalized.min()
            normalized = normalized / np.max(normalized)
            score = (projection * normalized).sum() / normalized.sum()
            scores.append(score)
        scores = np.float32(scores)

        indices = list(np.argsort(scores))
        high_score_indices = indices[::-1][: int(
            len(indices) * ratio_channels_to_ablate)]
        low_score_indices = indices[: int(
            len(indices) * ratio_channels_to_ablate)]
        self.indices = np.int32(high_score_indices + low_score_indices)
        return self.indices

    def set_next_batch(
            self,
            input_batch_index,
            activations,
            num_channels_to_ablate):
        """ This creates the next batch of activations from the layer.
            Just take the corresponding batch member from activations,
            and repeat it num_channels_to_ablate times.
        """
        self.activations = activations[input_batch_index, :, :, :].clone(
        ).unsqueeze(0).repeat(num_channels_to_ablate, 1, 1, 1)

    def __call__(self, x):
        output = self.activations
        for i in range(output.size(0)):
            # Commonly the minimum activation will be 0,
            # and then it makes sense to zero it out.
            # However, depending on the architecture,
            # if the values can be negative we use a very negative value
            # to perform the ablation, deviating from the paper.
            if torch.min(output) == 0:
                output[i, self.indices[i], :] = 0
            else:
                ABLATION_VALUE = 1e7
                output[i, self.indices[i], :] = torch.min(
                    output) - ABLATION_VALUE

        return output
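

# A minimal, hand-driven sketch of the AblationLayer life cycle (illustrative
# only, not part of the upstream library; shapes and values are assumptions).
# AblationCAM normally calls these methods for you: it stores the target
# layer's activations, then swaps the layer for an AblationLayer so each
# forward pass re-emits those activations with one channel ablated per batch
# member.
def _demo_ablation_layer():
    batch, channels = 2, 8
    activations = torch.randn(batch, channels, 7, 7)
    layer = AblationLayer()
    # With ratio_channels_to_ablate=1.0 every channel is selected and the
    # SVD-based scoring is skipped; the method expects a numpy array.
    layer.activations_to_be_ablated(
        activations[0].numpy(), ratio_channels_to_ablate=1.0)
    # Repeat batch member 0 once per channel that will be ablated.
    layer.set_next_batch(
        input_batch_index=0,
        activations=activations,
        num_channels_to_ablate=channels)
    ablated = layer(None)  # input is ignored; stored activations are used
    assert ablated.shape == (channels, channels, 7, 7)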


class AblationLayerVit(AblationLayer):
    def __init__(self):
        super(AblationLayerVit, self).__init__()

    def __call__(self, x):
        output = self.activations
        # Move the channel axis next to the batch axis so channels can be
        # indexed the same way as in the CNN case, then move it back.
        output = output.transpose(1, len(output.shape) - 1)
        for i in range(output.size(0)):
            # Commonly the minimum activation will be 0,
            # and then it makes sense to zero it out.
            # However, depending on the architecture,
            # if the values can be negative we use a very negative value
            # to perform the ablation, deviating from the paper.
            if torch.min(output) == 0:
                output[i, self.indices[i], :] = 0
            else:
                ABLATION_VALUE = 1e7
                output[i, self.indices[i], :] = torch.min(
                    output) - ABLATION_VALUE

        output = output.transpose(len(output.shape) - 1, 1)

        return output

    def set_next_batch(
            self,
            input_batch_index,
            activations,
            num_channels_to_ablate):
        """ This creates the next batch of activations from the layer.
            Just take the corresponding batch member from activations,
            and repeat it num_channels_to_ablate times.
        """
        repeat_params = [num_channels_to_ablate] + \
            len(activations.shape[:-1]) * [1]
        self.activations = activations[input_batch_index, :, :].clone(
        ).unsqueeze(0).repeat(*repeat_params)
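

# Sketch of the transformer case (illustrative assumption: activations arrive
# as [batch, tokens, channels], which is why AblationLayerVit transposes the
# channel axis before and after ablating).
def _demo_ablation_layer_vit():
    tokens, channels = 50, 16
    activations = torch.randn(1, tokens, channels)
    layer = AblationLayerVit()
    # The array passed here must have the channel axis first, since the
    # selected indices index channels after the transpose in __call__.
    layer.activations_to_be_ablated(
        activations[0].numpy().T, ratio_channels_to_ablate=1.0)
    layer.set_next_batch(
        input_batch_index=0,
        activations=activations,
        num_channels_to_ablate=channels)
    ablated = layer(None)
    assert ablated.shape == (channels, tokens, channels)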


class AblationLayerFasterRCNN(AblationLayer):
    def __init__(self):
        super(AblationLayerFasterRCNN, self).__init__()

    def set_next_batch(
            self,
            input_batch_index,
            activations,
            num_channels_to_ablate):
        """ Extract the next batch member from activations,
            and repeat it num_channels_to_ablate times.
        """
        self.activations = OrderedDict()
        for key, value in activations.items():
            fpn_activation = value[input_batch_index,
                                   :, :, :].clone().unsqueeze(0)
            self.activations[key] = fpn_activation.repeat(
                num_channels_to_ablate, 1, 1, 1)

    def __call__(self, x):
        result = self.activations
        layers = {0: '0', 1: '1', 2: '2', 3: '3', 4: 'pool'}
        num_channels_to_ablate = result['pool'].size(0)
        for i in range(num_channels_to_ablate):
            # Each FPN level has 256 channels (torchvision's default), so the
            # flat channel index encodes both the pyramid level and the
            # channel within that level.
            pyramid_layer = int(self.indices[i] / 256)
            index_in_pyramid_layer = int(self.indices[i] % 256)
            result[layers[pyramid_layer]][i,
                                          index_in_pyramid_layer, :, :] = -1000
        return result
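

# End-to-end usage sketch, assuming the AblationCAM API from this package and
# a torchvision classifier (both assumptions; neither is defined in this file).
if __name__ == '__main__':
    from torchvision.models import resnet50
    from pytorch_grad_cam import AblationCAM
    from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget

    model = resnet50(pretrained=True).eval()
    # AblationCAM replaces target_layers with an AblationLayer internally;
    # ratio_channels_to_ablate < 1.0 turns on the SVD channel filtering above.
    cam = AblationCAM(model=model,
                      target_layers=[model.layer4[-1]],
                      ablation_layer=AblationLayer(),
                      ratio_channels_to_ablate=0.5)
    input_tensor = torch.randn(1, 3, 224, 224)  # stand-in for a real image
    grayscale_cam = cam(input_tensor=input_tensor,
                        targets=[ClassifierOutputTarget(281)])
    print(grayscale_cam.shape)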