Source code for alf.environments.alf_gym3_wrapper

# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper providing an AlfEnvironment adapter for Gym3 envrionments

Gym3 provides an unified interface for reinforcement leraning environments that
improves upon the gym interface and includes vectorization (i.e. natively
supported batched environments).

Gym3 has a different set of considerations which lead to different design
choices compared to gym. See the following links to learn about those design
choices.

https://github.com/openai/gym3/blob/master/docs/design.md

"""
from typing import List, Callable, Optional, Any

import torch
import numpy as np
import gym3
from absl import logging

from alf.environments.alf_environment import AlfEnvironment
from alf import TensorSpec, BoundedTensorSpec
import alf.data_structures as ds
import alf.nest as nest


def _gym3_space_to_tensor_spec(space, force_int64: bool = False):
    """Convert Gym3 tensor specifications (a.k.a. spaces) to TensorSpec

    This is a helper function to form obesrvation spec and action specs for the
    AlfGym3Wrapper.

    Gym3 defines its own tensor specifications and as an adapting layer we use
    this function to convert that to Alf's BoundedTensorSpec.

    The code logic here follows the gym3's implementation:

    https://github.com/openai/gym3/blob/4c3824680eaf9dd04dce224ee3d4856429878226/gym3/interop.py#L74

    Args:

        space: the Gym3 space that describes the tensor specification of
            observation or action.

        force_int64: If set to True, all the discrete type will be converted to
            torch.int64. This is useful for action spec where we expect all
            discrete action to be converted to int64. The main reason for this
            is that actions are usually generated from action distributions,
            whose sample always produce int64 tensors.

    Returns:

        A nested BoundedTensorSpec with the same sturcture.

    """

    def __convert(gym3_space: gym3.types.TensorType) -> BoundedTensorSpec:
        # Gym3 space's eltype is the counterpart of the dtype
        eltype = gym3_space.eltype

        if isinstance(eltype, gym3.types.Discrete):
            eltype = gym3_space.eltype
            return BoundedTensorSpec(
                shape=gym3_space.shape,
                dtype=torch.int64 if force_int64 else eltype.dtype_name,
                minimum=0,
                maximum=eltype.n - 1)
        elif isinstance(eltype, gym3.types.Real):
            # Currently this follows gym3's logic to convert it to unbounded
            # tesnor as gym3.types.Real is not bounded.
            eltype = gym3_space.eltype
            return BoundedTensorSpec(
                shape=gym3_space.shape,
                dtype=eltype.dtype_name,
                minimum=float('-inf'),
                maximum=float('inf'))
        else:
            raise NotImplementedError(
                f'AlfGym3Wrapper does not support space element type {eltype} yet'
            )

    if isinstance(space, gym3.types.DictType):
        return {
            key: nest.map_structure(lambda x: __convert(x), space[key])
            for key in space.keys()
        }
    return nest.map_structure(lambda x: __convert(x), space)


def _extract_env_info_spec(sampe_env_info, ignored_info_keys: List[str] = []):
    """Extracts the environment info spec from a sample

    Args:

        sample_env_info (nested numpy array): A sample environment info instance
            whose array specification will be extracted and converted to nested
            TesnorSpec.

        ignored_info_keys: a list of keys that should be ignored from the
            environment info. Only the top level keys in the nested structure
            obey this.

    Returns:

        A nested TensorSpec that shares the same structure as the sample instance.

    """

    def __to_tensor_spec(entry):
        x = entry
        if np.isscalar(x):
            x = np.array(x)
        return TensorSpec.from_array(np.zeros_like(x))

    trimmed = {
        key: sampe_env_info[key]
        for key in sampe_env_info if key not in ignored_info_keys
    }

    return nest.map_structure(__to_tensor_spec, trimmed)


[docs]class AlfGym3Wrapper(AlfEnvironment): """An adapter to make Gym3 environments follow Alf's convention Although Gym3 provides an official gym wrapper, we decided to not base the Alf adapter upon that gym wrapper because: 1. Performance and resource-wise, relying the natively supported batch (vectorized) environments from Gym3 is much more memory-efficient than creating a lot of Gym3 instances in subprocesses in batch mode. 2. Gym3 has a different interface on indicating the last step and first step of an episode compared to gym. 3. Gym3 has different interfaces to rendering and recording from gym. 4. Gym3 normally do not provide support for resetting the environment. In this adapter, all above are considered and patched to achieve compatibility with AlfEnvironment. Normally you are not expected to call AlfGym3Wrapper directly. Instead the ``load()`` functions for various Gym3-based environments are preferred. For example, ``suite_procgen.load()`` is used to construct procgen environments which themselves are Gym3-based environments. NOTE: TimeLimit is currently not applicable to Gym3 environments as it does not offer reset() interface. """ def __init__(self, gym3_env: gym3.Env, image_channel_first: bool = True, ignored_info_keys: List[str] = [], support_force_reset: bool = False, render_activator: Optional[Callable[[], gym3.Env]] = None, frame_extractor: Optional[Callable[[gym3.Env], Any]] = None): """Construct an adapted instance for the input Gym3 environment Args: gym3_env: the input environment which should be an instance of a class that derives from gym3.Env image_channel_first: when set to True, the image-based (of 3 channels) observation will be permuted so that the channel dimension comes first. ignored_info_keys: a list of keys in the env info that should not be included in the env info of the TimeStep. This is useful when some huge but not useful information are stored in the env info of the underlying Gym3 environment, and ignoring them is crucial to achieve better performance. support_force_reset: Gym3 environments do not support force reset in general. However, some of the environments such as procgen allows sending action -1 to reset the environments. Set this to True to enable such behavior. render_activator: when set to None, it indicates that this environment does not support rendering. Otherwise it will be a function that re-creates a Gym3 environment with render enabled. See render() for details. frame_extractor: when set to None, it indicates that this environment does not support recording. Otherwise it will be a function that extracts the rendered frame for recording from the environment. """ assert isinstance(gym3_env, gym3.Env), \ f'AlfGym3Wrapper: {type(gym3_env)} is not dervied from gym3.Env' super().__init__() # The underlying Gym3 environment self._gym3_env = gym3_env self._support_force_reset = support_force_reset # +--------------------------+ # | Render/Recording Related | # +--------------------------+ # When initially constructed, render is not enabled until the first call # to render() is invoked. Use self._render_enabled to make sure render # is not enabled for more than once. self._render_enabled = False self._render_activator = render_activator self._frame_extrator = frame_extractor # Create metadata with 'render.modes' so that it is compatible with # VideoRecorder. self.metadata = {'render.modes': []} if self._render_activator is not None and self._frame_extrator is not None: self.metadata['render.modes'].append('rgb_array') # +--------------------------+ # | Cache the Tensor Specs | # +--------------------------+ # NOTE(breakds): when needed, expose this and allow an user to set it. self._discount = 1.0 self._observation_spec = _gym3_space_to_tensor_spec( self._gym3_env.ob_space) self._image_channel_first = image_channel_first def _image_channel_first_permute_spec(spec): # Only transform the image-based component of the observation. This # simply assumes ndim == 3 implies an image. if spec.ndim != 3: return spec return BoundedTensorSpec( shape=(spec.shape[2], spec.shape[0], spec.shape[1]), dtype=spec.dtype, minimum=spec.minimum, maximum=spec.maximum) if image_channel_first: self._observation_spec = nest.map_structure( _image_channel_first_permute_spec, self._observation_spec) # For discrete action type, always use int64 during the conversion. self._action_spec = _gym3_space_to_tensor_spec( self._gym3_env.ac_space, force_int64=True) self._env_info_spec = _extract_env_info_spec( self._gym3_env.get_info()[0], ignored_info_keys=ignored_info_keys) # +--------------------------+ # | Stateful Contexts | # +--------------------------+ # A list representing whether the corresponding single environment # finishes the current episode self._prev_first = [False] * self.batch_size @property def batched(self): return True @property def batch_size(self): return self._gym3_env.num # Implement abstract env_info_spec
[docs] def env_info_spec(self): return self._env_info_spec
# Implement abstract observation_spec
[docs] def observation_spec(self): return self._observation_spec
# Implement abstract action_spec
[docs] def action_spec(self): return self._action_spec
def _create_time_step(self, reward, observation, action, step_type: List[ds.StepType]) -> ds.TimeStep: """Construct a TimeStep object for Alf algorithms to consume This function construct the TimeStep objects based on the information observed from the underlying Gym3 environment. It essentially does: 1. Convert numpy arrays or lists to tensors 2. Apply image channel first if necessary 3. Trim ignored keys from the env info """ observation = nest.map_structure(lambda x: torch.as_tensor(x), observation) if self._image_channel_first: observation = nest.map_structure( lambda x: x.permute(0, 3, 1, 2).contiguous(), observation) trimmed_info = [ nest.prune_nest_like(info, self.env_info_spec()) for info in self._gym3_env.get_info() ] # In the case when we assume no timeouts, all episode end will # be due to success or failure, where discount is set to 0.0. discount = [ 0.0 if s == ds.StepType.LAST else self._discount for s in step_type ] return ds.TimeStep( step_type=torch.as_tensor(step_type), reward=torch.as_tensor(reward), discount=torch.as_tensor(discount), observation=observation, env_id=torch.arange(self.batch_size), prev_action=torch.as_tensor(action), env_info=nest.map_structure( lambda *values: torch.as_tensor(values), *trimmed_info)) # Implement abstract _reset def _reset(self) -> ds.TimeStep: """Implement ``_reset()`` for compatibility Note that by default Gym3 environmenst cannot be reset. However, if ``support_force_reset`` is set to True, action -1 will be sent to all the sub-environments to reset them if the underlying environment follows the convention. Otherwise the reset will be ignored. """ if self._support_force_reset: self._gym3_env.act(np.array([-1] * self.batch_size)) self._prev_first = [False] * self.batch_size else: logging.warning('reset() ignored by AlfGym3Wrapper') reward, observation, _ = self._gym3_env.observe() time_step = self._create_time_step( reward, observation, step_type=[ds.StepType.FIRST] * self.batch_size, # Faking actions action=nest.map_structure( lambda spec: spec.numpy_zeros(outer_dims=(self.batch_size, )), self.action_spec())) return time_step # Implement abstract _step def _step(self, action) -> ds.TimeStep: """Implement ``_step()`` for compatibility A very important note here is that special treatment is done at the end of each episode. Unlike Gym environments, Gym3 environments do not return ``done=True`` immediately. Instead, ``first=True`` is returned upon the ``observe()`` of the NEXT ``act()``, which is actually the FIRST FRAME of a new episode in Gym3 sense. With this wrapper we CHANGED that definition, so that the frame with ``first=True`` actually becomes the LAST FRAME of the previous episode. Because the observation of the new episode can be DRAMATICALLY DIFFERENT from the actual last frame of the previous episode, the previous observation is returned in this case. Therefore to summarize, effectively we will 1. Repeat the end-of-episode observation twice for each episode. 2. Throw away the first frame of each episode, and use the second frame as if it is the first frame. Gym3's official Gym wrapper DID THE SAME. """ _, prev_observation, _ = self._gym3_env.observe() np_action = nest.map_structure(lambda x: x.cpu().numpy(), action) self._gym3_env.act(np_action) reward, observation, first = self._gym3_env.observe() # Override the obervation with the previous observation if that # particular environment has ``first=True``. def __override_with_prev_observation(ob_array: np.ndarray, prev_ob_array: np.ndarray): ob_array[first] = prev_ob_array[first] nest.map_structure(__override_with_prev_observation, observation, prev_observation) # TODO(breakds): More properly deal with this by pre-process the # experiences in the replay buffer so that if the next step has first = # True, the previous step is considered an END frame. The current # implementation will incur an unnecessarily non-zero TD error with the # last 2 frames of an episode when TimeLimit is applied, but when the # episodes are long enough, the impact will be small. # This does the trick of repeating end-of-episode frames and throwing # away first-of-episode frames. step_type = [ ds.StepType.FIRST if d else (ds.StepType.LAST if f else ds.StepType.MID) for d, f in zip(self._prev_first, first) ] time_step = self._create_time_step( reward=reward, observation=observation, step_type=step_type, action=action) self._prev_first = first return time_step
[docs] def render(self, mode: str): """Enables rendering by re-activating the environment Args: mode: A string indicate the rendering mode. This is to make it compatible with Gym environments' rendering interface. For AlfGym3Wrapper, it returns the RGB array image if mode is specified as `rgb_array`, and None for other modes. """ if not self._render_enabled: assert self._render_activator is not None, \ ('This gym3 environment does not support rendering because ' 'render_activator is not provided.') self._gym3_env = self._render_activator() self._render_enabled = True if mode == 'rgb_array': assert self._frame_extrator is not None, \ ('This gym3 environment does not support recording because ' 'frame_extractor is not provided.') return self._frame_extrator(self._gym3_env) return None