Spaces:
Runtime error
Runtime error
import functools

import numpy as np
import scipy.stats as st
import tensorflow

tensorflow.compat.v1.disable_v2_behavior()
tf = tensorflow.compat.v1
def layer(op):
    """Decorator that turns a layer-building method into a chainable call.

    The wrapped method is invoked as ``op(self, layer_input, *args, **kwargs)``
    where ``layer_input`` comes from ``self.terminals``: the single terminal
    itself when there is exactly one, otherwise a list of all terminals.  The
    result is stored in ``self.layers`` under the layer name and fed back as
    the new terminal, so decorated methods can be chained.

    Args:
        op: Method taking ``(self, layer_input, *args, **kwargs)`` and
            returning the layer output.  It receives ``name`` as a keyword
            argument; when the caller omits it, a name is generated with
            ``self.get_unique_name(op.__name__)``.

    Returns:
        A method accepting ``(self, *args, **kwargs)`` that returns ``self``.

    Raises:
        RuntimeError: From the returned method, when ``self.terminals`` is
            empty at call time.
    """

    # Preserve op's __name__/__doc__ on the wrapper so decorated layers stay
    # introspectable.
    @functools.wraps(op)
    def layer_decorated(self, *args, **kwargs):
        # Automatically set a name if not provided.
        name = kwargs.setdefault('name', self.get_unique_name(op.__name__))
        # Figure out the layer inputs.
        if not self.terminals:
            raise RuntimeError('No input variables found for layer %s.' % name)
        if len(self.terminals) == 1:
            layer_input = self.terminals[0]
        else:
            layer_input = list(self.terminals)
        # Perform the operation and get the output.
        layer_output = op(self, layer_input, *args, **kwargs)
        # Add to layer LUT.
        self.layers[name] = layer_output
        # This output is now the input for the next layer.
        self.feed(layer_output)
        # Return self for chained calls.
        return self

    return layer_decorated
class Smoother(object):
    """Gaussian smoothing network built from chainable layers.

    The constructor registers ``inputs`` as named layers and immediately
    builds the graph in ``setup``: the layer named ``'data'`` is passed
    through a depthwise convolution with a Gaussian kernel.  The result is
    available from ``get_output``.
    """

    def __init__(self, inputs, filter_size, sigma):
        """Store the configuration and build the smoothing graph.

        Args:
            inputs: Mapping (or anything ``dict`` accepts) from layer name to
                tensor.  ``setup`` looks up the key ``'data'``.
            filter_size: Side length of the square kernel; forwarded to
                ``gauss_kernel`` as ``kernlen``.
            sigma: Forwarded to ``gauss_kernel`` as ``nsig``.
        """
        self.inputs = inputs
        # Tensors that the next layer call will consume.
        self.terminals = []
        # Name -> tensor lookup table, seeded with the network inputs.
        self.layers = dict(inputs)
        self.filter_size = filter_size
        self.sigma = sigma
        self.setup()

    def setup(self):
        """Build the graph: feed ``'data'`` into the ``'smoothing'`` conv."""
        self.feed('data').conv(name='smoothing')

    def get_unique_name(self, prefix):
        """Return ``'<prefix>_<n>'`` where ``n`` is one more than the number
        of registered layer names starting with ``prefix``."""
        ident = sum(key.startswith(prefix) for key in self.layers) + 1
        return '%s_%d' % (prefix, ident)

    def feed(self, *args):
        """Replace the current terminals with the given layers.

        Args:
            *args: One or more layers.  A ``str`` is resolved through
                ``self.layers``; anything else is used as-is.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            KeyError: If a layer name is not present in ``self.layers``.
        """
        assert args, 'feed() requires at least one layer'
        # Resolve everything first so a bad name leaves the previous
        # terminals untouched.
        resolved = []
        for fed_layer in args:
            if isinstance(fed_layer, str):
                try:
                    fed_layer = self.layers[fed_layer]
                except KeyError:
                    raise KeyError(
                        'Unknown layer name fed: %s' % fed_layer) from None
            resolved.append(fed_layer)
        self.terminals = resolved
        return self

    def gauss_kernel(self, kernlen=21, nsig=3, channels=1):
        """Build a normalised 2-D Gaussian kernel for depthwise convolution.

        Args:
            kernlen: Side length of the square kernel.
            nsig: Controls the sampled range of the standard normal CDF; the
                bin edges span ``[-nsig - interval/2, nsig + interval/2]``
                with ``interval = (2*nsig + 1) / kernlen``.
            channels: Number of times the kernel is repeated along axis 2.

        Returns:
            ``float32`` array of shape ``(kernlen, kernlen, channels, 1)``
            whose every ``[:, :, c, 0]`` slice sums to 1.
        """
        interval = (2 * nsig + 1.) / kernlen
        edges = np.linspace(-nsig - interval / 2., nsig + interval / 2.,
                            kernlen + 1)
        # Probability mass of the standard normal inside each of the
        # kernlen bins.
        kern1d = np.diff(st.norm.cdf(edges))
        kernel_raw = np.sqrt(np.outer(kern1d, kern1d))
        kernel = kernel_raw / kernel_raw.sum()
        out_filter = np.array(kernel, dtype=np.float32)
        out_filter = out_filter.reshape((int(kernlen), int(kernlen), 1, 1))
        return np.repeat(out_filter, channels, axis=2)

    def make_gauss_var(self, name, size, sigma, c_i):
        """Create a ``tf.Variable`` called ``name`` that is initialised with
        ``gauss_kernel(size, sigma, c_i)``."""
        kernel = self.gauss_kernel(size, sigma, c_i)
        return tf.Variable(tf.convert_to_tensor(kernel), name=name)

    def get_output(self):
        '''Returns the smoother output.'''
        return self.terminals[-1]

    # Without @layer, the chained ``.conv(name=...)`` call in ``setup`` would
    # not receive ``input`` and the result would never become a terminal.
    @layer
    def conv(self, input, name, padding='SAME'):
        """Convolve ``input`` depthwise with the Gaussian kernel.

        Args:
            input: Tensor to smooth, supplied by the ``layer`` decorator from
                the current terminals.  Dimension 3 of its static shape is
                read as the channel count.
            name: Variable-scope name for this layer.
            padding: Padding mode passed to ``tf.nn.depthwise_conv2d``.

        Returns:
            The output of ``tf.nn.depthwise_conv2d`` with unit strides.
        """
        # Get the number of channels in the input.
        c_i = input.get_shape().as_list()[3]
        with tf.variable_scope(name):
            kernel = self.make_gauss_var('gauss_weight', self.filter_size,
                                         self.sigma, c_i)
            output = tf.nn.depthwise_conv2d(input, kernel, [1, 1, 1, 1],
                                            padding=padding)
        return output