# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper providing an AlfEnvironment adapter for GYM environments.
Adapted from TF-Agents Environment API as seen in:
https://github.com/tensorflow/agents/blob/master/tf_agents/environments/suite_gym.py
"""
import collections
import gym
import gym.spaces
import numbers
import numpy as np
import alf.data_structures as ds
from alf.environments.alf_environment import AlfEnvironment
import alf.nest as nest
from alf.tensor_specs import TensorSpec, BoundedTensorSpec, torch_dtype_to_str
def tensor_spec_from_gym_space(space,
                               simplify_box_bounds=True,
                               float_dtype=np.float32):
    """Construct tensor spec from gym space.

    Args:
        space (gym.Space): An instance of OpenAI gym Space.
        simplify_box_bounds (bool): if True, will try to simplify redundant
            arrays to make logging and debugging less verbose when printed out.
            This flag only affects ``Box`` spaces; the upper bound of a
            ``MultiDiscrete`` space is always simplified when possible.
        float_dtype (np.float32 | np.float64 | None): the dtype to be used for
            the floating numbers. If None, it will use dtypes of gym spaces.

    Returns:
        A ``BoundedTensorSpec`` for ``Discrete``, ``MultiDiscrete``,
        ``MultiBinary`` and ``Box`` spaces; a tuple of specs for a ``Tuple``
        space; an ``OrderedDict`` of specs for a ``Dict`` space.

    Raises:
        ValueError: if ``space`` is not one of the supported gym space types.
    """

    # We try to simplify redundant arrays to make logging and debugging less
    # verbose and easier to read since the printed spec bounds may be large.
    def try_simplify_array_to_value(np_array):
        """If given numpy array has all the same values, returns that value."""
        if np_array.size == 0:
            # ``item(0)`` raises IndexError on an empty array; there is
            # nothing to simplify, so hand the array back untouched.
            return np_array
        first_value = np_array.item(0)
        if np.all(np_array == first_value):
            return np.array(first_value, dtype=np_array.dtype)
        return np_array

    if isinstance(space, gym.spaces.Discrete):
        # Discrete spaces span the set {0, 1, ... , n-1} while Bounded Array specs
        # are inclusive on their bounds.
        maximum = space.n - 1
        return BoundedTensorSpec(
            shape=(), dtype=space.dtype.name, minimum=0, maximum=maximum)
    elif isinstance(space, gym.spaces.MultiDiscrete):
        # Same inclusive-bound adjustment as for Discrete, applied per entry.
        maximum = try_simplify_array_to_value(
            np.asarray(space.nvec - 1, dtype=space.dtype))
        return BoundedTensorSpec(
            shape=space.shape,
            dtype=space.dtype.name,
            minimum=0,
            maximum=maximum)
    elif isinstance(space, gym.spaces.MultiBinary):
        shape = (space.n, )
        return BoundedTensorSpec(
            shape=shape, dtype=space.dtype.name, minimum=0, maximum=1)
    elif isinstance(space, gym.spaces.Box):
        # Only floating-point boxes are re-typed to ``float_dtype``; other
        # boxes keep the dtype declared by the space.
        if float_dtype is not None and "float" in space.dtype.name:
            dtype = np.dtype(float_dtype)
        else:
            dtype = space.dtype
        minimum = np.asarray(space.low, dtype=dtype)
        maximum = np.asarray(space.high, dtype=dtype)
        if simplify_box_bounds:
            minimum = try_simplify_array_to_value(minimum)
            maximum = try_simplify_array_to_value(maximum)
        return BoundedTensorSpec(
            shape=space.shape,
            dtype=dtype.name,
            minimum=minimum,
            maximum=maximum)
    elif isinstance(space, gym.spaces.Tuple):
        # Forward the caller's options so nested spaces are converted with
        # the same settings as the top-level space.
        return tuple([
            tensor_spec_from_gym_space(
                s,
                simplify_box_bounds=simplify_box_bounds,
                float_dtype=float_dtype) for s in space.spaces
        ])
    elif isinstance(space, gym.spaces.Dict):
        return collections.OrderedDict(
            [(key,
              tensor_spec_from_gym_space(
                  s,
                  simplify_box_bounds=simplify_box_bounds,
                  float_dtype=float_dtype))
             for key, s in space.spaces.items()])
    else:
        raise ValueError(
            'The gym space {} is currently not supported.'.format(space))
def _as_array(nested):
    """Wrap every scalar leaf of ``nested`` in an ``np.ndarray``.

    Leaves for which ``np.isscalar`` is false are passed through unchanged.

    Args:
        nested (nest): a nested structure to convert.
    Returns:
        The result of mapping the conversion over ``nested`` with
        ``nest.map_structure``.
    """
    return nest.map_structure(
        lambda leaf: np.array(leaf) if np.isscalar(leaf) else leaf, nested)
class AlfGymWrapper(AlfEnvironment):
    """Base wrapper implementing the ``AlfEnvironment`` interface for Gym envs.

    Action and observation specs are automatically generated from the action and
    observation spaces. See base class for ``AlfEnvironment`` details.
    """

    def __init__(self,
                 gym_env,
                 env_id=None,
                 discount=1.0,
                 auto_reset=True,
                 simplify_box_bounds=True):
        """
        Note that construction resets ``gym_env`` and steps it once with an
        all-zero action in order to discover the structure of its ``info``
        (see ``_obtain_zero_info``).

        Args:
            gym_env (gym.Env): An instance of OpenAI gym environment.
            env_id (int): (optional) ID of the environment. Defaults to 0
                when None.
            discount (float): Discount to use for the environment.
            auto_reset (bool): whether or not to reset the environment when done.
            simplify_box_bounds (bool): whether or not to simplify redundant
                arrays to values for spec bounds.
        """
        super(AlfGymWrapper, self).__init__()

        self._gym_env = gym_env
        self._discount = discount
        if env_id is None:
            env_id = 0
        self._env_id = np.int32(env_id)
        self._action_is_discrete = isinstance(self._gym_env.action_space,
                                              gym.spaces.Discrete)
        # TODO: Add test for auto_reset param.
        self._auto_reset = auto_reset
        self._observation_spec = tensor_spec_from_gym_space(
            self._gym_env.observation_space, simplify_box_bounds)
        self._action_spec = tensor_spec_from_gym_space(
            self._gym_env.action_space, simplify_box_bounds)
        # A gym env may optionally expose ``reward_space``; otherwise the
        # reward is described by a rank-0 spec.
        if hasattr(self._gym_env, "reward_space"):
            self._reward_spec = tensor_spec_from_gym_space(
                self._gym_env.reward_space, simplify_box_bounds)
        else:
            self._reward_spec = TensorSpec(())
        self._time_step_spec = ds.time_step_spec(
            self._observation_spec, self._action_spec, self._reward_spec)
        self._info = None
        # Start in the "done" state so that, with auto_reset, the first call
        # to ``_step`` triggers a reset.
        self._done = True
        self._zero_info = self._obtain_zero_info()
        self._env_info_spec = nest.map_structure(TensorSpec.from_array,
                                                 self._zero_info)

    @property
    def gym(self):
        """Return the gym environment. """
        return self._gym_env

    def _obtain_zero_info(self):
        """Get an env info of zeros only once when the env is created.

        This info will be filled in each ``FIRST`` time step as a placeholder.
        The env is reset, stepped once with an all-zero action, and reset
        again; the ``info`` from that step is zeroed and returned.
        """
        self._gym_env.reset()
        action = nest.map_structure(lambda spec: spec.numpy_zeros(),
                                    self._action_spec)
        _, _, _, info = self._gym_env.step(action)
        self._gym_env.reset()
        info = _as_array(info)
        return nest.map_structure(lambda a: np.zeros_like(a), info)

    def __getattr__(self, name):
        """Forward all other calls to the base environment.

        Raises:
            AttributeError: if ``_gym_env`` itself has not been set on this
                instance yet.
        """
        # ``__getattr__`` only runs when normal lookup fails. If ``_gym_env``
        # is the missing attribute (e.g. while unpickling/copying, before
        # ``__init__`` has run), forwarding would look up ``self._gym_env``
        # again and recurse without bound.
        if name == '_gym_env':
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(
                    type(self).__name__, name))
        return getattr(self._gym_env, name)

    def get_info(self):
        """Returns the gym environment info returned on the last step.

        This is None after a reset, until the next step is taken.
        """
        return self._info

    def _reset(self):
        """Reset the gym env and return the restart time step.

        The time step's ``env_info`` is the zero-filled placeholder computed
        at construction.
        """
        # TODO: Upcoming update on gym adds **kwargs on reset. Update this to
        # support that.
        observation = self._gym_env.reset()
        self._info = None
        self._done = False

        observation = self._to_spec_dtype_observation(observation)
        return ds.restart(
            observation=observation,
            action_spec=self._action_spec,
            reward_spec=self._reward_spec,
            env_id=self._env_id,
            env_info=self._zero_info)

    @property
    def done(self):
        """Whether the last step ended the episode (True before the first
        reset)."""
        return self._done

    def _step(self, action):
        """Apply ``action`` to the gym env and return the resulting time step.

        If ``auto_reset`` is enabled and the previous step ended the episode,
        ``action`` is ignored and the result of ``reset()`` is returned.
        """
        # Automatically reset the environments on step if they need to be reset.
        if self._auto_reset and self._done:
            return self.reset()

        observation, reward, self._done, self._info = self._gym_env.step(
            action)
        observation = self._to_spec_dtype_observation(observation)
        self._info = _as_array(self._info)

        if self._done:
            return ds.termination(
                observation,
                action,
                reward,
                self._reward_spec,
                self._env_id,
                env_info=self._info)
        else:
            return ds.transition(
                observation,
                action,
                reward,
                self._reward_spec,
                self._discount,
                self._env_id,
                env_info=self._info)

    def _to_spec_dtype_observation(self, observation):
        """Make sure observation from env is converted to the correct dtype.

        Args:
            observation (nested arrays or tensors): observations from env.

        Returns:
            A (nested) arrays of observation
        """

        def _as_spec_dtype(arr, spec):
            dtype = torch_dtype_to_str(spec.dtype)
            # Skip the ``astype`` call when the dtype already matches.
            if str(arr.dtype) == dtype:
                return arr
            else:
                return arr.astype(dtype)

        return nest.map_structure(_as_spec_dtype, observation,
                                  self._observation_spec)

    def env_info_spec(self):
        """Return the spec derived from the zero-filled env info."""
        return self._env_info_spec

    def time_step_spec(self):
        """Return the time step spec built from the observation, action and
        reward specs."""
        return self._time_step_spec

    def observation_spec(self):
        """Return the spec generated from the gym observation space."""
        return self._observation_spec

    def action_spec(self):
        """Return the spec generated from the gym action space."""
        return self._action_spec

    def reward_spec(self):
        """Return the reward spec (rank-0 unless the gym env defines
        ``reward_space``)."""
        return self._reward_spec

    def close(self):
        """Close the underlying gym environment."""
        return self._gym_env.close()

    def seed(self, seed):
        """Forward ``seed`` to the underlying gym environment."""
        return self._gym_env.seed(seed)

    def render(self, mode='rgb_array'):
        """Render the underlying gym environment with the given ``mode``."""
        return self._gym_env.render(mode)