# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import gym
import numpy as np
import alf
from alf.environments import suite_gym, alf_wrappers, process_environment
from alf.environments.utils import UnwrappedEnvChecker
_unwrapped_env_checker_ = UnwrappedEnvChecker()
# `DeepmindLab` are required,
# see `https://github.com/deepmind/lab` to build `DeepmindLab`
try:
import deepmind_lab
except ImportError:
deepmind_lab = None
[docs]def is_available():
return deepmind_lab is not None
[docs]@alf.configurable
def action_discretize(action_spec,
look_left_right_pixels_per_frame=(-20, 20),
look_down_up_pixels_per_frame=(-10, 10),
strafe_left_right=(-1, 1),
move_back_forward=(-1, 1),
fire=(),
jump=(1, ),
crouch=(1, ),
**kwargs):
"""Discretize action from action_spec
TODO: action combinations
Mapping all valid action values to discrete action
original deepmind lab environment action_spec:
.. code-block:: python
[{'max': 512, 'min': -512, 'name': 'LOOK_LEFT_RIGHT_PIXELS_PER_FRAME'},
{'max': 512, 'min': -512, 'name': 'LOOK_DOWN_UP_PIXELS_PER_FRAME'},
{'max': 1, 'min': -1, 'name': 'STRAFE_LEFT_RIGHT'},
{'max': 1, 'min': -1, 'name': 'MOVE_BACK_FORWARD'},
{'max': 1, 'min': 0, 'name': 'FIRE'},
{'max': 1, 'min': 0, 'name': 'JUMP'},
{'max': 1, 'min': 0, 'name': 'CROUCH'}]
and discretized actions:
.. code-block::
0 -> [20,0,0,0,0,0,0] (look left 20 pixels),
1 -> [-20,0,0,0,0,0,0] (look right 20 pixels),
...,
m -> [0,0,0,-1,0,0,0] (move back),
m+1-> [0,0,0,1,0,0,0] (move forward) ,
...,
n -> [0,0,0,0,1,1,0] (jump and fire),
...
see `SuiteDMLabTest.test_action_discretize` in `suite_dmlab_test.py` for examples
Args:
action_spec (list(dict)): action spec
look_left_right_pixels_per_frame (iterable|str): look left or look right pixels
look_down_up_pixels_per_frame (iterable|str): look down or look up pixels
strafe_left_right (iterable|str): strafe left or strafe right
move_back_forward (iterable|str): move back or move forward
fire (iterable|str): fire values
jump (iterable|str): jump values
crouch (iterable|str): crouch values
kwargs (dict): other config for actions
Returns:
actions (list[numpy.array]): discrete actions
"""
actions = []
config = dict(
look_left_right_pixels_per_frame=look_left_right_pixels_per_frame,
look_down_up_pixels_per_frame=look_down_up_pixels_per_frame,
strafe_left_right=strafe_left_right,
move_back_forward=move_back_forward,
fire=fire,
jump=jump,
crouch=crouch)
config.update(kwargs)
config = {key.upper(): value for key, value in config.items()}
for i, spec in enumerate(action_spec):
val_min = spec['min']
val_max = spec['max']
values = config.get(spec['name'], None)
if values is None:
values = list(range(val_min, val_max + 1))
elif isinstance(values, str):
values = eval(values)
for value in values:
if value < val_min or value > val_max or value == 0:
continue
action = np.zeros([len(action_spec)], np.intc)
action[i] = value
actions.append(action)
return actions
[docs]@alf.configurable
class DeepmindLabEnv(gym.Env):
metadata = {'render.modes': ['rgb_array']}
def __init__(self,
scene,
action_repeat=4,
observation='RGB_INTERLEAVED',
config={},
renderer='hardware'):
"""Create an deepmind_lab env
Args:
scene (str): script for the deepmind_lab env. See available script:
`<https://github.com/deepmind/lab/tree/master/game_scripts/levels>`_
action_repeat (int): the interval at which the agent experiences the game
observation (str): observation format. See doc about the available observations:
`<https://github.com/deepmind/lab/blob/master/docs/users/python_api.md>`_
config (dict): config for env
renderer (str): 'software' or 'hardware'. If set to 'hardware', EGL or GLX is
used for rendering. Make sure you have GPU if you use 'hardware'.
"""
super(DeepmindLabEnv, self).__init__()
self._action_repeat = action_repeat
self._observation = observation
self._lab = deepmind_lab.Lab(
scene, [self._observation], config=config, renderer=renderer)
self._lab.reset()
action_spec = self._lab.action_spec()
action_list = action_discretize(action_spec)
self.action_space = gym.spaces.Discrete(len(action_list))
self._action_list = action_list
obs = self._lab.observations()[observation]
self.observation_space = gym.spaces.Box(
0, 255, obs.shape, dtype=np.uint8)
self._last_obs = obs
[docs] def step(self, action):
reward = self._lab.step(
self._action_list[action], num_steps=self._action_repeat)
terminal = not self._lab.is_running()
obs = None if terminal else self._lab.observations()[self._observation]
self._last_obs = obs if obs is not None else np.copy(self._last_obs)
return self._last_obs, reward, terminal, dict()
[docs] def reset(self):
self._lab.reset()
self._last_obs = self._lab.observations()[self._observation]
return self._last_obs
[docs] def seed(self, seed=None):
self._lab.reset(seed=seed)
[docs] def close(self):
self._lab.close()
[docs] def render(self, mode='rgb_array', close=False):
if mode == 'rgb_array':
return self._last_obs
else:
super().render(mode=mode) # just raise an exception
[docs]@alf.configurable
def load(scene,
env_id=None,
discount=1.0,
frame_skip=4,
gym_env_wrappers=(),
alf_env_wrappers=(),
wrap_with_process=False,
max_episode_steps=None):
"""Load deepmind lab envs.
Args:
scene (str): script for the deepmind_lab env. See available script:
`<https://github.com/deepmind/lab/tree/master/game_scripts/levels>`_
env_id (int): (optional) ID of the environment.
discount (float): Discount to use for the environment.
frame_skip (int): the frequency at which the agent experiences the game
gym_env_wrappers (Iterable): Iterable with references to gym_wrappers,
classes to use directly on the gym environment.
alf_env_wrappers (Iterable): Iterable with references to alf_wrappers
classes to use on the ALF environment.
wrap_with_process (bool): Whether wrap env in a process
max_episode_steps (int): max episode step limit
Returns:
An AlfEnvironment instance.
"""
_unwrapped_env_checker_.check_and_update(wrap_with_process)
if max_episode_steps is None:
max_episode_steps = 0
def env_ctor(env_id=None):
return suite_gym.wrap_env(
DeepmindLabEnv(scene=scene, action_repeat=frame_skip),
env_id=env_id,
discount=discount,
max_episode_steps=max_episode_steps,
gym_env_wrappers=gym_env_wrappers,
alf_env_wrappers=alf_env_wrappers)
if wrap_with_process:
process_env = process_environment.ProcessEnvironment(
functools.partial(env_ctor))
process_env.start()
torch_env = alf_wrappers.AlfEnvironmentBaseWrapper(process_env)
else:
torch_env = env_ctor(env_id=env_id)
return torch_env