# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
| """Tests for lstm_object_detection.lstm.rnn_decoder.""" | |
| from __future__ import absolute_import | |
| from __future__ import division | |
| from __future__ import print_function | |
| import numpy as np | |
| import tensorflow.compat.v1 as tf | |
| from tensorflow.contrib import layers as contrib_layers | |
| from tensorflow.contrib import rnn as contrib_rnn | |
| from lstm_object_detection.lstm import rnn_decoder | |


class MockRnnCell(contrib_rnn.RNNCell):
  """Mock RNN cell with a deterministic, easy-to-verify step function.

  Each step concatenates the input with the first state tensor along the
  channel axis, doubles the first state tensor, and passes the second state
  tensor through unchanged.
  """

  def __init__(self, input_size, num_units):
    self._input_size = input_size
    self._num_units = num_units
    self._filter_size = [3, 3]

  def __call__(self, inputs, state_tuple):
    outputs = tf.concat([inputs, state_tuple[0]], axis=3)
    new_state_tuple = (tf.multiply(state_tuple[0], 2), state_tuple[1])
    return outputs, new_state_tuple

  def state_size(self):
    return self._num_units

  def output_size(self):
    return self._input_size + self._num_units

  def pre_bottleneck(self, inputs, state, input_index):
    # Projects the concatenated input and state back to `input_size` channels.
    with tf.variable_scope('bottleneck_%d' % input_index, reuse=tf.AUTO_REUSE):
      inputs = contrib_layers.separable_conv2d(
          tf.concat([inputs, state], 3),
          self._input_size,
          self._filter_size,
          depth_multiplier=1,
          activation_fn=tf.nn.relu6,
          normalizer_fn=None)
    return inputs
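

# The tests below unroll MockRnnCell with rnn_decoder.rnn_decoder and check,
# step by step, that each output equals concat([input, c], axis=3), that the
# new c state equals 2 * c, and that the h state is passed through unchanged,
# matching the mock cell defined above.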
class RnnDecoderTest(tf.test.TestCase):

  def test_rnn_decoder_single_unroll(self):
    batch_size = 2
    num_unroll = 1
    num_units = 64
    width = 8
    height = 10
    input_channels = 128

    initial_state = tf.random_normal((batch_size, width, height, num_units))
    inputs = tf.random_normal([batch_size, width, height, input_channels])

    rnn_cell = MockRnnCell(input_channels, num_units)
    outputs, states = rnn_decoder.rnn_decoder(
        decoder_inputs=[inputs] * num_unroll,
        initial_state=(initial_state, initial_state),
        cell=rnn_cell)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      results = sess.run((outputs, states, inputs, initial_state))
      outputs_results = results[0]
      states_results = results[1]
      inputs_results = results[2]
      initial_states_results = results[3]
      self.assertEqual(outputs_results[0].shape,
                       (batch_size, width, height, input_channels + num_units))
      self.assertAllEqual(
          outputs_results[0],
          np.concatenate((inputs_results, initial_states_results), axis=3))
      self.assertEqual(states_results[0][0].shape,
                       (batch_size, width, height, num_units))
      self.assertEqual(states_results[0][1].shape,
                       (batch_size, width, height, num_units))
      self.assertAllEqual(states_results[0][0],
                          np.multiply(initial_states_results, 2.0))
      self.assertAllEqual(states_results[0][1], initial_states_results)

  def test_rnn_decoder_multiple_unroll(self):
    batch_size = 2
    num_unroll = 3
    num_units = 64
    width = 8
    height = 10
    input_channels = 128

    initial_state = tf.random_normal((batch_size, width, height, num_units))
    inputs = tf.random_normal([batch_size, width, height, input_channels])

    rnn_cell = MockRnnCell(input_channels, num_units)
    outputs, states = rnn_decoder.rnn_decoder(
        decoder_inputs=[inputs] * num_unroll,
        initial_state=(initial_state, initial_state),
        cell=rnn_cell)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      results = sess.run((outputs, states, inputs, initial_state))
      outputs_results = results[0]
      states_results = results[1]
      inputs_results = results[2]
      initial_states_results = results[3]
      for i in range(num_unroll):
        previous_state = ([initial_states_results, initial_states_results]
                          if i == 0 else states_results[i - 1])
        self.assertEqual(
            outputs_results[i].shape,
            (batch_size, width, height, input_channels + num_units))
        self.assertAllEqual(
            outputs_results[i],
            np.concatenate((inputs_results, previous_state[0]), axis=3))
        self.assertEqual(states_results[i][0].shape,
                         (batch_size, width, height, num_units))
        self.assertEqual(states_results[i][1].shape,
                         (batch_size, width, height, num_units))
        self.assertAllEqual(states_results[i][0],
                            np.multiply(previous_state[0], 2.0))
        self.assertAllEqual(states_results[i][1], previous_state[1])
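

# The tests below feed two input streams (a large and a small feature map) to
# rnn_decoder.multi_input_rnn_decoder with pre_bottleneck=True, so each stream
# is first passed through the cell's pre_bottleneck and projected to
# bottleneck_channels before the recurrent step; the assertions check the
# resulting output and state shapes and the mock cell's state updates.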
class MultiInputRnnDecoderTest(tf.test.TestCase):

  def test_rnn_decoder_single_unroll(self):
    batch_size = 2
    num_unroll = 1
    num_units = 12
    width = 8
    height = 10
    input_channels_large = 24
    input_channels_small = 12
    bottleneck_channels = 20

    initial_state_c = tf.random_normal((batch_size, width, height, num_units))
    initial_state_h = tf.random_normal((batch_size, width, height, num_units))
    initial_state = (initial_state_c, initial_state_h)
    inputs_large = tf.random_normal(
        [batch_size, width, height, input_channels_large])
    inputs_small = tf.random_normal(
        [batch_size, width, height, input_channels_small])

    rnn_cell = MockRnnCell(bottleneck_channels, num_units)
    outputs, states = rnn_decoder.multi_input_rnn_decoder(
        decoder_inputs=[[inputs_large] * num_unroll,
                        [inputs_small] * num_unroll],
        initial_state=initial_state,
        cell=rnn_cell,
        sequence_step=tf.zeros([batch_size]),
        pre_bottleneck=True)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      results = sess.run(
          (outputs, states, inputs_large, inputs_small, initial_state))
      outputs_results = results[0]
      states_results = results[1]
      initial_states_results = results[4]
      self.assertEqual(
          outputs_results[0].shape,
          (batch_size, width, height, bottleneck_channels + num_units))
      self.assertEqual(states_results[0][0].shape,
                       (batch_size, width, height, num_units))
      self.assertEqual(states_results[0][1].shape,
                       (batch_size, width, height, num_units))
      # The first step should always update state.
      self.assertAllEqual(states_results[0][0],
                          np.multiply(initial_states_results[0], 2))
      self.assertAllEqual(states_results[0][1], initial_states_results[1])

  def test_rnn_decoder_multiple_unroll(self):
    batch_size = 2
    num_unroll = 3
    num_units = 12
    width = 8
    height = 10
    input_channels_large = 24
    input_channels_small = 12
    bottleneck_channels = 20

    initial_state_c = tf.random_normal((batch_size, width, height, num_units))
    initial_state_h = tf.random_normal((batch_size, width, height, num_units))
    initial_state = (initial_state_c, initial_state_h)
    inputs_large = tf.random_normal(
        [batch_size, width, height, input_channels_large])
    inputs_small = tf.random_normal(
        [batch_size, width, height, input_channels_small])

    rnn_cell = MockRnnCell(bottleneck_channels, num_units)
    outputs, states = rnn_decoder.multi_input_rnn_decoder(
        decoder_inputs=[[inputs_large] * num_unroll,
                        [inputs_small] * num_unroll],
        initial_state=initial_state,
        cell=rnn_cell,
        sequence_step=tf.zeros([batch_size]),
        pre_bottleneck=True)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      results = sess.run(
          (outputs, states, inputs_large, inputs_small, initial_state))
      outputs_results = results[0]
      states_results = results[1]
      initial_states_results = results[4]

      # The first step should always update state.
      self.assertAllEqual(states_results[0][0],
                          np.multiply(initial_states_results[0], 2))
      self.assertAllEqual(states_results[0][1], initial_states_results[1])
      for i in range(num_unroll):
        self.assertEqual(
            outputs_results[i].shape,
            (batch_size, width, height, bottleneck_channels + num_units))
        self.assertEqual(states_results[i][0].shape,
                         (batch_size, width, height, num_units))
        self.assertEqual(states_results[i][1].shape,
                         (batch_size, width, height, num_units))
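
  # The SKIP selection strategy is expected to update the recurrent state only
  # on key frames: with selection_strategy='SKIP2', the state is refreshed
  # every third step and carried through unchanged on the skipped steps, which
  # is what the assertions below verify.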
  def test_rnn_decoder_multiple_unroll_with_skip(self):
    batch_size = 2
    num_unroll = 5
    num_units = 12
    width = 8
    height = 10
    input_channels_large = 24
    input_channels_small = 12
    bottleneck_channels = 20
    skip = 2

    initial_state_c = tf.random_normal((batch_size, width, height, num_units))
    initial_state_h = tf.random_normal((batch_size, width, height, num_units))
    initial_state = (initial_state_c, initial_state_h)
    inputs_large = tf.random_normal(
        [batch_size, width, height, input_channels_large])
    inputs_small = tf.random_normal(
        [batch_size, width, height, input_channels_small])

    rnn_cell = MockRnnCell(bottleneck_channels, num_units)
    outputs, states = rnn_decoder.multi_input_rnn_decoder(
        decoder_inputs=[[inputs_large] * num_unroll,
                        [inputs_small] * num_unroll],
        initial_state=initial_state,
        cell=rnn_cell,
        sequence_step=tf.zeros([batch_size]),
        pre_bottleneck=True,
        selection_strategy='SKIP%d' % skip)

    self.assertEqual(len(outputs), num_unroll)
    self.assertEqual(len(states), num_unroll)
    with tf.Session() as sess:
      sess.run(tf.global_variables_initializer())
      results = sess.run(
          (outputs, states, inputs_large, inputs_small, initial_state))
      outputs_results = results[0]
      states_results = results[1]
      initial_states_results = results[4]

      for i in range(num_unroll):
        self.assertEqual(
            outputs_results[i].shape,
            (batch_size, width, height, bottleneck_channels + num_units))
        self.assertEqual(states_results[i][0].shape,
                         (batch_size, width, height, num_units))
        self.assertEqual(states_results[i][1].shape,
                         (batch_size, width, height, num_units))

        previous_state = (
            initial_states_results if i == 0 else states_results[i - 1])
        # State only updates during key frames.
        if i % (skip + 1) == 0:
          self.assertAllEqual(states_results[i][0],
                              np.multiply(previous_state[0], 2))
          self.assertAllEqual(states_results[i][1], previous_state[1])
        else:
          self.assertAllEqual(states_results[i][0], previous_state[0])
          self.assertAllEqual(states_results[i][1], previous_state[1])


if __name__ == '__main__':
  tf.test.main()