Source code for alf.environments.suite_unittest

# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Environments for unittest."""

from abc import abstractmethod
from enum import Enum
import numpy as np
import torch

import alf
from alf.data_structures import StepType, TimeStep
from alf.tensor_specs import BoundedTensorSpec, TensorSpec

from .alf_environment import AlfEnvironment

class ActionType(Enum):
    """Kind of action an environment in this module accepts."""

    # Explicit values mirror the 1-based numbering the functional Enum API
    # assigns to members in declaration order.
    Discrete = 1
    Continuous = 2


class UnittestEnv(AlfEnvironment):
    """Abstract base class for the batched environments used by unit tests.

    An episode always lasts ``episode_length`` steps, the LAST step included.
    Observations are one dimensional vectors of size ``obs_dim``. With
    ``ActionType.Discrete`` the action is binary ({0, 1}); with
    ``ActionType.Continuous`` it is a single float bounded by 0 and 1.

    Subclasses provide the actual dynamics by implementing
    ``_gen_time_step()``.
    """

    def __init__(self,
                 batch_size,
                 episode_length,
                 obs_dim=1,
                 action_type=ActionType.Discrete,
                 reward_dim=1):
        """Initializes the environment and performs a first reset.

        Args:
            batch_size (int): The batch size expected for the actions and
                observations.
            episode_length (int): length of each episode
            obs_dim (int): size of the one dimensional observation vector.
            action_type (nest): nest of ``ActionType``. The action spec is a
                nest with the same structure, one spec per entry.
            reward_dim (int): when 1 the reward spec is a scalar, otherwise
                it is a vector of size ``reward_dim``.
        """
        self._episode_length = episode_length
        self._steps = 0
        super().__init__()

        def _spec_for(kind):
            # One bounded spec per ActionType entry of the nest.
            if kind == ActionType.Discrete:
                return BoundedTensorSpec(
                    shape=(), dtype=torch.int64, minimum=0, maximum=1)
            return BoundedTensorSpec(
                shape=(1, ), dtype=torch.float32, minimum=[0], maximum=[1])

        self._action_type = action_type
        self._action_spec = alf.nest.map_structure(_spec_for, action_type)
        self._observation_spec = TensorSpec(
            shape=(obs_dim, ), dtype=torch.float32)
        self._batch_size = batch_size
        self._reward_dim = reward_dim
        reward_shape = () if reward_dim == 1 else (reward_dim, )
        self._reward_spec = TensorSpec(reward_shape)
        # Every spec and size is in place now, so the first time step can be
        # generated.
        self.reset()

    @property
    def batched(self):
        """Always True: time steps carry a leading batch dimension."""
        return True

    @property
    def batch_size(self):
        """The batch size given at construction."""
        return self._batch_size

    def action_spec(self):
        """Return the (nest of) action spec built from ``action_type``."""
        return self._action_spec

    def observation_spec(self):
        """Return the float32 observation spec of shape ``(obs_dim,)``."""
        return self._observation_spec

    def reward_spec(self):
        """Return the reward spec selected by ``reward_dim``."""
        return self._reward_spec

    def env_info_spec(self):
        """Return an empty dict."""
        return {}

    def _reset(self):
        """Restart the step counter and emit the time step for step 0.

        Returns:
            TimeStep: the step generated for ``s=0`` with an all-zero
            ``prev_action`` and ``env_id`` set to ``0..batch_size-1``.
        """
        self._steps = 0
        initial = self._gen_time_step(0, None)
        zero_action = alf.nest.map_structure(
            lambda spec: spec.zeros([self.batch_size]), self._action_spec)
        env_ids = torch.arange(self.batch_size, dtype=torch.int32)
        self._current_time_step = initial._replace(
            prev_action=zero_action, env_id=env_ids)
        return self._current_time_step

    def _step(self, action):
        """Advance one step and emit the resulting time step.

        Args:
            action: action from agent.
        Returns:
            TimeStep: the generated step with ``prev_action`` set to
            ``action`` and ``env_id`` set to ``0..batch_size-1``.
        """
        self._steps += 1
        # The in-episode index wraps around, so the step following a LAST
        # step is generated with s == 0 again.
        step_in_episode = self._steps % self._episode_length
        generated = self._gen_time_step(step_in_episode, action)
        env_ids = torch.arange(self.batch_size, dtype=torch.int32)
        self._current_time_step = generated._replace(
            prev_action=action, env_id=env_ids)
        return self._current_time_step

    @abstractmethod
    def _gen_time_step(self, s, action):
        """Generate time step.

        Args:
            s (int): step count in current episode. It ranges from 0 to
                `episode_length` - 1.
            action: action from agent. It is None when called from a reset.
        Returns:
            time_step (TimeStep)
        """
class ValueUnittestEnv(UnittestEnv):
    """Environment for testing value estimation.

    Every episode ends in `episode_length` steps. It always gives reward 1 at
    each step.
    """

    def _gen_time_step(self, s, action):
        """Return the `TimeStep` for step ``s`` of the current episode.

        Args:
            s (int): step count in current episode. 0 yields a FIRST step,
                ``episode_length - 1`` a LAST step with discount 0, anything
                else a MID step.
            action: ignored; the reward does not depend on the action.
        Returns:
            TimeStep: step with an all-ones reward and an all-ones
            observation.
        """
        if s == 0:
            step_type, discount = StepType.FIRST, 1.0
        elif s == self._episode_length - 1:
            step_type, discount = StepType.LAST, 0.0
        else:
            step_type, discount = StepType.MID, 1.0

        # Honor `reward_dim` so that the reward agrees with `reward_spec()`:
        # scalar per sample when it is 1, a vector of that size otherwise.
        if self._reward_dim == 1:
            reward_shape = (self.batch_size, )
        else:
            reward_shape = (self.batch_size, self._reward_dim)

        # NOTE(review): the observation has shape (batch_size,), whereas the
        # observation spec is built with per-sample shape (obs_dim,). Left
        # unchanged here -- confirm whether (batch_size, obs_dim) is intended.
        return TimeStep(
            step_type=torch.full([self.batch_size],
                                 step_type,
                                 dtype=torch.int32),
            reward=torch.ones(reward_shape),
            discount=torch.full([self.batch_size], discount),
            observation=torch.ones(self.batch_size))
class PolicyUnittestEnv(UnittestEnv):
    """Environment for testing policy.

    The reward of a step is ``1 - |previous observation - action|``, so the
    agent is rewarded for repeating the observation it was just shown.
    """

    def _gen_time_step(self, s, action):
        """Return the `TimeStep` for step ``s`` of the current episode.

        Args:
            s (int): step count in current episode.
            action: action from agent; not used when ``s`` is 0.
        Returns:
            TimeStep: step carrying the reward for ``action`` and a new
            random observation with values in {0, 1}.
        """
        batch = self.batch_size
        discount = 1.0

        if s == 0:
            step_type = StepType.FIRST
            # No action has been judged yet at the start of an episode.
            reward = torch.zeros(batch)
        else:
            if s == self._episode_length - 1:
                step_type = StepType.LAST
                discount = 0.0
            else:
                step_type = StepType.MID
            shown = self._current_time_step.observation
            mismatch = torch.abs(shown - action.reshape(shown.shape))
            reward = (1.0 - mismatch).reshape(batch)

        if self._reward_dim != 1:
            # Replicate the scalar reward along a trailing reward axis.
            reward = reward.unsqueeze(-1).expand((-1, self._reward_dim))

        next_observation = torch.randint(
            0, 2, size=(batch, 1), dtype=torch.float32)

        return TimeStep(
            step_type=torch.full([batch], step_type, dtype=torch.int32),
            reward=reward,
            discount=torch.full([batch], discount),
            observation=next_observation)
class MixedPolicyUnittestEnv(UnittestEnv):
    """Environment for testing a mixed policy.

    Given the agent's `(discrete, continuous)` action pair ``(a_d, a_c)``,
    if ``a_d == (a_c > 0.5)``, the agent receives a reward of 1; otherwise it
    receives 0.
    """

    def __init__(self, batch_size, episode_length, obs_dim=1):
        """Initializes the environment.

        Args:
            batch_size (int): The batch size expected for the actions and
                observations.
            episode_length (int): length of each episode
            obs_dim (int): size of the one dimensional observation vector.
        """
        mixed_action = [ActionType.Discrete, ActionType.Continuous]
        super().__init__(
            batch_size=batch_size,
            episode_length=episode_length,
            obs_dim=obs_dim,
            action_type=mixed_action)

    def _gen_time_step(self, s, action):
        """Return the `TimeStep` for step ``s`` of the current episode.

        Args:
            s (int): step count in current episode.
            action: ``[discrete, continuous]`` action pair; not used when
                ``s`` is 0.
        Returns:
            TimeStep: step with the 0/1 agreement reward and an observation
            drawn via the observation spec's ``randn``.
        """
        batch = self.batch_size

        if s == 0:
            step_type, discount = StepType.FIRST, 1.0
        elif s == self._episode_length - 1:
            step_type, discount = StepType.LAST, 0.0
        else:
            step_type, discount = StepType.MID, 1.0

        if s > 0:
            # Threshold the continuous action and compare it with the
            # discrete one.
            thresholded = (action[1].squeeze(-1) > 0.5).to(torch.int64)
            reward = (action[0] == thresholded).to(torch.float32)
        else:
            reward = torch.zeros(batch)

        observation = self._observation_spec.randn(outer_dims=(batch, ))

        return TimeStep(
            step_type=torch.full([batch], step_type, dtype=torch.int32),
            reward=reward,
            discount=torch.full([batch], discount),
            observation=observation)
class RNNPolicyUnittestEnv(UnittestEnv):
    """Environment for testing RNN policy.

    The agent receives reward 1 after the initial `gap` steps if its action
    matches the observation given at the first step.
    """

    def __init__(self,
                 batch_size,
                 episode_length,
                 gap=3,
                 action_type=ActionType.Discrete,
                 obs_dim=1):
        """Initializes the environment.

        Args:
            batch_size (int): The batch size expected for the actions and
                observations.
            episode_length (int): length of each episode
            gap (int): steps with index up to and including ``gap`` always
                give zero reward.
            action_type (nest): ActionType
            obs_dim (int): size of the one dimensional observation vector.
        """
        # Both attributes are read by `_gen_time_step`, which the base
        # initializer already triggers through its reset, so set them first.
        self._obs_dim = obs_dim
        self._gap = gap
        super().__init__(
            batch_size,
            episode_length,
            action_type=action_type,
            obs_dim=obs_dim)

    def _gen_time_step(self, s, action):
        """Return the `TimeStep` for step ``s`` of the current episode.

        Args:
            s (int): step count in current episode.
            action: action from agent; not used while ``s <= gap``.
        Returns:
            TimeStep: at ``s == 0`` the observation is the freshly drawn
            first observation, afterwards it is all zeros.
        """
        batch = self.batch_size
        width = self._obs_dim
        discount = 1.0

        if s == 0:
            step_type = StepType.FIRST
            # The first column is a random -1/+1 target to be remembered.
            bits = torch.randint(0, 2, size=(batch, 1))
            self._observation0 = 2. * bits - 1.
            if width > 1:
                # Any additional columns are filled with ones.
                padding = torch.ones(batch, width - 1)
                self._observation0 = torch.cat(
                    [self._observation0, padding], dim=-1)
            observation = self._observation0
        else:
            if s == self._episode_length - 1:
                step_type = StepType.LAST
                discount = 0.0
            else:
                step_type = StepType.MID
            observation = torch.zeros(batch, width)

        if s > self._gap:
            target = self._observation0[:, 0].reshape(batch, 1)
            # Map the action from [0, 1] to [-1, 1] before comparing it with
            # the -1/+1 target.
            error = torch.abs(2 * action.reshape(target.shape) - 1 - target)
            reward = (1.0 - 0.5 * error).reshape(batch)
        else:
            reward = torch.zeros(batch)

        return TimeStep(
            step_type=torch.full([batch], step_type, dtype=torch.int32),
            reward=reward,
            discount=torch.full([batch], discount),
            observation=observation)