# Spaces: Runtime error (status banner, repeated twice in the capture; appears to be page chrome rather than part of this module)
| from __future__ import absolute_import | |
| from __future__ import division | |
| from __future__ import print_function | |
| """Tests for common.rollout.""" | |
| import numpy as np | |
| import tensorflow as tf | |
| from common import rollout as rollout_lib # brain coder | |
class RolloutTest(tf.test.TestCase):
    """Unit tests for the helpers exposed by ``common.rollout``.

    Exercises ``discount``, ``discounted_advantage_and_rewards`` and
    ``process_rollouts``. Array comparisons go through the ``TestCase``
    array assertions so that a failure reports the mismatching values
    instead of a bare "False is not true".
    """

    # Tolerances identical to the defaults of ``np.allclose``, so the
    # approximate comparisons accept exactly the same numeric error as
    # before.
    _RTOL = 1e-5
    _ATOL = 1e-8

    def MakeRollout(self, states, actions, rewards, values=None,
                    terminated=True):
        """Create a ``Rollout`` and populate it with one ``add_many`` call.

        Args:
            states: Forwarded to ``Rollout.add_many`` as ``states``.
            actions: Forwarded to ``Rollout.add_many`` as ``actions``.
            rewards: Forwarded to ``Rollout.add_many`` as ``rewards``.
            values: Forwarded to ``Rollout.add_many`` as ``values``.
            terminated: Forwarded to ``Rollout.add_many`` as ``terminated``.

        Returns:
            The populated ``rollout_lib.Rollout`` instance.
        """
        rollout = rollout_lib.Rollout()
        rollout.add_many(
            states=states,
            actions=actions,
            rewards=rewards,
            values=values,
            terminated=terminated)
        return rollout

    def testDiscount(self):
        """``discount`` gives the same result for list and ndarray input."""
        # Contribution of the last reward (index 4) to every time step.
        discounted = np.array([1.0 / 2 ** n for n in range(4, -1, -1)])
        # Contribution of the reward at index 1; it only reaches t=0 and t=1.
        discounted[:2] += [1.0 / 2 ** n for n in range(1, -1, -1)]

        self.assertAllEqual(
            rollout_lib.discount([0.0, 1.0, 0.0, 0.0, 1.0], 0.50),
            discounted)
        self.assertAllEqual(
            rollout_lib.discount(np.array([0.0, 1.0, 0.0, 0.0, 1.0]), 0.50),
            discounted)

    def testDiscountedAdvantageAndRewards(self):
        """Checks returns and advantages for three (lambda, values) setups."""
        # lambda=1, No bootstrapping.
        values = [0.1, 0.5, 0.5, 0.25]
        (empirical_values,
         generalized_advantage) = rollout_lib.discounted_advantage_and_rewards(
             [0.0, 0.0, 0.0, 1.0],
             values,
             gamma=0.75,
             lambda_=1.0)
        expected_discounted_r = (
            np.array([1.0 * 0.75 ** n for n in range(3, -1, -1)]))
        expected_adv = expected_discounted_r - values
        self.assertAllEqual(empirical_values, expected_discounted_r)
        self.assertAllClose(
            generalized_advantage, expected_adv,
            rtol=self._RTOL, atol=self._ATOL)

        # lambda=1, With bootstrapping.
        # `values` carries one more entry than the rewards; the expected
        # returns below fold that trailing 0.75 in as a discounted term.
        values = [0.1, 0.5, 0.5, 0.25, 0.75]
        (empirical_values,
         generalized_advantage) = rollout_lib.discounted_advantage_and_rewards(
             [0.0, 0.0, 0.0, 1.0],
             values,
             gamma=0.75,
             lambda_=1.0)
        expected_discounted_r = (
            np.array([0.75 * 0.75 ** n for n in range(4, 0, -1)])
            + np.array([1.0 * 0.75 ** n for n in range(3, -1, -1)]))
        expected_adv = expected_discounted_r - values[:-1]
        self.assertAllEqual(empirical_values, expected_discounted_r)
        self.assertAllClose(
            generalized_advantage, expected_adv,
            rtol=self._RTOL, atol=self._ATOL)

        # lambda=0.5, With bootstrapping.
        values = [0.1, 0.5, 0.5, 0.25, 0.75]
        rewards = [0.0, 0.0, 0.0, 1.0]
        lam = 0.5  # lambda
        gamma = 0.75
        (empirical_values,
         generalized_advantage) = rollout_lib.discounted_advantage_and_rewards(
             rewards,
             values,
             gamma=gamma,
             lambda_=lam)
        expected_discounted_r = (
            np.array([0.75 * gamma ** n for n in range(4, 0, -1)])
            + np.array([1.0 * gamma ** n for n in range(3, -1, -1)]))
        # Backward recursion: adv[t] = delta_t + gamma * lambda * adv[t + 1].
        # The extra trailing slot stays 0.0 and only seeds the recursion.
        expected_adv = [0.0] * len(values)
        for t in reversed(range(len(rewards))):
            delta_t = rewards[t] + gamma * values[t + 1] - values[t]
            expected_adv[t] = delta_t + gamma * lam * expected_adv[t + 1]
        expected_adv = expected_adv[:-1]
        self.assertAllEqual(empirical_values, expected_discounted_r)
        self.assertAllClose(
            generalized_advantage, expected_adv,
            rtol=self._RTOL, atol=self._ATOL)

    def testProcessRollouts(self):
        """``process_rollouts`` batches two rollouts of different length."""
        g = 0.95
        rollouts = [
            self.MakeRollout(
                states=[3, 6, 9],
                actions=[1, 2, 3],
                rewards=[1.0, -1.0, 0.5],
                values=[0.5, 0.5, 0.1]),
            self.MakeRollout(
                states=[10],
                actions=[5],
                rewards=[1.0],
                values=[0.5]),
        ]
        batch = rollout_lib.process_rollouts(rollouts, gamma=g)

        self.assertEqual(2, batch.batch_size)
        self.assertEqual(3, batch.max_time)
        self.assertEqual([3, 1], batch.episode_lengths)
        self.assertEqual([0.5, 1.0], batch.total_rewards)
        # The shorter rollout is expected to be zero-padded up to max_time.
        self.assertEqual(
            [[3, 6, 9], [10, 0, 0]],
            batch.states.tolist())
        self.assertEqual(
            [[1, 2, 3], [5, 0, 0]],
            batch.actions.tolist())

        rew1, rew2 = rollouts[0].rewards, rollouts[1].rewards
        expected_discounted_rewards = [
            [rew1[0] + g * rew1[1] + g * g * rew1[2],
             rew1[1] + g * rew1[2],
             rew1[2]],
            [rew2[0], 0.0, 0.0]]
        expected_advantages = [
            [dr - v
             for dr, v
             in zip(expected_discounted_rewards[0], rollouts[0].values)],
            [expected_discounted_rewards[1][0] - rollouts[1].values[0],
             0.0, 0.0]]
        self.assertAllClose(
            expected_discounted_rewards, batch.discounted_r,
            rtol=self._RTOL, atol=self._ATOL)
        self.assertAllClose(
            expected_advantages, batch.discounted_adv,
            rtol=self._RTOL, atol=self._ATOL)
# Script entry point: run this module's test cases via the TensorFlow
# test runner.
if __name__ == '__main__':
    tf.test.main()