import torch
import torch.nn as nn
from torch.nn import init
import functools
from torch.optim import lr_scheduler
import numpy as np
import torch.nn.functional as F
from torch.nn.modules.normalization import LayerNorm
import os
from torch.nn.utils import spectral_norm
from torchvision import models


###############################################################################
# Helper functions
###############################################################################
def init_weights(net, init_type='normal', init_gain=0.02):
    """Initialize network weights.

    Parameters:
        net (network)      -- network to be initialized
        init_type (str)    -- the name of an initialization method: normal | xavier | kaiming | orthogonal
        init_gain (float)  -- scaling factor for normal, xavier and orthogonal.

    We use 'normal' in the original pix2pix and CycleGAN paper. But xavier and kaiming might
    work better for some applications. Feel free to try yourself.
    """
    def init_func(m):  # define the initialization function
        classname = m.__class__.__name__
        if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear') != -1):
            if init_type == 'normal':
                init.normal_(m.weight.data, 0.0, init_gain)
            elif init_type == 'xavier':
                init.xavier_normal_(m.weight.data, gain=init_gain)
            elif init_type == 'kaiming':
                # init.kaiming_normal_(m.weight.data, a=0, mode='fan_in')
                init.kaiming_normal_(m.weight.data, a=0.2, mode='fan_in', nonlinearity='leaky_relu')
            elif init_type == 'orthogonal':
                init.orthogonal_(m.weight.data, gain=init_gain)
            else:
                raise NotImplementedError('initialization method [%s] is not implemented' % init_type)
            if hasattr(m, 'bias') and m.bias is not None:
                init.constant_(m.bias.data, 0.0)
        elif classname.find('BatchNorm2d') != -1:  # BatchNorm Layer's weight is not a matrix; only normal distribution applies.
            init.normal_(m.weight.data, 1.0, init_gain)
            init.constant_(m.bias.data, 0.0)

    print('initialize network with %s' % init_type)
    net.apply(init_func)  # apply the initialization function <init_func>

def init_net(net, init_type='normal', init_gain=0.02, gpu_ids=[], init=True):
    """Initialize a network: 1. register CPU/GPU device (with multi-GPU support); 2. initialize the network weights

    Parameters:
        net (network)      -- the network to be initialized
        init_type (str)    -- the name of an initialization method: normal | xavier | kaiming | orthogonal
        init_gain (float)  -- scaling factor for normal, xavier and orthogonal.
        gpu_ids (int list) -- which GPUs the network runs on: e.g., 0,1,2
        init (bool)        -- whether to run init_weights at all

    Return an initialized network.
    """
    if len(gpu_ids) > 0:
        assert(torch.cuda.is_available())
        net.to(gpu_ids[0])
    if init:
        init_weights(net, init_type, init_gain=init_gain)
    return net

def get_scheduler(optimizer, opt):
    """Return a learning rate scheduler

    Parameters:
        optimizer          -- the optimizer of the network
        opt (option class) -- stores all the experiment flags; needs to be a subclass of BaseOptions.
                              opt.lr_policy is the name of learning rate policy: linear | step | plateau | cosine

    For 'linear', we keep the same learning rate for the first <opt.niter> epochs
    and linearly decay the rate to zero over the next <opt.niter_decay> epochs.
    For other schedulers (step, plateau, and cosine), we use the default PyTorch schedulers.
    See https://pytorch.org/docs/stable/optim.html for more details.
    """
    if opt.lr_policy == 'linear':
        def lambda_rule(epoch):
            lr_l = 1.0 - max(0, epoch + opt.epoch_count - opt.niter) / float(opt.niter_decay + 1)
            return lr_l
        scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_rule)
    elif opt.lr_policy == 'step':
        scheduler = lr_scheduler.StepLR(optimizer, step_size=opt.lr_decay_iters, gamma=0.1)
    elif opt.lr_policy == 'plateau':
        scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, threshold=0.01, patience=5)
    elif opt.lr_policy == 'cosine':
        scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=opt.niter, eta_min=0)
    else:
        raise NotImplementedError('learning rate policy [%s] is not implemented' % opt.lr_policy)
    return scheduler

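# Usage sketch (added for illustration, not part of the original training code):
# build the 'linear' policy with a minimal options namespace. The field names
# follow the docstring above; the concrete values are hypothetical.
def _example_linear_scheduler():
    import argparse
    opt = argparse.Namespace(lr_policy='linear', epoch_count=1, niter=100, niter_decay=100)
    optimizer = torch.optim.Adam([torch.zeros(1, requires_grad=True)], lr=2e-4)
    scheduler = get_scheduler(optimizer, opt)
    for _ in range(5):  # lr stays at 2e-4 during the first <niter> epochs
        optimizer.step()
        scheduler.step()
    return optimizer.param_groups[0]['lr']
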
class LayerNormWarpper(nn.Module):
    def __init__(self, num_features):
        super(LayerNormWarpper, self).__init__()
        self.num_features = int(num_features)

    def forward(self, x):
        # build the LayerNorm for the current spatial size on the input's device
        x = nn.LayerNorm([self.num_features, x.size()[2], x.size()[3]], elementwise_affine=False).to(x.device)(x)
        return x

def get_norm_layer(norm_type='instance'):
    """Return a normalization layer

    Parameters:
        norm_type (str) -- the name of the normalization layer: batch | instance | layer | none

    For BatchNorm, we use learnable affine parameters and track running statistics (mean/stddev).
    For InstanceNorm, we do not use learnable affine parameters. We do not track running statistics.
    """
    if norm_type == 'batch':
        norm_layer = functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True)
    elif norm_type == 'instance':
        norm_layer = functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=False)
    elif norm_type == 'layer':
        norm_layer = functools.partial(LayerNormWarpper)
    elif norm_type == 'none':
        norm_layer = None
    else:
        raise NotImplementedError('normalization layer [%s] is not found' % norm_type)
    return norm_layer

def get_non_linearity(layer_type='relu'):
    if layer_type == 'relu':
        nl_layer = functools.partial(nn.ReLU, inplace=True)
    elif layer_type == 'lrelu':
        nl_layer = functools.partial(
            nn.LeakyReLU, negative_slope=0.2, inplace=True)
    elif layer_type == 'elu':
        nl_layer = functools.partial(nn.ELU, inplace=True)
    elif layer_type == 'selu':
        nl_layer = functools.partial(nn.SELU, inplace=True)
    elif layer_type == 'prelu':
        nl_layer = functools.partial(nn.PReLU)
    else:
        raise NotImplementedError(
            'nonlinearity activation [%s] is not found' % layer_type)
    return nl_layer

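# Usage sketch (illustrative only): compose a small conv block from the two
# factories above; the block itself is hypothetical and unused elsewhere.
def _example_conv_block(in_c=3, out_c=16):
    norm_layer = get_norm_layer('instance')
    nl_layer = get_non_linearity('lrelu')
    block = nn.Sequential(nn.Conv2d(in_c, out_c, kernel_size=3, padding=1),
                          norm_layer(out_c),
                          nl_layer())
    return block(torch.randn(1, in_c, 32, 32)).shape  # -> (1, out_c, 32, 32)
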
def define_G(input_nc, output_nc, nz, ngf, netG='unet_128', norm='batch', nl='relu', use_noise=False,
             use_dropout=False, init_type='xavier', init_gain=0.02, gpu_ids=[], where_add='input', upsample='bilinear'):
    net = None
    norm_layer = get_norm_layer(norm_type=norm)
    nl_layer = get_non_linearity(layer_type=nl)
    # print(norm, norm_layer)
    if nz == 0:
        where_add = 'input'

    if netG == 'unet_128' and where_add == 'input':
        net = G_Unet_add_input(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise,
                               use_dropout=use_dropout, upsample=upsample, device=gpu_ids)
    elif netG == 'unet_128_G' and where_add == 'input':
        net = G_Unet_add_input_G(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise,
                                 use_dropout=use_dropout, upsample=upsample, device=gpu_ids)
    elif netG == 'unet_256' and where_add == 'input':
        net = G_Unet_add_input(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise,
                               use_dropout=use_dropout, upsample=upsample, device=gpu_ids)
    elif netG == 'unet_256_G' and where_add == 'input':
        net = G_Unet_add_input_G(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise,
                                 use_dropout=use_dropout, upsample=upsample, device=gpu_ids)
    elif netG == 'unet_128' and where_add == 'all':
        net = G_Unet_add_all(input_nc, output_nc, nz, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise,
                             use_dropout=use_dropout, upsample=upsample)
    elif netG == 'unet_256' and where_add == 'all':
        net = G_Unet_add_all(input_nc, output_nc, nz, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer, use_noise=use_noise,
                             use_dropout=use_dropout, upsample=upsample)
    else:
        raise NotImplementedError('Generator model name [%s] is not recognized' % netG)
    # print(net)
    return init_net(net, init_type, init_gain, gpu_ids)

def define_C(input_nc, output_nc, nz, ngf, netC='unet_128', norm='instance', nl='relu',
             use_dropout=False, init_type='normal', init_gain=0.02, gpu_ids=[], upsample='basic'):
    net = None
    norm_layer = get_norm_layer(norm_type=norm)
    nl_layer = get_non_linearity(layer_type=nl)

    if netC == 'resnet_9blocks':
        net = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=9)
    elif netC == 'resnet_6blocks':
        net = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=6)
    elif netC == 'unet_128':
        net = G_Unet_add_input_C(input_nc, output_nc, 0, 7, ngf, norm_layer=norm_layer, nl_layer=nl_layer,
                                 use_dropout=use_dropout, upsample=upsample)
    elif netC == 'unet_256':
        net = G_Unet_add_input(input_nc, output_nc, 0, 8, ngf, norm_layer=norm_layer, nl_layer=nl_layer,
                               use_dropout=use_dropout, upsample=upsample)
    elif netC == 'unet_32':
        net = G_Unet_add_input(input_nc, output_nc, 0, 5, ngf, norm_layer=norm_layer, nl_layer=nl_layer,
                               use_dropout=use_dropout, upsample=upsample)
    else:
        raise NotImplementedError('Generator model name [%s] is not recognized' % netC)
    return init_net(net, init_type, init_gain, gpu_ids)

def define_D(input_nc, ndf, netD, norm='batch', nl='lrelu', init_type='xavier', init_gain=0.02, num_Ds=1, gpu_ids=[]):
    net = None
    norm_layer = get_norm_layer(norm_type=norm)
    nl = 'lrelu'  # use leaky relu for D
    nl_layer = get_non_linearity(layer_type=nl)

    if netD == 'basic_128':
        net = D_NLayers(input_nc, ndf, n_layers=2, norm_layer=norm_layer, nl_layer=nl_layer)
    elif netD == 'basic_256':
        net = D_NLayers(input_nc, ndf, n_layers=3, norm_layer=norm_layer, nl_layer=nl_layer)
    elif netD == 'basic_128_multi':
        net = D_NLayersMulti(input_nc=input_nc, ndf=ndf, n_layers=2, norm_layer=norm_layer, num_D=num_Ds, nl_layer=nl_layer)
    elif netD == 'basic_256_multi':
        net = D_NLayersMulti(input_nc=input_nc, ndf=ndf, n_layers=3, norm_layer=norm_layer, num_D=num_Ds, nl_layer=nl_layer)
    else:
        raise NotImplementedError('Discriminator model name [%s] is not recognized' % netD)
    return init_net(net, init_type, init_gain, gpu_ids)

def define_E(input_nc, output_nc, ndf, netE, norm='batch', nl='lrelu',
             init_type='xavier', init_gain=0.02, gpu_ids=[], vaeLike=False):
    net = None
    norm_layer = get_norm_layer(norm_type=norm)
    nl = 'lrelu'  # use leaky relu for E
    nl_layer = get_non_linearity(layer_type=nl)
    if netE == 'resnet_128':
        net = E_ResNet(input_nc, output_nc, ndf, n_blocks=4, norm_layer=norm_layer,
                       nl_layer=nl_layer, vaeLike=vaeLike)
    elif netE == 'resnet_256':
        net = E_ResNet(input_nc, output_nc, ndf, n_blocks=5, norm_layer=norm_layer,
                       nl_layer=nl_layer, vaeLike=vaeLike)
    elif netE == 'conv_128':
        net = E_NLayers(input_nc, output_nc, ndf, n_layers=4, norm_layer=norm_layer,
                        nl_layer=nl_layer, vaeLike=vaeLike)
    elif netE == 'conv_256':
        net = E_NLayers(input_nc, output_nc, ndf, n_layers=5, norm_layer=norm_layer,
                        nl_layer=nl_layer, vaeLike=vaeLike)
    else:
        raise NotImplementedError('Encoder model name [%s] is not recognized' % netE)
    return init_net(net, init_type, init_gain, gpu_ids, False)

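# Usage sketch (illustrative; not taken from the original training scripts):
# build a 128x128 U-Net generator and a matching two-scale discriminator on
# CPU. Channel widths and the latent size nz=8 are example values.
def _example_define_G_and_D():
    G = define_G(input_nc=3, output_nc=3, nz=8, ngf=32, netG='unet_128',
                 norm='instance', nl='relu', upsample='basic', gpu_ids=[])
    D = define_D(input_nc=3, ndf=32, netD='basic_128_multi', norm='instance',
                 num_Ds=2, gpu_ids=[])
    x = torch.randn(1, 3, 128, 128)
    z = torch.randn(1, 8)
    fake = G(x, z)                      # (1, 3, 128, 128), in [-1, 1]
    preds = D(fake)                     # list with one patch map per scale
    return fake.shape, [p.shape for p in preds]
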
class ResnetGenerator(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=64, n_downsampling=3, norm_layer=None, use_dropout=False, n_blocks=6, padding_type='replicate'):
        assert(n_blocks >= 0)
        super(ResnetGenerator, self).__init__()
        self.input_nc = input_nc
        self.output_nc = output_nc
        self.ngf = ngf
        if type(norm_layer) == functools.partial:  # no need to use bias as BatchNorm2d has affine parameters
            use_bias = norm_layer.func != nn.BatchNorm2d
        else:
            use_bias = norm_layer != nn.BatchNorm2d

        model = [nn.ReplicationPad2d(3),
                 nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0,
                           bias=use_bias)]
        if norm_layer is not None:
            model += [norm_layer(ngf)]
        model += [nn.ReLU(True)]

        # n_downsampling = 2
        for i in range(n_downsampling):
            mult = 2**i
            model += [nn.ReplicationPad2d(1), nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3,
                                                        stride=2, padding=0, bias=use_bias)]
            # model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3,
            #                     stride=2, padding=1, bias=use_bias)]
            if norm_layer is not None:
                model += [norm_layer(ngf * mult * 2)]
            model += [nn.ReLU(True)]

        mult = 2**n_downsampling
        for i in range(n_blocks):
            model += [ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer=norm_layer, use_dropout=use_dropout, use_bias=use_bias)]

        for i in range(n_downsampling):
            mult = 2**(n_downsampling - i)
            # model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2),
            #                              kernel_size=3, stride=2,
            #                              padding=1, output_padding=1,
            #                              bias=use_bias)]
            # if norm_layer is not None:
            #     model += [norm_layer(ngf * mult / 2)]
            # model += [nn.ReLU(True)]
            model += upsampleLayer(ngf * mult, int(ngf * mult / 2), upsample='bilinear', padding_type=padding_type)
            if norm_layer is not None:
                model += [norm_layer(int(ngf * mult / 2))]
            model += [nn.ReLU(True)]
            model += [nn.ReplicationPad2d(1),
                      nn.Conv2d(int(ngf * mult / 2), int(ngf * mult / 2), kernel_size=3, padding=0)]
            if norm_layer is not None:
                model += [norm_layer(int(ngf * mult / 2))]
            model += [nn.ReLU(True)]
        model += [nn.ReplicationPad2d(3)]
        model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0)]
        # model += [nn.Tanh()]

        self.model = nn.Sequential(*model)

    def forward(self, input):
        return self.model(input)

# Define a resnet block
class ResnetBlock(nn.Module):
    def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias)

    def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        conv_block = []
        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias)]
        if norm_layer is not None:
            conv_block += [norm_layer(dim)]
        conv_block += [nn.ReLU(True)]
        # if use_dropout:
        #     conv_block += [nn.Dropout(0.5)]

        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias)]
        if norm_layer is not None:
            conv_block += [norm_layer(dim)]

        return nn.Sequential(*conv_block)

    def forward(self, x):
        out = x + self.conv_block(x)
        return out

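# Quick shape check (illustrative): a ResnetBlock keeps channels and spatial
# size unchanged, so blocks can be stacked freely inside ResnetGenerator.
def _example_resnet_block():
    block = ResnetBlock(dim=16, padding_type='replicate',
                        norm_layer=get_norm_layer('instance'),
                        use_dropout=False, use_bias=True)
    return block(torch.randn(1, 16, 24, 24)).shape  # -> (1, 16, 24, 24)
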
class D_NLayersMulti(nn.Module):
    def __init__(self, input_nc, ndf=64, n_layers=3,
                 norm_layer=nn.BatchNorm2d, num_D=1, nl_layer=None):
        super(D_NLayersMulti, self).__init__()
        # st()
        self.num_D = num_D
        self.nl_layer = nl_layer
        if num_D == 1:
            layers = self.get_layers(input_nc, ndf, n_layers, norm_layer)
            self.model = nn.Sequential(*layers)
        else:
            layers = self.get_layers(input_nc, ndf, n_layers, norm_layer)
            self.add_module("model_0", nn.Sequential(*layers))
            self.down = nn.functional.interpolate
            for i in range(1, num_D):
                ndf_i = int(round(ndf / (2**i)))
                layers = self.get_layers(input_nc, ndf_i, n_layers, norm_layer)
                self.add_module("model_%d" % i, nn.Sequential(*layers))

    def get_layers(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d):
        kw = 3
        padw = 1
        sequence = [spectral_norm(nn.Conv2d(input_nc, ndf, kernel_size=kw,
                                            stride=2, padding=padw)), nn.LeakyReLU(0.2, True)]
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):
            nf_mult_prev = nf_mult
            nf_mult = min(2**n, 8)
            sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult,
                                                 kernel_size=kw, stride=2, padding=padw))]
            if norm_layer:
                sequence += [norm_layer(ndf * nf_mult)]
            sequence += [self.nl_layer()]

        nf_mult_prev = nf_mult
        nf_mult = min(2**n_layers, 8)
        sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult,
                                             kernel_size=kw, stride=1, padding=padw))]
        if norm_layer:
            sequence += [norm_layer(ndf * nf_mult)]
        sequence += [self.nl_layer()]

        sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult, 1,
                                             kernel_size=kw, stride=1, padding=padw))]
        return sequence

    def forward(self, input):
        if self.num_D == 1:
            return self.model(input)
        result = []
        down = input
        for i in range(self.num_D):
            model = getattr(self, "model_%d" % i)
            result.append(model(down))
            if i != self.num_D - 1:
                down = self.down(down, scale_factor=0.5, mode='bilinear')
        return result

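# Usage sketch (example values): with num_D=2 the discriminator returns one
# patch prediction per scale -- the input itself and a 2x downsampled copy.
def _example_multi_scale_D():
    D = D_NLayersMulti(input_nc=3, ndf=32, n_layers=2,
                       norm_layer=get_norm_layer('instance'), num_D=2,
                       nl_layer=get_non_linearity('lrelu'))
    preds = D(torch.randn(1, 3, 128, 128))
    return [p.shape for p in preds]
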
class D_NLayers(nn.Module):
    """Defines a PatchGAN discriminator"""

    def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d, nl_layer=None):
        """Construct a PatchGAN discriminator

        Parameters:
            input_nc (int)  -- the number of channels in input images
            ndf (int)       -- the number of filters in the last conv layer
            n_layers (int)  -- the number of conv layers in the discriminator
            norm_layer      -- normalization layer
            nl_layer        -- accepted for interface compatibility with define_D; LeakyReLU is used internally
        """
        super(D_NLayers, self).__init__()
        if type(norm_layer) == functools.partial:  # no need to use bias as BatchNorm2d has affine parameters
            use_bias = norm_layer.func != nn.BatchNorm2d
        else:
            use_bias = norm_layer != nn.BatchNorm2d

        kw = 3
        padw = 1
        sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)]
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):  # gradually increase the number of filters
            nf_mult_prev = nf_mult
            nf_mult = min(2 ** n, 8)
            sequence += [
                nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True)
            ]

        nf_mult_prev = nf_mult
        nf_mult = min(2 ** n_layers, 8)
        sequence += [
            nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=1, padding=padw, bias=use_bias),
            norm_layer(ndf * nf_mult),
            nn.LeakyReLU(0.2, True)
        ]

        sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]  # output 1 channel prediction map
        self.model = nn.Sequential(*sequence)

    def forward(self, input):
        """Standard forward."""
        return self.model(input)

class G_Unet_add_input(nn.Module):
    def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64,
                 norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False,
                 upsample='basic', device=0):
        super(G_Unet_add_input, self).__init__()
        self.nz = nz
        max_nchn = 8
        noise = []
        for i in range(num_downs + 1):
            if use_noise:
                noise.append(True)
            else:
                noise.append(False)
        # construct unet structure
        # print(num_downs)
        unet_block = UnetBlock_A(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=noise[num_downs - 1],
                                 innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        for i in range(num_downs - 5):
            unet_block = UnetBlock_A(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise[num_downs - i - 3],
                                     norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample)
        unet_block = UnetBlock_A(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise[2],
                                 norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_A(ngf * 2, ngf * 2, ngf * 4, unet_block, noise[1],
                                 norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_A(ngf, ngf, ngf * 2, unet_block, noise[0],
                                 norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_A(input_nc + nz, output_nc, ngf, unet_block, None,
                                 outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        self.model = unet_block

    def forward(self, x, z=None):
        if self.nz > 0:
            z_img = z.view(z.size(0), z.size(1), 1, 1).expand(
                z.size(0), z.size(1), x.size(2), x.size(3))
            x_with_z = torch.cat([x, z_img], 1)
        else:
            x_with_z = x  # no z
        return torch.tanh(self.model(x_with_z))
        # return self.model(x_with_z)

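# Forward sketch (illustrative sizes): the latent z is broadcast over the
# spatial grid and concatenated to the image before entering the U-Net;
# num_downs=7 matches 128x128 inputs.
def _example_unet_add_input():
    net = G_Unet_add_input(input_nc=3, output_nc=3, nz=8, num_downs=7, ngf=32,
                           norm_layer=get_norm_layer('instance'),
                           nl_layer=get_non_linearity('relu'),
                           upsample='basic')
    x = torch.randn(1, 3, 128, 128)
    z = torch.randn(1, 8)
    return net(x, z).shape  # -> (1, 3, 128, 128)
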
class G_Unet_add_input_G(nn.Module):
    def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64,
                 norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False,
                 upsample='basic', device=0):
        super(G_Unet_add_input_G, self).__init__()
        self.nz = nz
        max_nchn = 8
        noise = []
        for i in range(num_downs + 1):
            if use_noise:
                noise.append(True)
            else:
                noise.append(False)
        # construct unet structure
        # print(num_downs)
        unet_block = UnetBlock_G(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=False,
                                 innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        for i in range(num_downs - 5):
            unet_block = UnetBlock_G(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise=False,
                                     norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample)
        unet_block = UnetBlock_G(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise[2],
                                 norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic')
        unet_block = UnetBlock_G(ngf * 2, ngf * 2, ngf * 4, unet_block, noise[1],
                                 norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic')
        unet_block = UnetBlock_G(ngf, ngf, ngf * 2, unet_block, noise[0],
                                 norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic')
        unet_block = UnetBlock_G(input_nc + nz, output_nc, ngf, unet_block, None,
                                 outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample='basic')
        self.model = unet_block

    def forward(self, x, z=None):
        if self.nz > 0:
            z_img = z.view(z.size(0), z.size(1), 1, 1).expand(
                z.size(0), z.size(1), x.size(2), x.size(3))
            x_with_z = torch.cat([x, z_img], 1)
        else:
            x_with_z = x  # no z
        # return F.tanh(self.model(x_with_z))
        return self.model(x_with_z)

class G_Unet_add_input_C(nn.Module):
    def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64,
                 norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False,
                 upsample='basic', device=0):
        super(G_Unet_add_input_C, self).__init__()
        self.nz = nz
        max_nchn = 8
        # construct unet structure
        # print(num_downs)
        unet_block = UnetBlock(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, noise=False,
                               innermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        for i in range(num_downs - 5):
            unet_block = UnetBlock(ngf * max_nchn, ngf * max_nchn, ngf * max_nchn, unet_block, noise=False,
                                   norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample)
        unet_block = UnetBlock(ngf * 4, ngf * 4, ngf * max_nchn, unet_block, noise=False,
                               norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock(ngf * 2, ngf * 2, ngf * 4, unet_block, noise=False,
                               norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock(ngf, ngf, ngf * 2, unet_block, noise=False,
                               norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock(input_nc + nz, output_nc, ngf, unet_block, noise=False,
                               outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        self.model = unet_block

    def forward(self, x, z=None):
        if self.nz > 0:
            z_img = z.view(z.size(0), z.size(1), 1, 1).expand(
                z.size(0), z.size(1), x.size(2), x.size(3))
            x_with_z = torch.cat([x, z_img], 1)
        else:
            x_with_z = x  # no z
        # return torch.tanh(self.model(x_with_z))
        return self.model(x_with_z)

def upsampleLayer(inplanes, outplanes, kw=1, upsample='basic', padding_type='replicate'):
    # padding_type = 'zero'
    if upsample == 'basic':
        upconv = [nn.ConvTranspose2d(inplanes, outplanes, kernel_size=4, stride=2, padding=1)]  # , padding_mode='replicate'
    elif upsample == 'bilinear' or upsample == 'nearest' or upsample == 'linear':
        upconv = [nn.Upsample(scale_factor=2, mode=upsample, align_corners=True),
                  # nn.ReplicationPad2d(1),
                  nn.Conv2d(inplanes, outplanes, kernel_size=1, stride=1, padding=0)]
        # p = kw // 2
        # upconv = [nn.Upsample(scale_factor=2, mode=upsample, align_corners=True),
        #           nn.Conv2d(inplanes, outplanes, kernel_size=kw, stride=1, padding=p, padding_mode='replicate')]
    else:
        raise NotImplementedError(
            'upsample layer [%s] not implemented' % upsample)
    return upconv

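# Both branches double the spatial resolution: 'basic' uses a transposed
# convolution, 'bilinear'/'nearest' use interpolation followed by a 1x1 conv.
# A small shape check with example sizes:
def _example_upsample_layer():
    x = torch.randn(1, 8, 16, 16)
    basic = nn.Sequential(*upsampleLayer(8, 4, upsample='basic'))
    bilinear = nn.Sequential(*upsampleLayer(8, 4, upsample='bilinear'))
    return basic(x).shape, bilinear(x).shape  # both (1, 4, 32, 32)
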
class UnetBlock_G(nn.Module):
    def __init__(self, input_nc, outer_nc, inner_nc,
                 submodule=None, noise=None, outermost=False, innermost=False,
                 norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'):
        super(UnetBlock_G, self).__init__()
        self.outermost = outermost
        p = 0
        downconv = []
        if padding_type == 'reflect':
            downconv += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            downconv += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError(
                'padding [%s] is not implemented' % padding_type)
        downconv += [nn.Conv2d(input_nc, inner_nc,
                               kernel_size=3, stride=2, padding=p)]
        # downsample is different from upsample
        downrelu = nn.LeakyReLU(0.2, True)
        downnorm = norm_layer(inner_nc) if norm_layer is not None else None
        uprelu = nl_layer()
        uprelu2 = nl_layer()
        uppad = nn.ReplicationPad2d(1)
        upnorm = norm_layer(outer_nc) if norm_layer is not None else None
        upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None
        self.noiseblock = ApplyNoise(outer_nc)
        self.noise = noise

        if outermost:
            upconv = upsampleLayer(inner_nc * 2, inner_nc, upsample=upsample, padding_type=padding_type)
            uppad = nn.ReplicationPad2d(3)
            upconv2 = nn.Conv2d(inner_nc, outer_nc, kernel_size=7, padding=0)
            down = downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [norm_layer(inner_nc)]
            # upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type)
            # upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=0)
            # down = downconv
            # up = [uprelu] + upconv
            # if upnorm is not None:
            #     up += [norm_layer(outer_nc)]
            up += [uprelu2, uppad, upconv2]  # + [nn.Tanh()]
            model = down + [submodule] + up
        elif innermost:
            upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)
            down = [downrelu] + downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            model = down + up
        else:
            upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)
            down = [downrelu] + downconv
            if downnorm is not None:
                down += [downnorm]
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            if use_dropout:
                model = down + [submodule] + up + [nn.Dropout(0.5)]
            else:
                model = down + [submodule] + up

        self.model = nn.Sequential(*model)

    def forward(self, x):
        if self.outermost:
            return self.model(x)
        else:
            x2 = self.model(x)
            if self.noise:
                x2 = self.noiseblock(x2, self.noise)
            return torch.cat([x2, x], 1)

class UnetBlock(nn.Module):
    def __init__(self, input_nc, outer_nc, inner_nc,
                 submodule=None, noise=None, outermost=False, innermost=False,
                 norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'):
        super(UnetBlock, self).__init__()
        self.outermost = outermost
        p = 0
        downconv = []
        if padding_type == 'reflect':
            downconv += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            downconv += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError(
                'padding [%s] is not implemented' % padding_type)
        downconv += [nn.Conv2d(input_nc, inner_nc,
                               kernel_size=3, stride=2, padding=p)]
        # downsample is different from upsample
        downrelu = nn.LeakyReLU(0.2, True)
        downnorm = norm_layer(inner_nc) if norm_layer is not None else None
        uprelu = nl_layer()
        uprelu2 = nl_layer()
        uppad = nn.ReplicationPad2d(1)
        upnorm = norm_layer(outer_nc) if norm_layer is not None else None
        upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None
        self.noiseblock = ApplyNoise(outer_nc)
        self.noise = noise

        if outermost:
            upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)
            down = downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]  # + [nn.Tanh()]
            model = down + [submodule] + up
        elif innermost:
            upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)
            down = [downrelu] + downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            model = down + up
        else:
            upconv = upsampleLayer(inner_nc * 2, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p)
            down = [downrelu] + downconv
            if downnorm is not None:
                down += [downnorm]
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            if use_dropout:
                model = down + [submodule] + up + [nn.Dropout(0.5)]
            else:
                model = down + [submodule] + up

        self.model = nn.Sequential(*model)

    def forward(self, x):
        if self.outermost:
            return self.model(x)
        else:
            x2 = self.model(x)
            if self.noise:
                x2 = self.noiseblock(x2, self.noise)
            return torch.cat([x2, x], 1)

# Defines the submodule with skip connection.
# X -------------------identity---------------------- X
#   |-- downsampling -- |submodule| -- upsampling --|
class UnetBlock_A(nn.Module):
    def __init__(self, input_nc, outer_nc, inner_nc,
                 submodule=None, noise=None, outermost=False, innermost=False,
                 norm_layer=None, nl_layer=None, use_dropout=False, upsample='basic', padding_type='replicate'):
        super(UnetBlock_A, self).__init__()
        self.outermost = outermost
        p = 0
        downconv = []
        if padding_type == 'reflect':
            downconv += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            downconv += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError(
                'padding [%s] is not implemented' % padding_type)
        downconv += [spectral_norm(nn.Conv2d(input_nc, inner_nc,
                                             kernel_size=3, stride=2, padding=p))]
        # downsample is different from upsample
        downrelu = nn.LeakyReLU(0.2, True)
        downnorm = norm_layer(inner_nc) if norm_layer is not None else None
        uprelu = nl_layer()
        uprelu2 = nl_layer()
        uppad = nn.ReplicationPad2d(1)
        upnorm = norm_layer(outer_nc) if norm_layer is not None else None
        upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None
        self.noiseblock = ApplyNoise(outer_nc)
        self.noise = noise

        if outermost:
            upconv = upsampleLayer(inner_nc * 1, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p))
            down = downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]  # + [nn.Tanh()]
            model = down + [submodule] + up
        elif innermost:
            upconv = upsampleLayer(inner_nc, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p))
            down = [downrelu] + downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            model = down + up
        else:
            upconv = upsampleLayer(inner_nc * 1, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p))
            down = [downrelu] + downconv
            if downnorm is not None:
                down += [downnorm]
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            if use_dropout:
                model = down + [submodule] + up + [nn.Dropout(0.5)]
            else:
                model = down + [submodule] + up

        self.model = nn.Sequential(*model)

    def forward(self, x):
        if self.outermost:
            return self.model(x)
        else:
            x2 = self.model(x)
            if self.noise:
                x2 = self.noiseblock(x2, self.noise)
            if x2.shape[-1] == x.shape[-1]:
                return x2 + x
            else:
                x2 = F.interpolate(x2, x.shape[2:])
                return x2 + x

class E_ResNet(nn.Module):
    def __init__(self, input_nc=3, output_nc=1, ndf=64, n_blocks=4,
                 norm_layer=None, nl_layer=None, vaeLike=False):
        super(E_ResNet, self).__init__()
        self.vaeLike = vaeLike
        max_ndf = 4
        conv_layers = [
            nn.Conv2d(input_nc, ndf, kernel_size=3, stride=2, padding=1, bias=True)]
        for n in range(1, n_blocks):
            input_ndf = ndf * min(max_ndf, n)
            output_ndf = ndf * min(max_ndf, n + 1)
            conv_layers += [BasicBlock(input_ndf,
                                       output_ndf, norm_layer, nl_layer)]
        conv_layers += [nl_layer(), nn.AdaptiveAvgPool2d(4)]
        if vaeLike:
            self.fc = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)])
            self.fcVar = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)])
        else:
            self.fc = nn.Sequential(*[nn.Linear(output_ndf * 16, output_nc)])
        self.conv = nn.Sequential(*conv_layers)

    def forward(self, x):
        x_conv = self.conv(x)
        conv_flat = x_conv.view(x.size(0), -1)
        output = self.fc(conv_flat)
        if self.vaeLike:
            outputVar = self.fcVar(conv_flat)
            return output, outputVar
        return output

# Defines the Unet generator.
# |num_downs|: number of downsamplings in UNet. For example,
# if |num_downs| == 7, image of size 128x128 will become of size 1x1
# at the bottleneck
class G_Unet_add_all(nn.Module):
    def __init__(self, input_nc, output_nc, nz, num_downs, ngf=64,
                 norm_layer=None, nl_layer=None, use_dropout=False, use_noise=False, upsample='basic'):
        super(G_Unet_add_all, self).__init__()
        self.nz = nz
        self.mapping = G_mapping(self.nz, self.nz, 512, normalize_latents=False, lrmul=1)
        self.truncation_psi = 0
        self.truncation_cutoff = 0

        # - 2 means we start from feature map with height and width equals 4.
        # as this example, we get num_layers = 18.
        num_layers = int(np.log2(512)) * 2 - 2

        # Noise inputs.
        self.noise_inputs = []
        for layer_idx in range(num_layers):
            res = layer_idx // 2 + 2
            shape = [1, 1, 2 ** res, 2 ** res]
            self.noise_inputs.append(torch.randn(*shape).to("cuda"))

        # construct unet structure
        unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=None, innermost=True,
                                      norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=unet_block,
                                      norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample)
        for i in range(num_downs - 6):
            unet_block = UnetBlock_with_z(ngf * 8, ngf * 8, ngf * 8, nz, submodule=unet_block,
                                          norm_layer=norm_layer, nl_layer=nl_layer, use_dropout=use_dropout, upsample=upsample)
        unet_block = UnetBlock_with_z(ngf * 4, ngf * 4, ngf * 8, nz, submodule=unet_block,
                                      norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_with_z(ngf * 2, ngf * 2, ngf * 4, nz, submodule=unet_block,
                                      norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_with_z(ngf, ngf, ngf * 2, nz, submodule=unet_block,
                                      norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        unet_block = UnetBlock_with_z(input_nc, output_nc, ngf, nz, submodule=unet_block,
                                      outermost=True, norm_layer=norm_layer, nl_layer=nl_layer, upsample=upsample)
        self.model = unet_block

    def forward(self, x, z):
        dlatents1, num_layers = self.mapping(z)
        dlatents1 = dlatents1.unsqueeze(1)
        dlatents1 = dlatents1.expand(-1, int(num_layers), -1)

        # Apply truncation trick.
        if self.truncation_psi and self.truncation_cutoff:
            coefs = np.ones([1, num_layers, 1], dtype=np.float32)
            for i in range(num_layers):
                if i < self.truncation_cutoff:
                    coefs[:, i, :] *= self.truncation_psi
            """Linear interpolation.
               a + (b - a) * t (a = 0)
               reduce to
               b * t
            """
            dlatents1 = dlatents1 * torch.Tensor(coefs).to(dlatents1.device)

        return torch.tanh(self.model(x, dlatents1, self.noise_inputs))

class ApplyNoise(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.channels = channels
        self.weight = nn.Parameter(torch.randn(channels), requires_grad=True)
        self.bias = nn.Parameter(torch.zeros(channels), requires_grad=True)

    def forward(self, x, noise):
        W, _ = torch.split(self.weight.view(1, -1, 1, 1), self.channels // 2, dim=1)
        B, _ = torch.split(self.bias.view(1, -1, 1, 1), self.channels // 2, dim=1)
        Z = torch.zeros_like(W)
        w = torch.cat([W, Z], dim=1).to(x.device)
        b = torch.cat([B, Z], dim=1).to(x.device)
        adds = w * torch.randn_like(x) + b
        return x + adds.type_as(x)

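# Note (added): only the first half of the channels gets a learned noise
# scale; the second half is zeroed out above, so it passes through unchanged.
# A small check with example sizes:
def _example_apply_noise():
    m = ApplyNoise(channels=8)
    out = m(torch.zeros(1, 8, 4, 4), noise=None)  # the noise argument is not used by forward
    # first half: weighted Gaussian noise; second half: zeros
    return out[:, :4].abs().sum().item(), out[:, 4:].abs().sum().item()
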
class FC(nn.Module):
    def __init__(self,
                 in_channels,
                 out_channels,
                 gain=2**(0.5),
                 use_wscale=False,
                 lrmul=1.0,
                 bias=True):
        """
        The complete conversion of Dense/FC/Linear Layer of original Tensorflow version.
        """
        super(FC, self).__init__()
        he_std = gain * in_channels ** (-0.5)  # He init
        if use_wscale:
            init_std = 1.0 / lrmul
            self.w_lrmul = he_std * lrmul
        else:
            init_std = he_std / lrmul
            self.w_lrmul = lrmul

        self.weight = torch.nn.Parameter(torch.randn(out_channels, in_channels) * init_std)
        if bias:
            self.bias = torch.nn.Parameter(torch.zeros(out_channels))
            self.b_lrmul = lrmul
        else:
            self.bias = None

    def forward(self, x):
        if self.bias is not None:
            out = F.linear(x, self.weight * self.w_lrmul, self.bias * self.b_lrmul)
        else:
            out = F.linear(x, self.weight * self.w_lrmul)
        out = F.leaky_relu(out, 0.2, inplace=True)
        return out

class ApplyStyle(nn.Module):
    """
    @ref: https://github.com/lernapparat/lernapparat/blob/master/style_gan/pytorch_style_gan.ipynb
    """
    def __init__(self, latent_size, channels, use_wscale, nl_layer):
        super(ApplyStyle, self).__init__()
        modules = [nn.Linear(latent_size, channels * 2)]
        if nl_layer:
            modules += [nl_layer()]
        self.linear = nn.Sequential(*modules)

    def forward(self, x, latent):
        style = self.linear(latent)  # style => [batch_size, n_channels*2]
        shape = [-1, 2, x.size(1), 1, 1]
        style = style.view(shape)    # [batch_size, 2, n_channels, ...]
        x = x * (style[:, 0] + 1.) + style[:, 1]
        return x

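# Usage sketch (illustrative sizes): ApplyStyle predicts a per-channel
# (scale, shift) pair from a latent vector and modulates the feature map,
# in the spirit of AdaIN.
def _example_apply_style():
    style = ApplyStyle(latent_size=8, channels=16, use_wscale=True,
                       nl_layer=get_non_linearity('lrelu'))
    x = torch.randn(2, 16, 8, 8)
    latent = torch.randn(2, 8)
    return style(x, latent).shape  # -> (2, 16, 8, 8)
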
class PixelNorm(nn.Module):
    def __init__(self, epsilon=1e-8):
        """
        @notice: avoid in-place ops.
        https://discuss.pytorch.org/t/encounter-the-runtimeerror-one-of-the-variables-needed-for-gradient-computation-has-been-modified-by-an-inplace-operation/836/3
        """
        super(PixelNorm, self).__init__()
        self.epsilon = epsilon

    def forward(self, x):
        tmp = torch.mul(x, x)  # or x ** 2
        tmp1 = torch.rsqrt(torch.mean(tmp, dim=1, keepdim=True) + self.epsilon)
        return x * tmp1

class InstanceNorm(nn.Module):
    def __init__(self, epsilon=1e-8):
        """
        @notice: avoid in-place ops.
        https://discuss.pytorch.org/t/encounter-the-runtimeerror-one-of-the-variables-needed-for-gradient-computation-has-been-modified-by-an-inplace-operation/836/3
        """
        super(InstanceNorm, self).__init__()
        self.epsilon = epsilon

    def forward(self, x):
        x = x - torch.mean(x, (2, 3), True)
        tmp = torch.mul(x, x)  # or x ** 2
        tmp = torch.rsqrt(torch.mean(tmp, (2, 3), True) + self.epsilon)
        return x * tmp

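# Sanity sketch (illustrative): PixelNorm normalizes each spatial position
# across channels (unit mean square), InstanceNorm normalizes each channel
# over its spatial positions (zero mean).
def _example_norms():
    x = torch.randn(2, 8, 16, 16)
    pn = PixelNorm()(x)
    inorm = InstanceNorm()(x)
    return pn.pow(2).mean(dim=1).mean().item(), inorm.mean(dim=(2, 3)).abs().max().item()
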
class LayerEpilogue(nn.Module):
    def __init__(self, channels, dlatent_size, use_wscale, use_noise,
                 use_pixel_norm, use_instance_norm, use_styles, nl_layer=None):
        super(LayerEpilogue, self).__init__()
        self.use_noise = use_noise
        if use_noise:
            self.noise = ApplyNoise(channels)
        self.act = nn.LeakyReLU(negative_slope=0.2)

        if use_pixel_norm:
            self.pixel_norm = PixelNorm()
        else:
            self.pixel_norm = None

        if use_instance_norm:
            self.instance_norm = InstanceNorm()
        else:
            self.instance_norm = None

        if use_styles:
            self.style_mod = ApplyStyle(dlatent_size, channels, use_wscale=use_wscale, nl_layer=nl_layer)
        else:
            self.style_mod = None

    def forward(self, x, noise, dlatents_in_slice=None):
        # if noise is not None:
        if self.use_noise:
            x = self.noise(x, noise)
        x = self.act(x)
        if self.pixel_norm is not None:
            x = self.pixel_norm(x)
        if self.instance_norm is not None:
            x = self.instance_norm(x)
        if self.style_mod is not None:
            x = self.style_mod(x, dlatents_in_slice)
        return x

class G_mapping(nn.Module):
    def __init__(self,
                 mapping_fmaps=512,
                 dlatent_size=512,
                 resolution=512,
                 normalize_latents=True,   # Normalize latent vectors (Z) before feeding them to the mapping layers?
                 use_wscale=True,          # Enable equalized learning rate?
                 lrmul=0.01,               # Learning rate multiplier for the mapping layers.
                 gain=2**(0.5),            # original gain in tensorflow.
                 nl_layer=None
                 ):
        super(G_mapping, self).__init__()
        self.mapping_fmaps = mapping_fmaps
        func = [
            nn.Linear(self.mapping_fmaps, dlatent_size)
        ]
        if nl_layer:
            func += [nl_layer()]
        for j in range(0, 4):
            func += [
                nn.Linear(dlatent_size, dlatent_size)
            ]
            if nl_layer:
                func += [nl_layer()]
        self.func = nn.Sequential(*func)
        # FC(self.mapping_fmaps, dlatent_size, gain, lrmul=lrmul, use_wscale=use_wscale),
        # FC(dlatent_size, dlatent_size, gain, lrmul=lrmul, use_wscale=use_wscale),

        self.normalize_latents = normalize_latents
        self.resolution_log2 = int(np.log2(resolution))
        self.num_layers = self.resolution_log2 * 2 - 2
        self.pixel_norm = PixelNorm()
        # - 2 means we start from feature map with height and width equals 4.
        # as this example, we get num_layers = 18.

    def forward(self, x):
        if self.normalize_latents:
            x = self.pixel_norm(x)
        out = self.func(x)
        return out, self.num_layers

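# Usage sketch (example sizes): the mapping network turns a latent z into a
# dlatent vector and also reports the number of synthesis layers for the
# chosen resolution (12 for resolution=128).
def _example_mapping():
    mapping = G_mapping(mapping_fmaps=16, dlatent_size=16, resolution=128,
                        normalize_latents=True,
                        nl_layer=get_non_linearity('lrelu'))
    w, num_layers = mapping(torch.randn(4, 16))
    return w.shape, num_layers  # -> (4, 16), 12
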
class UnetBlock_with_z(nn.Module):
    def __init__(self, input_nc, outer_nc, inner_nc, nz=0,
                 submodule=None, outermost=False, innermost=False,
                 norm_layer=None, nl_layer=None, use_dropout=False,
                 upsample='basic', padding_type='replicate'):
        super(UnetBlock_with_z, self).__init__()
        p = 0
        downconv = []
        if padding_type == 'reflect':
            downconv += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            downconv += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError(
                'padding [%s] is not implemented' % padding_type)
        self.outermost = outermost
        self.innermost = innermost
        self.nz = nz
        # input_nc = input_nc + nz
        downconv += [spectral_norm(nn.Conv2d(input_nc, inner_nc,
                                             kernel_size=3, stride=2, padding=p))]
        # downsample is different from upsample
        downrelu = nn.LeakyReLU(0.2, True)
        downnorm = norm_layer(inner_nc) if norm_layer is not None else None
        uprelu = nl_layer()
        uprelu2 = nl_layer()
        uppad = nn.ReplicationPad2d(1)
        upnorm = norm_layer(outer_nc) if norm_layer is not None else None
        upnorm2 = norm_layer(outer_nc) if norm_layer is not None else None

        use_styles = False
        uprelu = nl_layer()
        if self.nz > 0:
            use_styles = True

        if outermost:
            self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=False,
                                       use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer)
            upconv = upsampleLayer(
                inner_nc, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p))
            down = downconv
            up = [uprelu] + upconv
            if upnorm is not None:
                up += [upnorm]
            up += [uprelu2, uppad, upconv2]  # + [nn.Tanh()]
        elif innermost:
            self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=True,
                                       use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer)
            upconv = upsampleLayer(
                inner_nc, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p))
            down = [downrelu] + downconv
            up = [uprelu] + upconv
            if norm_layer is not None:
                up += [norm_layer(outer_nc)]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
        else:
            self.adaIn = LayerEpilogue(inner_nc, self.nz, use_wscale=True, use_noise=False,
                                       use_pixel_norm=True, use_instance_norm=True, use_styles=use_styles, nl_layer=nl_layer)
            upconv = upsampleLayer(
                inner_nc, outer_nc, upsample=upsample, padding_type=padding_type)
            upconv2 = spectral_norm(nn.Conv2d(outer_nc, outer_nc, kernel_size=3, padding=p))
            down = [downrelu] + downconv
            if norm_layer is not None:
                down += [norm_layer(inner_nc)]
            up = [uprelu] + upconv
            if norm_layer is not None:
                up += [norm_layer(outer_nc)]
            up += [uprelu2, uppad, upconv2]
            if upnorm2 is not None:
                up += [upnorm2]
            if use_dropout:
                up += [nn.Dropout(0.5)]
        self.down = nn.Sequential(*down)
        self.submodule = submodule
        self.up = nn.Sequential(*up)

    def forward(self, x, z, noise):
        if self.outermost:
            x1 = self.down(x)
            x2 = self.submodule(x1, z[:, 2:], noise[2:])
            return self.up(x2)
        elif self.innermost:
            x1 = self.down(x)
            x_and_z = self.adaIn(x1, noise[0], z[:, 0])
            x2 = self.up(x_and_z)
            x2 = F.interpolate(x2, x.shape[2:])
            return x2 + x
        else:
            x1 = self.down(x)
            x2 = self.submodule(x1, z[:, 2:], noise[2:])
            x_and_z = self.adaIn(x2, noise[0], z[:, 0])
            return self.up(x_and_z) + x

class E_NLayers(nn.Module):
    def __init__(self, input_nc, output_nc=1, ndf=64, n_layers=4,
                 norm_layer=None, nl_layer=None, vaeLike=False):
        super(E_NLayers, self).__init__()
        self.vaeLike = vaeLike

        kw, padw = 3, 1
        sequence = [spectral_norm(nn.Conv2d(input_nc, ndf, kernel_size=kw,
                                            stride=2, padding=padw, padding_mode='replicate')), nl_layer()]
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):
            nf_mult_prev = nf_mult
            nf_mult = min(2**n, 8)
            sequence += [spectral_norm(nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult,
                                                 kernel_size=kw, stride=2, padding=padw, padding_mode='replicate'))]
            if norm_layer is not None:
                sequence += [norm_layer(ndf * nf_mult)]
            sequence += [nl_layer()]
        sequence += [nn.AdaptiveAvgPool2d(4)]
        self.conv = nn.Sequential(*sequence)
        self.fc = nn.Sequential(*[spectral_norm(nn.Linear(ndf * nf_mult * 16, output_nc))])
        if vaeLike:
            self.fcVar = nn.Sequential(*[spectral_norm(nn.Linear(ndf * nf_mult * 16, output_nc))])

    def forward(self, x):
        x_conv = self.conv(x)
        conv_flat = x_conv.view(x.size(0), -1)
        output = self.fc(conv_flat)
        if self.vaeLike:
            outputVar = self.fcVar(conv_flat)
            return output, outputVar
        return output

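# Usage sketch (example sizes): with vaeLike=True the encoder returns
# (mu, logvar); the reparameterization shown here is illustrative and happens
# in the training code, not in this file.
def _example_vae_encoder():
    E = E_NLayers(input_nc=3, output_nc=8, ndf=32, n_layers=4,
                  norm_layer=get_norm_layer('instance'),
                  nl_layer=get_non_linearity('lrelu'), vaeLike=True)
    mu, logvar = E(torch.randn(1, 3, 128, 128))
    z = mu + torch.randn_like(mu) * torch.exp(0.5 * logvar)  # reparameterize
    return z.shape  # -> (1, 8)
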
class BasicBlock(nn.Module):
    def __init__(self, inplanes, outplanes, norm_layer=None, nl_layer=None):
        # norm_layer / nl_layer are optional so the E_ResNet call above works;
        # the defaults reproduce the original hard-coded behaviour.
        super(BasicBlock, self).__init__()
        layers = []
        if norm_layer is None:
            norm_layer = get_norm_layer(norm_type='layer')  # functools.partial(LayerNorm)
        # norm_layer = None
        if norm_layer is not None:
            layers += [norm_layer(inplanes)]
        layers += [nl_layer() if nl_layer is not None else nn.ReLU()]
        layers += [nn.ReplicationPad2d(1),
                   nn.Conv2d(inplanes, outplanes, kernel_size=3, stride=1,
                             padding=0, bias=True)]
        self.conv = nn.Sequential(*layers)

    def forward(self, x):
        return self.conv(x)

def define_SVAE(inc=96, outc=3, outplanes=64, blocks=1, netVAE='SVAE', model_name='', load_ext=True, save_dir='',
                init_type="normal", init_gain=0.02, gpu_ids=[]):
    if netVAE == 'SVAE':
        net = ScreenVAE(inc=inc, outc=outc, outplanes=outplanes, blocks=blocks, save_dir=save_dir,
                        init_type=init_type, init_gain=init_gain, gpu_ids=gpu_ids)
    else:
        raise NotImplementedError('Encoder model name [%s] is not recognized' % netVAE)
    init_net(net, init_type=init_type, init_gain=init_gain, gpu_ids=gpu_ids)
    net.load_networks('latest')
    return net

class ScreenVAE(nn.Module):
    def __init__(self, inc=1, outc=4, outplanes=64, downs=5, blocks=2, load_ext=True, save_dir='',
                 init_type="normal", init_gain=0.02, gpu_ids=[]):
        super(ScreenVAE, self).__init__()
        self.inc = inc
        self.outc = outc
        self.save_dir = save_dir
        norm_layer = functools.partial(LayerNormWarpper)
        nl_layer = nn.LeakyReLU

        self.model_names = ['enc', 'dec']
        self.enc = define_C(inc + 1, outc * 2, 0, 24, netC='resnet_6blocks',
                            norm='layer', nl='lrelu', use_dropout=True, init_type='kaiming',
                            gpu_ids=gpu_ids, upsample='bilinear')
        self.dec = define_G(outc, inc, 0, 48, netG='unet_128_G',
                            norm='layer', nl='lrelu', use_dropout=True, init_type='kaiming',
                            gpu_ids=gpu_ids, where_add='input', upsample='bilinear', use_noise=True)
        for param in self.parameters():
            param.requires_grad = False

    def load_networks(self, epoch):
        """Load all the networks from the disk.

        Parameters:
            epoch (int) -- current epoch; used in the file name '%s_net_%s.pth' % (epoch, name)
        """
        for name in self.model_names:
            if isinstance(name, str):
                load_filename = '%s_net_%s.pth' % (epoch, name)
                load_path = os.path.join(self.save_dir, load_filename)
                net = getattr(self, name)
                if isinstance(net, torch.nn.DataParallel):
                    net = net.module
                print('loading the model from %s' % load_path)
                state_dict = torch.load(
                    load_path, map_location=lambda storage, loc: storage.cuda())
                if hasattr(state_dict, '_metadata'):
                    del state_dict._metadata
                net.load_state_dict(state_dict)
                del state_dict

    def npad(self, im, pad=128):
        h, w = im.shape[-2:]
        hp = h // pad * pad + pad
        wp = w // pad * pad + pad
        return F.pad(im, (0, wp - w, 0, hp - h), mode='replicate')

    def forward(self, x, line=None, img_input=True, output_screen_only=True):
        if img_input:
            if line is None:
                line = torch.ones_like(x)
            else:
                line = torch.sign(line)
                x = torch.clamp(x + (1 - line), -1, 1)
            h, w = x.shape[-2:]
            input = torch.cat([x, line], 1)
            input = self.npad(input)
            inter = self.enc(input)[:, :, :h, :w]
            scr, logvar = torch.split(inter, (self.outc, self.outc), dim=1)
            if output_screen_only:
                return scr
            recons = self.dec(scr)
            return recons, scr, logvar
        else:
            h, w = x.shape[-2:]
            x = self.npad(x)
            recons = self.dec(x)[:, :, :h, :w]
            recons = (recons + 1) * (line + 1) / 2 - 1
            return torch.clamp(recons, -1, 1)

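# Usage sketch for the ScreenVAE wrapper (illustrative, not executed here):
# define_SVAE expects pretrained 'latest_net_enc.pth' / 'latest_net_dec.pth'
# checkpoints in save_dir (the path below is a placeholder) and, as written,
# load_networks maps the weights onto a CUDA device.
def _example_screen_vae(save_dir='./checkpoints/ScreenVAE'):
    svae = define_SVAE(inc=1, outc=4, outplanes=64, save_dir=save_dir, gpu_ids=[0])
    manga = torch.randn(1, 1, 256, 256).cuda()   # grayscale manga image in [-1, 1]
    line = torch.ones_like(manga)                # line drawing (blank here)
    scr = svae(manga, line)                      # 4-channel screentone representation
    return scr.shape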