# Source code for alf.algorithms.dynamic_action_repeat_agent

# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch

import alf
from alf.algorithms.algorithm import Algorithm
from alf.algorithms.off_policy_algorithm import OffPolicyAlgorithm
from alf.algorithms.sac_algorithm import SacAlgorithm
from alf.algorithms.config import TrainerConfig
from alf.algorithms.data_transformer import RewardNormalizer
from alf.data_structures import TimeStep, Experience, namedtuple, AlgStep
from alf.data_structures import make_experience
from alf.tensor_specs import BoundedTensorSpec, TensorSpec
from alf.utils.conditional_ops import conditional_update
from alf.utils import common, summary_utils

# State carried by ``DynamicActionRepeatAgent`` from one step to the next.
# Field meanings, as used in this file:
#   rl:             state of the wrapped RL algorithm.
#   action:         the action currently being repeated.
#   steps:          number of remaining steps for which ``action`` is repeated.
#   k:              current step index within the ongoing repetition (rollout).
#   rl_discount:    discount accumulated since the last decision step (rollout).
#   rl_reward:      discounted reward accumulated since the last decision step
#                   (rollout).
#   sample_rewards: one randomly kept single-step reward from the ongoing
#                   repetition; later fed to the reward normalizer.
#   repr:           state of the optional representation learner.
# ``default_value=()`` is passed so that instances can be built from a subset
# of the fields, as is done throughout this file.
ActionRepeatState = namedtuple(
    "ActionRepeatState", [
        "rl", "action", "steps", "k", "rl_discount", "rl_reward",
        "sample_rewards", "repr"
    ],
    default_value=())


@alf.configurable
class DynamicActionRepeatAgent(OffPolicyAlgorithm):
    """Create an agent which learns a variable action repetition duration.

    At each decision step, the agent outputs both the action to repeat and the
    number of steps to repeat. These two quantities together constitute the
    action of the agent. We use SAC with mixed action type for training.

    The core idea is similar to `Learning to Repeat: Fine Grained Action
    Repetition for Deep Reinforcement Learning
    <http://arxiv.org/abs/1702.06054>`_.
    """

    def __init__(self,
                 observation_spec,
                 action_spec,
                 reward_spec=TensorSpec(()),
                 env=None,
                 config: TrainerConfig = None,
                 K=5,
                 rl_algorithm_cls=SacAlgorithm,
                 representation_learner_cls=None,
                 reward_normalizer_ctor=None,
                 gamma=0.99,
                 optimizer=None,
                 debug_summaries=False,
                 name="DynamicActionRepeatAgent"):
        """
        Args:
            observation_spec (nested TensorSpec): representing the observations.
            action_spec (nested BoundedTensorSpec): representing the actions;
                can only be continuous actions for now.
            reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec
                representing the reward(s).
            env (Environment): The environment to interact with. ``env`` is a
                batched environment, which means that it runs multiple
                simulations simultaneously. ``env`` only needs to be provided
                to the root algorithm.
            config (TrainerConfig): config for training. ``config`` only needs
                to be provided to the algorithm which performs a training
                iteration by itself.
            K (int): the maximal repeating times for an action. Must be at
                least 1.
            rl_algorithm_cls (Callable): creates an RL algorithm to be
                augmented by this dynamic action repeating ability.
            representation_learner_cls (type): The algorithm class for learning
                the representation. If provided, the constructed learner will
                calculate the representation from the original observation as
                the observation for downstream algorithms such as
                ``rl_algorithm``. We assume that the representation is trained
                by ``rl_algorithm``.
            reward_normalizer_ctor (Callable): if not None, it must be
                ``RewardNormalizer`` and environment rewards will be normalized
                for training.
            gamma (float): the reward discount to be applied when accumulating
                ``k`` steps' rewards for a repeated action. Note that this
                value should be equal to the gamma used by the critic loss for
                target values.
            optimizer (None|Optimizer): The default optimizer for training.
            debug_summaries (bool): True if debug summaries should be created.
            name (str): name of this agent.
        """
        assert action_spec.is_continuous, (
            "Only support continuous actions for now!")
        # The repeat count is modeled as a discrete action in [0, K-1], so a
        # non-positive K would yield an empty/invalid action range.
        assert K >= 1, "K must be at least 1!"

        rl_observation_spec = observation_spec
        repr_learner = None
        if representation_learner_cls is not None:
            repr_learner = representation_learner_cls(
                observation_spec=observation_spec,
                action_spec=action_spec,
                debug_summaries=debug_summaries)
            rl_observation_spec = repr_learner.output_spec

        # The wrapped RL algorithm acts in a mixed space:
        # (discrete repeat count in [0, K-1], original continuous action).
        self._rl_action_spec = (BoundedTensorSpec(
            shape=(), dtype='int64', maximum=K - 1), action_spec)
        rl = rl_algorithm_cls(
            observation_spec=rl_observation_spec,
            action_spec=self._rl_action_spec,
            debug_summaries=debug_summaries)

        self._action_spec = action_spec
        self._observation_spec = observation_spec
        self._gamma = gamma

        predict_state_spec = ActionRepeatState(
            rl=rl.predict_state_spec,
            action=action_spec,
            steps=TensorSpec(shape=(), dtype='int64'))

        rollout_state_spec = predict_state_spec._replace(
            rl=rl.rollout_state_spec,
            rl_discount=TensorSpec(()),
            rl_reward=TensorSpec(()),
            k=TensorSpec((), dtype='int64'),
            sample_rewards=TensorSpec(()))

        train_state_spec = ActionRepeatState(rl=rl.train_state_spec)

        if repr_learner is not None:
            predict_state_spec = predict_state_spec._replace(
                repr=repr_learner.predict_state_spec)
            rollout_state_spec = rollout_state_spec._replace(
                repr=repr_learner.rollout_state_spec)
            train_state_spec = train_state_spec._replace(
                repr=repr_learner.train_state_spec)

        super().__init__(
            observation_spec,
            action_spec,
            reward_spec=reward_spec,
            train_state_spec=train_state_spec,
            rollout_state_spec=rollout_state_spec,
            predict_state_spec=predict_state_spec,
            env=env,
            config=config,
            optimizer=optimizer,
            debug_summaries=debug_summaries,
            name=name)

        # Sub-algorithms are attached only after the base class has been
        # initialized.
        self._repr_learner = repr_learner
        self._reward_normalizer = None
        if reward_normalizer_ctor is not None:
            self._reward_normalizer = reward_normalizer_ctor(
                observation_spec=())
        self._rl = rl
        self._K = K

    def observe_for_replay(self, exp):
        """Intentionally do nothing.

        Data is not observed at every time step; ``rollout_step()`` calls the
        parent class's ``observe_for_replay()`` itself, only at the steps where
        a new action is generated.
        """
        # Do not observe data at every time step; customized observing
        pass

    def _should_switch_action(self, time_step: TimeStep, state):
        """Return a boolean mask of the samples that need a new action.

        A new action is needed when the remaining repeat count has reached
        zero, or when the time step is the first or the last of an episode.
        """
        repeat_last_step = (state.steps == 0)
        return repeat_last_step | time_step.is_first() | time_step.is_last()

    def predict_step(self, time_step: TimeStep, state):
        """Output the action being repeated, querying ``self._rl`` for a new
        (repeat count, action) pair only where a switch is required.

        Args:
            time_step (TimeStep): the current time step.
            state (ActionRepeatState): state from the previous step.

        Returns:
            AlgStep: ``output`` is the action to execute; ``state`` has its
            ``steps`` counter decremented by one; ``info`` carries the action
            and the remaining steps.
        """
        switch_action = self._should_switch_action(time_step, state)

        @torch.no_grad()
        def _generate_new_action(time_step, state):
            repr_state = ()
            if self._repr_learner is not None:
                repr_step = self._repr_learner.predict_step(
                    time_step, state.repr)
                time_step = time_step._replace(observation=repr_step.output)
                repr_state = repr_step.state
            rl_step = self._rl.predict_step(time_step, state.rl)
            steps, action = rl_step.output
            return ActionRepeatState(
                action=action,
                steps=steps + 1,  # [0, K-1] -> [1, K]
                rl=rl_step.state,
                repr=repr_state)

        new_state = conditional_update(
            target=state,
            cond=switch_action,
            func=_generate_new_action,
            time_step=time_step,
            state=state)
        # One repetition of the (possibly new) action is consumed by this step.
        new_state = new_state._replace(steps=new_state.steps - 1)

        return AlgStep(
            output=new_state.action,
            state=new_state,
            # plot steps and action when rendering video
            info=dict(action=(new_state.action, new_state.steps)))

    def rollout_step(self, time_step: TimeStep, state: ActionRepeatState):
        """Accumulate reward/discount over the repeated steps and, where a
        switch is required, query ``self._rl`` for a new action and store the
        aggregated transition to the replay buffer.

        Args:
            time_step (TimeStep): the current time step.
            state (ActionRepeatState): state from the previous step.

        Returns:
            AlgStep: ``output`` is the action to execute; ``state`` has its
            ``steps`` counter decremented by one.
        """
        switch_action = self._should_switch_action(time_step, state)

        # state.k is the current step index over K steps
        state = state._replace(
            rl_reward=state.rl_reward + torch.pow(
                self._gamma, state.k.to(torch.float32)) * time_step.reward,
            rl_discount=state.rl_discount * time_step.discount * self._gamma,
            k=state.k + 1)

        if self._reward_normalizer is not None:
            # The probability of a reward at step k being kept till K steps is:
            # 1/k * k/(k+1) * .. * (K-1)/K = 1/K. This provides enough
            # randomness to make the normalizer unbiased.
            state = state._replace(
                sample_rewards=torch.where((
                    torch.rand_like(state.sample_rewards) <
                    1. / state.k.to(torch.float32)
                ), time_step.reward, state.sample_rewards))

        @torch.no_grad()
        def _generate_new_action(time_step, state):
            rl_time_step = time_step._replace(
                reward=state.rl_reward,
                # To keep consistent with other algorithms, we choose to
                # multiply discount with gamma once more in td_loss.py
                discount=state.rl_discount / self._gamma)

            observation, repr_state = rl_time_step.observation, ()
            if self._repr_learner is not None:
                repr_step = self._repr_learner.rollout_step(
                    time_step, state.repr)
                observation = repr_step.output
                repr_state = repr_step.state

            rl_step = self._rl.rollout_step(
                rl_time_step._replace(observation=observation), state.rl)
            rl_step = rl_step._replace(
                info=(rl_step.info, state.k, state.sample_rewards))
            # Store to replay buffer. The explicit two-argument ``super`` is
            # required here because this is a nested function whose first
            # argument is not ``self``. The parent's method is called because
            # this class overrides ``observe_for_replay()`` as a no-op.
            super(DynamicActionRepeatAgent, self).observe_for_replay(
                make_experience(
                    rl_time_step._replace(
                        # Store the untransformed observation so that later it
                        # will be transformed again during training
                        observation=rl_time_step.untransformed.observation),
                    rl_step,
                    state))
            steps, action = rl_step.output
            # Start a fresh accumulation window for the new action.
            return ActionRepeatState(
                action=action,
                steps=steps + 1,  # [0, K-1] -> [1, K]
                k=torch.zeros_like(state.k),
                repr=repr_state,
                rl=rl_step.state,
                rl_reward=torch.zeros_like(state.rl_reward),
                sample_rewards=torch.zeros_like(state.sample_rewards),
                rl_discount=torch.ones_like(state.rl_discount))

        new_state = conditional_update(
            target=state,
            cond=switch_action,
            func=_generate_new_action,
            time_step=time_step,
            state=state)

        # One repetition of the (possibly new) action is consumed by this step.
        new_state = new_state._replace(steps=new_state.steps - 1)

        return AlgStep(output=new_state.action, state=new_state)

    def train_step(self, inputs: TimeStep, state: ActionRepeatState,
                   rollout_info):
        """Train the underlying RL algorithm ``self._rl``.

        Because in ``self.rollout_step()`` the replay buffer only stores info
        related to ``self._rl``, here we can directly call
        ``self._rl.train_step()``.

        Args:
            inputs (TimeStep): training inputs. If a representation learner
                exists, its output replaces ``inputs.observation`` before the
                inputs are passed to ``self._rl``.
            state (ActionRepeatState): only the ``rl`` and ``repr`` fields are
                read.
            rollout_info: forwarded unchanged to ``self._rl.train_step()``.

        Returns:
            The step returned by ``self._rl.train_step()`` with its state
            replaced by an ``ActionRepeatState`` holding the new ``rl`` and
            ``repr`` states.
        """
        repr_state = ()
        if self._repr_learner is not None:
            repr_step = self._repr_learner.train_step(inputs, state.repr)
            inputs = inputs._replace(observation=repr_step.output)
            repr_state = repr_step.state

        rl_step = self._rl.train_step(inputs, state.rl, rollout_info)
        new_state = ActionRepeatState(rl=rl_step.state, repr=repr_state)
        return rl_step._replace(state=new_state)

    def calc_loss(self, info):
        """Calculate the loss for training ``self._rl``."""
        return self._rl.calc_loss(info)

    def after_update(self, root_inputs, info):
        """Call ``self._rl.after_update()``."""
        self._rl.after_update(root_inputs, info)

    def summarize_train(self, experience, train_info, loss_info, params):
        """Overwrite the function because the training action spec is
        different from the rollout action spec.
        """
        Algorithm.summarize_train(self, experience, train_info, loss_info,
                                  params)

        if self._debug_summaries:
            # Training actions follow the mixed (repeat count, action) spec.
            summary_utils.summarize_action(experience.action,
                                           self._rl_action_spec)
            self.summarize_reward("training_reward", experience.reward)

        if self._config.summarize_action_distributions:
            field = alf.nest.find_field(train_info, 'action_distribution')
            if len(field) == 1:
                summary_utils.summarize_distribution("action_dist", field[0])

    def preprocess_experience(self, root_inputs, rollout_info, batch_info):
        """Normalize training rewards if a reward normalizer is provided.

        The statistics of the normalizer are updated with the randomly sampled
        single-step rewards stored during rollout. The stored reward is a
        discounted sum over ``repeats`` steps, so the mean that is subtracted
        and the clip bound are scaled to that number of steps.

        Args:
            root_inputs: sampled experience; its ``reward`` field is read and
                replaced by the normalized reward (the input tensor itself is
                not modified). Per the original note, tensors are shaped
                ``[B, T, ...]``.
            rollout_info (tuple): ``(rl_info, repeats, sample_rewards)`` as
                packed by ``rollout_step()``.
            batch_info: forwarded to ``self._rl.preprocess_experience()``.

        Returns:
            Whatever ``self._rl.preprocess_experience()`` returns.
        """
        reward = root_inputs.reward
        rl_info, repeats, sample_rewards = rollout_info

        if self._reward_normalizer is not None:
            normalizer = self._reward_normalizer.normalizer
            normalizer.update(sample_rewards)

            # compute current variance
            m = normalizer._mean_averager.get()
            m2 = normalizer._m2_averager.get()
            var = torch.relu(m2 - m**2)

            # compute accumulated mean over ``repeats`` steps
            steps = repeats.to(torch.float32)
            if self._gamma == 1:
                # The geometric sum degenerates to ``steps`` unit-weight
                # terms; the closed form below would evaluate 0/0 = NaN.
                acc_mean = steps * m
            else:
                acc_mean = ((1 - torch.pow(self._gamma, steps)) /
                            (1 - self._gamma) * m)
            # Out-of-place so that the caller's reward tensor is not mutated.
            reward = reward - acc_mean
            reward = alf.layers.normalize_along_batch_dims(
                reward,
                torch.zeros_like(var),
                var,
                variance_epsilon=normalizer._variance_epsilon)

            clip = self._reward_normalizer.clip_value
            if clip > 0:
                # The clip value is for single-step rewards, so we need to
                # multiply it with the repeated steps. The float copy of
                # ``repeats`` keeps the bound in a floating dtype.
                clip = clip * steps
                reward = torch.max(torch.min(clip, reward), -clip)

        root_inputs = root_inputs._replace(reward=reward)
        return self._rl.preprocess_experience(root_inputs, rl_info, batch_info)