Spaces:
Paused
Paused
| import numpy as np | |
| # from skimage import measure | |
| from skimage.metrics import structural_similarity as compare_ssim | |
| from scipy import linalg | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from propainter.core.utils import to_tensors | |
def calculate_epe(flow1, flow2):
    """Return the mean end-point error (EPE) between two flow tensors.

    The per-location error is the Euclidean norm of ``flow1 - flow2`` taken
    over dimension 1. Those errors are flattened, averaged, and returned as a
    plain Python float.
    """
    squared_diff = (flow1 - flow2) ** 2
    per_point_error = torch.sum(squared_diff, dim=1).sqrt()
    return per_point_error.view(-1).mean().item()
def calculate_psnr(img1, img2):
    """Calculate PSNR (Peak Signal-to-Noise Ratio).

    Ref: https://en.wikipedia.org/wiki/Peak_signal-to-noise_ratio

    Args:
        img1 (ndarray): Images with range [0, 255].
        img2 (ndarray): Images with range [0, 255].

    Returns:
        float: psnr result; ``inf`` when the two images are identical.

    Raises:
        AssertionError: if the two images do not have the same shape.
    """
    assert img1.shape == img2.shape, \
        (f'Image shapes are differnet: {img1.shape}, {img2.shape}.')

    # Do the arithmetic in float64. With integer inputs such as uint8 the
    # subtraction and the squaring would otherwise wrap around modulo 256
    # and silently yield a wrong mean squared error.
    diff = (np.asarray(img1, dtype=np.float64) -
            np.asarray(img2, dtype=np.float64))
    mse = np.mean(diff ** 2)
    if mse == 0:
        return float('inf')
    return 20. * np.log10(255. / np.sqrt(mse))
def calc_psnr_and_ssim(img1, img2):
    """Compute PSNR and SSIM for one pair of images.

    img1: ndarray, range [0, 255]
    img2: ndarray, range [0, 255]

    Both inputs are converted to float64 first. Returns ``(psnr, ssim)``.
    """
    first = img1.astype(np.float64)
    second = img2.astype(np.float64)

    # Axis 2 is treated as the channel axis and a 65-pixel window is used.
    # NOTE(review): both the legacy `multichannel` flag and `channel_axis`
    # are passed -- presumably to work across scikit-image versions; confirm.
    ssim_options = dict(data_range=255,
                        multichannel=True,
                        win_size=65,
                        channel_axis=2)

    psnr = calculate_psnr(first, second)
    ssim = compare_ssim(first, second, **ssim_options)
    return psnr, ssim
| ########################### | |
| # I3D models | |
| ########################### | |
def init_i3d_model(i3d_model_path, device='cuda:0'):
    """Build an InceptionI3d network and load its weights from disk.

    Args:
        i3d_model_path: Path of the checkpoint holding the I3D state dict.
        device: Device (string or ``torch.device``) the model is moved to.
            Defaults to ``'cuda:0'``, the device that used to be hard-coded.

    Returns:
        The InceptionI3d instance with loaded weights, placed on ``device``.
    """
    print(f"[Loading I3D model from {i3d_model_path} for FID score ..]")
    i3d_model = InceptionI3d(400, in_channels=3, final_endpoint='Logits')

    # Deserialize onto the CPU first so loading does not depend on the device
    # the checkpoint was saved from; the model is moved afterwards.
    # NOTE(security): torch.load unpickles the file -- only load checkpoints
    # that come from a trusted source.
    state_dict = torch.load(i3d_model_path, map_location='cpu')
    i3d_model.load_state_dict(state_dict)

    # NOTE(review): the model is returned in its default (training) mode, as
    # before; confirm whether callers expect `.eval()` to be applied.
    i3d_model.to(torch.device(device))
    return i3d_model
def calculate_i3d_activations(video1, video2, i3d_model, device):
    """Extract flattened I3D activations for two videos.

    video1: list[PIL.Image]
    video2: list[PIL.Image]

    Each video is converted with ``to_tensors()``, given a leading batch
    dimension and moved to ``device``. It is then passed through
    ``get_i3d_activations`` and returned as a 1-D numpy array.

    Returns a ``(video1_activations, video2_activations)`` tuple.
    """
    # Convert both videos first, then run the model on each, in order.
    clips = [
        to_tensors()(frames).unsqueeze(0).to(device)
        for frames in (video1, video2)
    ]
    activations = [
        get_i3d_activations(clip, i3d_model).cpu().numpy().flatten()
        for clip in clips
    ]
    return activations[0], activations[1]
def calculate_vfid(real_activations, fake_activations):
    """Compute the FID score between two sets of feature activations.

    Params:
        real_activations: list[ndarray]
        fake_activations: list[ndarray]

    The mean (over axis 0) and the covariance (rows as observations) of each
    set are estimated and handed to ``calculate_frechet_distance``.
    """
    real_mean = np.mean(real_activations, axis=0)
    fake_mean = np.mean(fake_activations, axis=0)
    real_cov = np.cov(real_activations, rowvar=False)
    fake_cov = np.cov(fake_activations, rowvar=False)
    return calculate_frechet_distance(real_mean, real_cov,
                                      fake_mean, fake_cov)
def calculate_frechet_distance(mu1, sigma1, mu2, sigma2, eps=1e-6):
    """Numpy implementation of the Frechet Distance.

    The Frechet distance between two multivariate Gaussians
    X_1 ~ N(mu_1, C_1) and X_2 ~ N(mu_2, C_2) is
        d^2 = ||mu_1 - mu_2||^2 + Tr(C_1 + C_2 - 2*sqrt(C_1*C_2)).

    Stable version by Dougal J. Sutherland.

    Params:
    -- mu1, mu2      : Mean vectors of the two distributions.
    -- sigma1, sigma2: Covariance matrices of the two distributions.
    -- eps           : Value added to the diagonal of both covariances when
                       the matrix square root of their product is not finite.

    Returns:
    --   : The Frechet Distance.

    Raises:
        AssertionError: if the means or the covariances differ in shape.
        ValueError: if the matrix square root keeps a diagonal imaginary
            part larger than the 1e-3 tolerance.
    """
    mu1, mu2 = np.atleast_1d(mu1), np.atleast_1d(mu2)
    sigma1, sigma2 = np.atleast_2d(sigma1), np.atleast_2d(sigma2)

    assert mu1.shape == mu2.shape, \
        'Training and test mean vectors have different lengths'
    assert sigma1.shape == sigma2.shape, \
        'Training and test covariances have different dimensions'

    mean_gap = mu1 - mu2

    # The product might be almost singular; retry with a small diagonal
    # offset when the first square root contains non-finite entries.
    sqrt_product, _ = linalg.sqrtm(sigma1.dot(sigma2), disp=False)
    if not np.isfinite(sqrt_product).all():
        print(('fid calculation produces singular product; '
               'adding %s to diagonal of cov estimates') % eps)
        jitter = np.eye(sigma1.shape[0]) * eps
        sqrt_product = linalg.sqrtm((sigma1 + jitter).dot(sigma2 + jitter))

    # Numerical error might leave a slight imaginary component.
    if np.iscomplexobj(sqrt_product):
        diagonal_imag = np.diagonal(sqrt_product).imag
        if not np.allclose(diagonal_imag, 0, atol=1e-3):
            worst = np.max(np.abs(sqrt_product.imag))
            raise ValueError('Imaginary component {}'.format(worst))
        sqrt_product = sqrt_product.real

    squared_mean_distance = mean_gap.dot(mean_gap)
    return (squared_mean_distance + np.trace(sigma1) +  # NOQA
            np.trace(sigma2) - 2 * np.trace(sqrt_product))
def get_i3d_activations(batched_video,
                        i3d_model,
                        target_endpoint='Logits',
                        flatten=True,
                        grad_enabled=False):
    """Run a batched video through the I3D model and return its features.

    Dimensions 1 and 2 of ``batched_video`` are swapped before the tensor is
    passed to ``i3d_model.extract_features`` together with
    ``target_endpoint``. Valid target endpoints are listed in
    ``InceptionI3d.VALID_ENDPOINTS``.

    Args:
        batched_video: Input tensor for the model.
        i3d_model: Object exposing ``extract_features(video, endpoint)``.
        target_endpoint: Endpoint name forwarded to the model.
        flatten: When True, reshape the features to ``(batch, -1)``.
        grad_enabled: Whether gradients are tracked during extraction.

    Returns:
        The extracted features, flattened per sample if ``flatten`` is set.
    """
    with torch.set_grad_enabled(grad_enabled):
        clip = batched_video.transpose(1, 2)
        activations = i3d_model.extract_features(clip, target_endpoint)

    if not flatten:
        return activations
    return activations.view(activations.size(0), -1)
| # This code is from https://github.com/piergiaj/pytorch-i3d/blob/master/pytorch_i3d.py | |
| # I only fix flake8 errors and do some cleaning here | |
class MaxPool3dSamePadding(nn.MaxPool3d):
    """3D max pooling that pads its input for 'same'-style output sizes.

    The amount of padding is derived from the input size on every call and
    applied with ``F.pad`` (default constant padding) before the regular
    ``nn.MaxPool3d`` forward pass runs.
    """

    def compute_pad(self, dim, s):
        """Return the total padding for axis ``dim`` given input extent ``s``."""
        remainder = s % self.stride[dim]
        covered = self.stride[dim] if remainder == 0 else remainder
        return max(self.kernel_size[dim] - covered, 0)

    def forward(self, x):
        """Pad ``x`` (batch, channel, t, h, w) and apply max pooling."""
        _, _, t, h, w = x.size()

        # F.pad expects (front, back) pairs starting from the last dimension,
        # so the axes are visited in the order width, height, time.
        pad = []
        for dim, extent in ((2, w), (1, h), (0, t)):
            total = self.compute_pad(dim, extent)
            front = total // 2
            pad.extend((front, total - front))

        padded = F.pad(x, tuple(pad))
        return super().forward(padded)
class Unit3D(nn.Module):
    """3D convolution unit with input-dependent 'same' padding.

    The input is padded in ``forward`` according to its size, convolved, and
    then optionally batch-normalized and passed through an activation.
    """

    def __init__(self,
                 in_channels,
                 output_channels,
                 kernel_shape=(1, 1, 1),
                 stride=(1, 1, 1),
                 padding=0,
                 activation_fn=F.relu,
                 use_batch_norm=True,
                 use_bias=False,
                 name='unit_3d'):
        """Initializes Unit3D module.

        Args:
            in_channels: Number of input channels of the convolution.
            output_channels: Number of output channels of the convolution.
            kernel_shape: Kernel size along (t, h, w).
            stride: Stride along (t, h, w).
            padding: Stored on the instance; the convolution itself is always
                created with zero padding.
            activation_fn: Callable applied last, or None to skip it.
            use_batch_norm: Whether a BatchNorm3d layer follows the conv.
            use_bias: Whether the convolution has a bias term.
            name: Name stored on the instance.
        """
        super().__init__()

        self._output_channels = output_channels
        self._kernel_shape = kernel_shape
        self._stride = stride
        self._use_batch_norm = use_batch_norm
        self._activation_fn = activation_fn
        self._use_bias = use_bias
        self.name = name
        self.padding = padding

        # The convolution is built without padding on purpose: the padding is
        # computed from the input size and applied in `forward`.
        self.conv3d = nn.Conv3d(in_channels=in_channels,
                                out_channels=output_channels,
                                kernel_size=kernel_shape,
                                stride=stride,
                                padding=0,
                                bias=use_bias)

        if use_batch_norm:
            self.bn = nn.BatchNorm3d(output_channels,
                                     eps=0.001,
                                     momentum=0.01)

    def compute_pad(self, dim, s):
        """Return the total padding for axis ``dim`` given input extent ``s``."""
        remainder = s % self._stride[dim]
        covered = self._stride[dim] if remainder == 0 else remainder
        return max(self._kernel_shape[dim] - covered, 0)

    def forward(self, x):
        """Pad ``x`` (batch, channel, t, h, w), convolve, normalize, activate."""
        _, _, t, h, w = x.size()

        # F.pad expects (front, back) pairs starting from the last dimension,
        # so the axes are visited in the order width, height, time.
        pad = []
        for dim, extent in ((2, w), (1, h), (0, t)):
            total = self.compute_pad(dim, extent)
            front = total // 2
            pad.extend((front, total - front))

        out = self.conv3d(F.pad(x, tuple(pad)))
        if self._use_batch_norm:
            out = self.bn(out)
        if self._activation_fn is None:
            return out
        return self._activation_fn(out)
class InceptionModule(nn.Module):
    """Inception block with four parallel branches concatenated on channels.

    ``out_channels`` is indexed from 0 to 5:
        [0] branch 0: 1x1x1 conv
        [1], [2] branch 1: 1x1x1 conv followed by 3x3x3 conv
        [3], [4] branch 2: 1x1x1 conv followed by 3x3x3 conv
        [5] branch 3: 3x3x3 max pool followed by 1x1x1 conv
    """

    def __init__(self, in_channels, out_channels, name):
        super().__init__()

        def pointwise(channels_in, channels_out, suffix):
            # 1x1x1 convolution unit.
            return Unit3D(in_channels=channels_in,
                          output_channels=channels_out,
                          kernel_shape=[1, 1, 1],
                          padding=0,
                          name=name + suffix)

        def cubic(channels_in, channels_out, suffix):
            # 3x3x3 convolution unit.
            return Unit3D(in_channels=channels_in,
                          output_channels=channels_out,
                          kernel_shape=[3, 3, 3],
                          name=name + suffix)

        # Sub-modules are created in the same order as their attribute names
        # so registration order stays b0, b1a, b1b, b2a, b2b, b3a, b3b.
        self.b0 = pointwise(in_channels, out_channels[0],
                            '/Branch_0/Conv3d_0a_1x1')
        self.b1a = pointwise(in_channels, out_channels[1],
                             '/Branch_1/Conv3d_0a_1x1')
        self.b1b = cubic(out_channels[1], out_channels[2],
                         '/Branch_1/Conv3d_0b_3x3')
        self.b2a = pointwise(in_channels, out_channels[3],
                             '/Branch_2/Conv3d_0a_1x1')
        self.b2b = cubic(out_channels[3], out_channels[4],
                         '/Branch_2/Conv3d_0b_3x3')
        self.b3a = MaxPool3dSamePadding(kernel_size=[3, 3, 3],
                                        stride=(1, 1, 1),
                                        padding=0)
        self.b3b = pointwise(in_channels, out_channels[5],
                             '/Branch_3/Conv3d_0b_1x1')
        self.name = name

    def forward(self, x):
        """Apply all four branches to ``x`` and concatenate along dim 1."""
        branch_outputs = [
            self.b0(x),
            self.b1b(self.b1a(x)),
            self.b2b(self.b2a(x)),
            self.b3b(self.b3a(x)),
        ]
        return torch.cat(branch_outputs, dim=1)
class InceptionI3d(nn.Module):
    """Inception-v1 I3D architecture.

    The model is introduced in:
        Quo Vadis, Action Recognition? A New Model and the Kinetics Dataset
        Joao Carreira, Andrew Zisserman
        https://arxiv.org/pdf/1705.07750v1.pdf.

    See also the Inception architecture, introduced in:
        Going deeper with convolutions
        Christian Szegedy, Wei Liu, Yangqing Jia, Pierre Sermanet, Scott Reed,
        Dragomir Anguelov, Dumitru Erhan, Vincent Vanhoucke, Andrew Rabinovich.
        http://arxiv.org/pdf/1409.4842v1.pdf.
    """

    # Endpoints of the model in order. The network is built up to (and
    # including) the designated `final_endpoint`.
    VALID_ENDPOINTS = (
        'Conv3d_1a_7x7',
        'MaxPool3d_2a_3x3',
        'Conv3d_2b_1x1',
        'Conv3d_2c_3x3',
        'MaxPool3d_3a_3x3',
        'Mixed_3b',
        'Mixed_3c',
        'MaxPool3d_4a_3x3',
        'Mixed_4b',
        'Mixed_4c',
        'Mixed_4d',
        'Mixed_4e',
        'Mixed_4f',
        'MaxPool3d_5a_2x2',
        'Mixed_5b',
        'Mixed_5c',
        'Logits',
        'Predictions',
    )

    def __init__(self,
                 num_classes=400,
                 spatial_squeeze=True,
                 final_endpoint='Logits',
                 name='inception_i3d',
                 in_channels=3,
                 dropout_keep_prob=0.5):
        """Initializes I3D model instance.

        Args:
          num_classes: The number of outputs in the logit layer (default 400,
              which matches the Kinetics dataset).
          spatial_squeeze: Whether to squeeze the spatial dimensions for the
              logits before returning (default True).
          final_endpoint: The last endpoint the model is built up to. Must be
              one of InceptionI3d.VALID_ENDPOINTS (default 'Logits'). The
              pooling / dropout / logits head is only created when this is
              'Logits' or 'Predictions'.
          name: A string (optional). Prefix for the names of the sub-units.
          in_channels: Number of channels of the input video (default 3).
          dropout_keep_prob: Probability of keeping an activation in the
              dropout layer that precedes the logits (default 0.5).

        Raises:
          ValueError: if `final_endpoint` is not recognized.
        """
        if final_endpoint not in self.VALID_ENDPOINTS:
            raise ValueError('Unknown final endpoint %s' % final_endpoint)

        super(InceptionI3d, self).__init__()
        self._num_classes = num_classes
        self._spatial_squeeze = spatial_squeeze
        self._final_endpoint = final_endpoint
        self.logits = None

        # Layers are constructed lazily and in network order, stopping right
        # after `final_endpoint`.
        self.end_points = {}
        for end_point, make_layer in self._backbone_layers(name, in_channels):
            self.end_points[end_point] = make_layer()
            if end_point == final_endpoint:
                break
        else:
            # The loop was not cut short, so the final endpoint is 'Logits'
            # or 'Predictions': add the classification head.
            self.avg_pool = nn.AvgPool3d(kernel_size=[2, 7, 7],
                                         stride=(1, 1, 1))
            # nn.Dropout takes the probability of *zeroing* an element, so
            # the keep probability has to be converted.
            self.dropout = nn.Dropout(1 - dropout_keep_prob)
            self.logits = self._make_logits()

        # Always register the backbone layers, including for truncated
        # models, so that `_modules[...]` lookups and `state_dict` work.
        self.build()

    @staticmethod
    def _backbone_layers(name, in_channels):
        """Return (endpoint, factory) pairs for the backbone, in order."""

        def unit(end_point, **kwargs):
            return end_point, lambda: Unit3D(name=name + end_point, **kwargs)

        def pool(end_point, kernel_size, stride):
            return end_point, lambda: MaxPool3dSamePadding(
                kernel_size=kernel_size, stride=stride, padding=0)

        def mixed(end_point, channels_in, branch_channels):
            return end_point, lambda: InceptionModule(
                channels_in, branch_channels, name + end_point)

        return (
            unit('Conv3d_1a_7x7',
                 in_channels=in_channels,
                 output_channels=64,
                 kernel_shape=[7, 7, 7],
                 stride=(2, 2, 2),
                 padding=(3, 3, 3)),
            pool('MaxPool3d_2a_3x3', [1, 3, 3], (1, 2, 2)),
            unit('Conv3d_2b_1x1',
                 in_channels=64,
                 output_channels=64,
                 kernel_shape=[1, 1, 1],
                 padding=0),
            unit('Conv3d_2c_3x3',
                 in_channels=64,
                 output_channels=192,
                 kernel_shape=[3, 3, 3],
                 padding=1),
            pool('MaxPool3d_3a_3x3', [1, 3, 3], (1, 2, 2)),
            mixed('Mixed_3b', 192, [64, 96, 128, 16, 32, 32]),
            mixed('Mixed_3c', 256, [128, 128, 192, 32, 96, 64]),
            pool('MaxPool3d_4a_3x3', [3, 3, 3], (2, 2, 2)),
            mixed('Mixed_4b', 128 + 192 + 96 + 64,
                  [192, 96, 208, 16, 48, 64]),
            mixed('Mixed_4c', 192 + 208 + 48 + 64,
                  [160, 112, 224, 24, 64, 64]),
            mixed('Mixed_4d', 160 + 224 + 64 + 64,
                  [128, 128, 256, 24, 64, 64]),
            mixed('Mixed_4e', 128 + 256 + 64 + 64,
                  [112, 144, 288, 32, 64, 64]),
            mixed('Mixed_4f', 112 + 288 + 64 + 64,
                  [256, 160, 320, 32, 128, 128]),
            pool('MaxPool3d_5a_2x2', [2, 2, 2], (2, 2, 2)),
            mixed('Mixed_5b', 256 + 320 + 128 + 128,
                  [256, 160, 320, 32, 128, 128]),
            mixed('Mixed_5c', 256 + 320 + 128 + 128,
                  [384, 192, 384, 48, 128, 128]),
        )

    def _make_logits(self):
        """Create the 1x1x1 logits unit for the current number of classes."""
        return Unit3D(in_channels=384 + 384 + 128 + 128,
                      output_channels=self._num_classes,
                      kernel_shape=[1, 1, 1],
                      padding=0,
                      activation_fn=None,
                      use_batch_norm=False,
                      use_bias=True,
                      name='logits')

    def replace_logits(self, num_classes):
        """Replace the logits unit with a fresh one for ``num_classes``."""
        self._num_classes = num_classes
        self.logits = self._make_logits()

    def build(self):
        """Register every layer in ``end_points`` as a sub-module."""
        for end_point, layer in self.end_points.items():
            self.add_module(end_point, layer)

    def forward(self, x):
        """Run the network on ``x``.

        Returns the logits (with dimension 3 squeezed twice when
        ``spatial_squeeze`` is set). For a model truncated before 'Logits',
        returns the output of the last built endpoint instead.
        """
        for end_point in self.VALID_ENDPOINTS:
            if end_point in self.end_points:
                x = self._modules[end_point](
                    x)  # use _modules to work with dataparallel

        if self.logits is None:
            # Truncated model: there is no classification head to apply.
            return x

        logits = self.logits(self.dropout(self.avg_pool(x)))
        if self._spatial_squeeze:
            # logits is batch X time X classes, which is what we want to
            # work with
            logits = logits.squeeze(3).squeeze(3)
        return logits

    def extract_features(self, x, target_endpoint='Logits'):
        """Run the backbone on ``x`` and stop after ``target_endpoint``.

        For the default 'Logits' target the backbone output is averaged over
        dimensions 4, 3 and 2; otherwise the raw feature map is returned.
        """
        for end_point in self.VALID_ENDPOINTS:
            if end_point in self.end_points:
                x = self._modules[end_point](x)
                if end_point == target_endpoint:
                    break

        if target_endpoint == 'Logits':
            return x.mean(4).mean(3).mean(2)
        return x