Source code for alf.environments.dmc_gym_wrapper

# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrap ``dm_control`` environment with a Gym interface.
Adapted and simplified from https://github.com/denisyarats/dmc2gym
"""

from functools import partial

import gym
from gym import spaces
from gym.envs.registration import register
import numpy as np
from typing import Dict, Optional, Any

try:
    import dm_control
    from dm_control import suite
    import dm_env
except ImportError:
    dm_control = None


def _dmc_spec_to_box(spec):
    """Convert a dmc spec to a Gym Box.
    Copied from https://github.com/denisyarats/dmc2gym
    """

    def extract_min_max(s):
        assert s.dtype == np.float64 or s.dtype == np.float32
        dim = np.int(np.prod(s.shape))
        if type(s) == dm_env.specs.Array:
            bound = np.inf * np.ones(dim, dtype=np.float32)
            return -bound, bound
        elif type(s) == dm_env.specs.BoundedArray:
            zeros = np.zeros(dim, dtype=np.float32)
            return s.minimum + zeros, s.maximum + zeros

    mins, maxs = [], []
    for s in spec:
        mn, mx = extract_min_max(s)
        mins.append(mn)
        maxs.append(mx)
    low = np.concatenate(mins, axis=0)
    high = np.concatenate(maxs, axis=0)
    assert low.shape == high.shape
    return spaces.Box(low, high, dtype=np.float32)


def _flatten_obs(obs):
    """Flatten a dict to a vector.
    Copied from https://github.com/denisyarats/dmc2gym
    """
    obs_pieces = []
    for v in obs.values():
        flat = np.array([v]) if np.isscalar(v) else v.ravel()
        obs_pieces.append(flat)
    return np.concatenate(obs_pieces, axis=0)


[docs]class DMCGYMWrapper(gym.core.Env): def __init__(self, domain_name: str, task_name: str, visualize_reward: bool = True, from_pixels: bool = False, height: int = 84, width: int = 84, camera_id: int = 0, control_timestep: Optional[float] = None): """A Gym env that wraps a ``dm_control`` environment. Args: domain_name: the domain name corresponds to the physical robot task_name: a specific task under a domain, which corresponds to a particular MDP structure visualize_reward: if True, then the rendered frame will have a highlighted color when the agent achieves a reward. from_pixels: if True, the observation will be raw pixels; otherwise use the interval state vector as the observation. height: image observation height width: image observation width camera_id: which camera to render; a MuJoCo xml file can define multiple cameras with different views control_timestep: the time duration between two agent actions. If this is greater than the agent's primitive physics timestep, then multiple physics simulation steps might be performed between two actions. If None, the default control timstep defined by DM control suite will be used. """ self.metadata.update({'render.modes': ["rgb_array"]}) self._from_pixels = from_pixels self._height = height self._width = width self._camera_id = camera_id if control_timestep is not None: environment_kwargs = {"control_timestep": control_timestep} else: environment_kwargs = None # create task self._env_fn = partial( suite.load, domain_name=domain_name, task_name=task_name, task_kwargs={"time_limit": float('inf')}, environment_kwargs=environment_kwargs, visualize_reward=visualize_reward) self._env = self._env_fn() self._action_space = _dmc_spec_to_box([self._env.action_spec()]) # create observation space if from_pixels: shape = [3, height, width] self._observation_space = spaces.Box( low=0, high=255, shape=shape, dtype=np.uint8) else: self._observation_space = _dmc_spec_to_box( self._env.observation_spec().values()) def __getattr__(self, name): return getattr(self._env, name) def _get_obs(self, time_step): if self._from_pixels: # this returns channels_last images obs = self.render( height=self._height, width=self._width, camera_id=self._camera_id) obs = obs.transpose(2, 0, 1).copy() else: obs = _flatten_obs(time_step.observation) return obs @property def observation_space(self): return self._observation_space @property def action_space(self): return self._action_space
[docs] def seed(self, seed): self._action_space.seed(seed) self._observation_space.seed(seed) # Because dm_control seems not to provide an API for # seeding after an env is created, here we need to re-create # an env again. self._env = self._env_fn(task_kwargs={ 'random': seed, "time_limit": float('inf') })
[docs] def step(self, action): assert self._action_space.contains(action) time_step = self._env.step(action) reward = time_step.reward or 0 obs = self._get_obs(time_step) return obs, reward, False, {}
[docs] def reset(self): time_step = self._env.reset() obs = self._get_obs(time_step) return obs
[docs] def render(self, mode='rgb_array', height=None, width=None, camera_id=0): """Render an RGB image. Copied from https://github.com/denisyarats/dmc2gym """ assert mode == 'rgb_array', ( 'only support rgb_array mode, given %s' % mode) height = height or self._height width = width or self._width camera_id = camera_id or self._camera_id return self._env.physics.render( height=height, width=width, camera_id=camera_id)