# Source code for alf.algorithms.mbrl_algorithm

# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model-based RL Algorithm."""

from functools import partial

import torch
from typing import Any, Callable, Optional

import alf
from alf.algorithms.config import TrainerConfig
from alf.algorithms.off_policy_algorithm import OffPolicyAlgorithm
from alf.algorithms.one_step_loss import OneStepTDLoss
from alf.data_structures import (AlgStep, Experience, LossInfo, namedtuple,
                                 TimeStep)
from alf.nest import nest
from alf.networks import ActorDistributionNetwork, CriticNetwork
from alf.tensor_specs import TensorSpec, BoundedTensorSpec
from alf.utils.math_ops import add_ignore_empty

from alf.algorithms.dynamics_learning_algorithm import DynamicsLearningAlgorithm
from alf.algorithms.reward_learning_algorithm import RewardEstimationAlgorithm
from alf.algorithms.planning_algorithm import PlanAlgorithm
from alf.algorithms.predictive_representation_learner import \
                                    PredictiveRepresentationLearner

# Container with one slot per sub-module (dynamics, reward, planner).
# ``MbrlAlgorithm`` uses it both for its ``train_state_spec`` and for the
# state returned from its step functions.
MbrlState = namedtuple("MbrlState", ["dynamics", "reward", "planner"])
# Per-step info container with the same three slots. It is constructed with
# no arguments (``MbrlInfo()``) in the prediction paths of this file;
# presumably ``default_value=()`` makes every unset field ``()`` -- confirm
# against ``alf.data_structures.namedtuple``.
MbrlInfo = namedtuple(
    "MbrlInfo", ["dynamics", "reward", "planner"], default_value=())


@alf.configurable
class MbrlAlgorithm(OffPolicyAlgorithm):
    """Model-based RL algorithm.

    Combines a dynamics module, a reward module and a planner module. The
    planner is given ``_predict_multi_step_cost`` as its action-sequence cost
    function, which unrolls the dynamics module and accumulates the reward
    computed by the reward module.
    """

    def __init__(self,
                 observation_spec,
                 action_spec,
                 reward_module: Optional[RewardEstimationAlgorithm],
                 planner_module_ctor: Callable[[Any, Any], PlanAlgorithm],
                 feature_spec: Optional[TensorSpec] = None,
                 dynamics_module_ctor: Optional[
                     Callable[[Any, Any], DynamicsLearningAlgorithm]] = None,
                 reward_spec=TensorSpec(()),
                 particles_per_replica=1,
                 epsilon_greedy=None,
                 env=None,
                 config: Optional[TrainerConfig] = None,
                 dynamics_optimizer=None,
                 reward_optimizer=None,
                 planner_optimizer=None,
                 checkpoint=None,
                 debug_summaries=False,
                 name="MbrlAlgorithm"):
        """Create an MbrlAlgorithm.

        The MbrlAlgorithm takes as input the following set of modules for
        making decisions on actions based on the current observation:

        1) learnable/fixed dynamics module
        2) learnable/fixed reward module
        3) learnable/fixed planner module

        Args:
            observation_spec (nested TensorSpec): representing the
                observations. It is only used as the default value of
                ``feature_spec``.
            action_spec (BoundedTensorSpec): representing the actions. Only
                continuous actions are supported.
            reward_module (RewardEstimationAlgorithm|None): module for
                calculating the reward, i.e., evaluating the reward for a
                (s, a) pair. May be ``None`` for subclasses that do not go
                through the reward module (e.g. ``LatentMbrlAlgorithm``).
            planner_module_ctor: used to construct the module for generating
                planned action based on specified reward function and
                dynamics function. It is called with the keyword arguments
                ``feature_spec`` and ``action_spec``.
            feature_spec (TensorSpec|None): spec of the feature that the
                dynamics and planner modules operate on. If None,
                ``observation_spec`` is used. Nested specs are not supported.
            dynamics_module_ctor: used to construct the module for learning to
                predict the next feature based on the previous feature and
                action. It should accept input with spec
                [feature_spec, encoded_action_spec] and output a tensor of
                shape feature_spec. For discrete action, encoded_action is a
                one-hot representation of the action. For continuous action,
                encoded action is same as the original action. It is called
                with the keyword arguments ``feature_spec`` and
                ``action_spec``. If None, no dynamics module is created.
            reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec
                representing the reward(s).
            particles_per_replica (int): number of particles for each replica
            epsilon_greedy (float): a floating value in [0,1], representing
                the chance of action sampling instead of taking argmax. This
                can help prevent a dead loop in some deterministic environment
                like Breakout. Only used for evaluation. If None, its value is
                taken from ``config.epsilon_greedy`` and then
                ``alf.get_config_value(TrainerConfig.epsilon_greedy)``.
            env (Environment): The environment to interact with. env is a
                batched environment, which means that it runs multiple
                simulations simultaneously. env only needs to be provided to
                the root Algorithm.
            config (TrainerConfig): config for training. config only needs to
                be provided to the algorithm which performs ``train_iter()``
                by itself.
            dynamics_optimizer (None|Optimizer): if provided, it is registered
                through ``add_optimizer`` for the dynamics module.
            reward_optimizer (None|Optimizer): if provided, it is registered
                through ``add_optimizer`` for the reward module.
            planner_optimizer (None|Optimizer): if provided, it is registered
                through ``add_optimizer`` for the planner module.
            checkpoint (None|str): a string in the format of "prefix@path",
                where the "prefix" is the multi-step path to the contents in
                the checkpoint to be loaded. "path" is the full path to the
                checkpoint file saved by ALF. Refer to ``Algorithm`` for more
                details.
            debug_summaries (bool): True if debug summaries should be created.
            name (str): The name of this algorithm.
        """
        if feature_spec is None:
            feature_spec = observation_spec

        dynamics_module = None
        if dynamics_module_ctor is not None:
            dynamics_module = dynamics_module_ctor(
                feature_spec=feature_spec, action_spec=action_spec)

        planner_module = planner_module_ctor(
            feature_spec=feature_spec, action_spec=action_spec)

        # A missing sub-module contributes an empty state spec.
        train_state_spec = MbrlState(
            dynamics=dynamics_module.train_state_spec
            if dynamics_module is not None else (),
            reward=reward_module.train_state_spec
            if reward_module is not None else (),
            planner=planner_module.train_state_spec
            if planner_module is not None else ())

        if epsilon_greedy is None:
            epsilon_greedy = alf.utils.common.get_epsilon_greedy(config)
        self._epsilon_greedy = epsilon_greedy

        super().__init__(
            feature_spec,
            action_spec,
            reward_spec=reward_spec,
            train_state_spec=train_state_spec,
            env=env,
            config=config,
            checkpoint=checkpoint,
            debug_summaries=debug_summaries,
            name=name)

        # Only the first spec of a (possibly nested) action_spec is used.
        flat_action_spec = nest.flatten(action_spec)
        action_spec = flat_action_spec[0]

        # The messages are plain literals: a backslash continuation inside
        # the string would embed source indentation in the runtime message.
        assert action_spec.is_continuous, "only support continuous control"

        num_actions = action_spec.shape[-1]

        flat_feature_spec = nest.flatten(feature_spec)
        assert len(flat_feature_spec) == 1, (
            "Mbrl doesn't support nested feature_spec")

        self._action_spec = action_spec
        self._num_actions = num_actions

        if dynamics_optimizer is not None:
            self.add_optimizer(dynamics_optimizer, [dynamics_module])

        if planner_optimizer is not None:
            self.add_optimizer(planner_optimizer, [planner_module])

        if reward_optimizer is not None:
            self.add_optimizer(reward_optimizer, [reward_module])

        self._dynamics_module = dynamics_module
        self._reward_module = reward_module
        self._planner_module = planner_module
        # The bound method is resolved on ``self``, so a subclass override of
        # ``_predict_multi_step_cost`` is what the planner receives.
        self._planner_module.set_action_sequence_cost_func(
            self._predict_multi_step_cost)
        if dynamics_module is not None:
            self._num_dynamics_replicas = dynamics_module.num_replicas
        self._particles_per_replica = particles_per_replica

    def _predict_next_step(self, time_step, dynamics_state):
        """Predict the next step (observation and state) based on the current
            time step and state.

        Args:
            time_step (TimeStep): input data for next step prediction
            dynamics_state: input dynamics state for next step prediction
        Returns:
            next_time_step (TimeStep): ``time_step`` with its observation
                replaced by the output of the dynamics module
            next_dynamic_state: updated dynamics state from the dynamics
                module
        """
        with torch.no_grad():
            dynamics_step = self._dynamics_module.predict_step(
                time_step, dynamics_state)
            pred_obs = dynamics_step.output
            next_time_step = time_step._replace(observation=pred_obs)
            next_dynamic_state = dynamics_step.state
        return next_time_step, next_dynamic_state

    def _expand_to_population(self, data, population_size):
        """Expand the input tensor to a population of replications.

        Args:
            data (Tensor): input data with shape [batch_size, ...]
            population_size (int): number of consecutive copies of each row
        Returns:
            data_population (Tensor) with shape
                                    [batch_size * population_size, ...].
            For example data tensor [[a, b], [c, d]] and a population_size of
            2, we have the following data_population tensor as output
                                    [[a, b], [a, b], [c, d], [c, d]]
        """
        data_population = torch.repeat_interleave(
            data, population_size, dim=0)
        return data_population

    def _expand_to_particles(self, inputs):
        """Expand the inputs of shape [B, ...] to [B*p, n, ...] if n > 1,
            or to [B*p, ...] if n = 1, where n is the number of replicas
            and p is the number of particles per replica.

        Note that ``self._num_dynamics_replicas`` is only defined when a
        dynamics module was constructed.
        """
        # [B, ...] -> [B*p, ...]
        inputs = torch.repeat_interleave(
            inputs, self._particles_per_replica, dim=0)
        if self._num_dynamics_replicas > 1:
            # [B*p, ...] -> [B*p, n, ...]
            inputs = inputs.unsqueeze(1).expand(
                -1, self._num_dynamics_replicas, *inputs.shape[1:])

        return inputs

    @torch.no_grad()
    def _predict_multi_step_cost(self, observation, actions):
        """Compute the total cost by unrolling multiple steps according to
            the given initial observation and multi-step actions.

        Args:
            observation: the current observation for predicting quantities of
                future time steps
            actions (Tensor): a set of action sequences of shape
                [B, population, unroll_steps, action_dim]
        Returns:
            cost (Tensor): negation of accumulated predicted reward, with
                the shape of [B, population]
        """
        batch_size, population_size, num_unroll_steps = actions.shape[0:3]

        state = self.get_initial_predict_state(batch_size)
        time_step = TimeStep()
        dyn_state = state.dynamics._replace(feature=observation)
        # [B, ...] -> [B*population, ...]
        dyn_state = nest.map_structure(
            partial(
                self._expand_to_population, population_size=population_size),
            dyn_state)

        # expand to particles
        dyn_state = nest.map_structure(self._expand_to_particles, dyn_state)
        reward_state = state.reward
        reward = 0
        for i in range(num_unroll_steps):
            # actions of step i for every (batch, population) pair:
            # [B, population, action_dim] -> [B*population, action_dim]
            action = actions[:, :, i, ...].view(-1, actions.shape[3])
            action = self._expand_to_particles(action)
            time_step = time_step._replace(prev_action=action)
            time_step, dyn_state = self._predict_next_step(
                time_step, dyn_state)
            next_obs = time_step.observation
            # Note: currently using (next_obs, action), might need to
            # consider (obs, action) in order to be more compatible
            # with the conventional definition of the reward function
            reward_step, reward_state = self._calc_step_reward(
                next_obs, action, reward_state)
            reward = reward + reward_step
        cost = -reward
        # reshape cost
        # [B*par, n] -> [B, par*n]
        # (the leading dimension here also includes the population)
        cost = cost.reshape(
            -1, self._particles_per_replica * self._num_dynamics_replicas)
        # average over all particles and replicas
        cost = cost.mean(-1)

        # reshape cost back to [batch size, population_size]
        cost = torch.reshape(cost, [batch_size, -1])
        return cost

    def _calc_step_reward(self, obs, action, reward_state):
        """Calculate the step reward based on the given observation, action
            and state.

        Args:
            obs (Tensor): observation
            action (Tensor): action
            reward_state: state for reward calculation
        Returns:
            reward (Tensor): computed reward for the given input
            updated_state: updated state from the reward module
        """
        reward, reward_state = self._reward_module.compute_reward(
            obs, action, reward_state)
        return reward, reward_state

    def _predict_with_planning(self, time_step: TimeStep, state: MbrlState,
                               epsilon_greedy):
        """Plan an action and update the dynamics state.

        Args:
            time_step (TimeStep): the current time step
            state (MbrlState): the current state of this algorithm
            epsilon_greedy (float): forwarded to the planner's
                ``predict_plan``
        Returns:
            AlgStep: ``output`` is the planned action, ``state`` carries the
                updated dynamics and planner states, ``info`` is an empty
                ``MbrlInfo``.
        """
        action, planner_state = self._planner_module.predict_plan(
            time_step, state.planner, epsilon_greedy)

        dynamics_state = self._dynamics_module.update_state(
            time_step, state.dynamics)

        return AlgStep(
            output=action,
            state=state._replace(
                dynamics=dynamics_state, planner=planner_state),
            info=MbrlInfo())

    def predict_step(self, time_step: TimeStep, state):
        """Plan an action using the configured ``epsilon_greedy`` value."""
        return self._predict_with_planning(
            time_step, state, epsilon_greedy=self._epsilon_greedy)

    def rollout_step(self, time_step: TimeStep, state):
        """Plan an action for rollout."""
        # NOTE: epsilon_greedy is fixed to 0.0 here; the configured
        # ``self._epsilon_greedy`` is only applied in ``predict_step``.
        return self._predict_with_planning(
            time_step, state, epsilon_greedy=0.0)

    def train_step(self,
                   inputs: TimeStep,
                   state: MbrlState,
                   rollout_info=None):
        """Run ``train_step`` of the dynamics, reward and planner modules.

        Args:
            inputs (TimeStep): training inputs forwarded to every sub-module
            state (MbrlState): per-module training states
            rollout_info: unused
        Returns:
            AlgStep: empty ``output``; ``state`` and ``info`` gather the
                states and infos returned by the three sub-modules.
        """
        dynamics_step = self._dynamics_module.train_step(
            inputs, state.dynamics)
        reward_step = self._reward_module.train_step(inputs, state.reward)
        plan_step = self._planner_module.train_step(inputs, state.planner)
        state = MbrlState(
            dynamics=dynamics_step.state,
            reward=reward_step.state,
            planner=plan_step.state)
        info = MbrlInfo(
            dynamics=dynamics_step.info,
            reward=reward_step.info,
            planner=plan_step.info)
        return AlgStep((), state, info)

    def calc_loss(self, training_info):
        """Combine the dynamics loss with the reward and planner terms.

        Args:
            training_info (MbrlInfo): info collected by ``train_step``
        Returns:
            LossInfo: ``loss`` is the dynamics loss with
                ``training_info.reward`` and ``training_info.planner`` added
                through ``add_ignore_empty``; ``scalar_loss`` is taken from
                the dynamics loss only.
        """
        loss_dynamics = self._dynamics_module.calc_loss(
            training_info.dynamics)
        loss = loss_dynamics.loss
        loss = add_ignore_empty(loss, training_info.reward)
        loss = add_ignore_empty(loss, training_info.planner)
        return LossInfo(loss=loss, scalar_loss=loss_dynamics.scalar_loss)

    def after_update(self, root_inputs, training_info):
        """Forward ``after_update`` to the planner module."""
        # ``training_info`` is passed through unchanged; replacing its
        # ``planner`` field with itself would only create an identical copy.
        self._planner_module.after_update(root_inputs, training_info)
@alf.configurable
class LatentMbrlAlgorithm(MbrlAlgorithm):
    """Model-based RL algorithm in a latent space.
    """

    def __init__(self,
                 observation_spec,
                 action_spec,
                 planner_module_ctor: Callable[[Any, Any], PlanAlgorithm],
                 reward_spec=TensorSpec(()),
                 env=None,
                 config: Optional[TrainerConfig] = None,
                 planner_optimizer=None,
                 debug_summaries=False,
                 name="LatentMbrlAlgorithm"):
        """Create a LatentMbrlAlgorithm.

        The LatentMbrlAlgorithm takes as input a planner module for
        making decisions on actions based on the latent representation of the
        current observation as well as a latent dynamics model.

        The latent representation as well as the latent dynamics is provided
        by a latent predictive representation module, which is an instance of
        ``PredictiveRepresentationLearner``. It is set through the
        ``set_latent_predictive_representation_module()`` function. The latent
        predictive representation module should have a function
        ``predict_multi_step`` for performing multi-step imagined rollout.
        Currently it is assumed that the training of the latent representation
        module is outside of the ``LatentMbrlAlgorithm``, although the
        ``LatentMbrlAlgorithm`` can also contribute to its training by using
        the latent representation in loss calculation.

        Args:
            observation_spec (nested TensorSpec): representing the
                observations. It is also used as the ``feature_spec`` of the
                base class.
            action_spec (BoundedTensorSpec): representing the actions.
            planner_module_ctor: used to construct the module for generating
                planned action based on specified reward function and
                dynamics function
            reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec
                representing the reward(s).
            env (Environment): The environment to interact with. env is a
                batched environment, which means that it runs multiple
                simulations simultaneously. env only needs to be provided to
                the root Algorithm.
            config (TrainerConfig): config for training. config only needs to
                be provided to the algorithm which performs ``train_iter()``
                by itself.
            planner_optimizer (None|Optimizer): optimizer for the planner
                module, forwarded to the base class.
            debug_summaries (bool): True if debug summaries should be created.
            name (str): The name of this algorithm.
        """
        # No dynamics or reward module is created here; the base constructor
        # already validates ``action_spec`` (continuous only) and sets
        # ``self._action_spec`` / ``self._num_actions`` from it, so that work
        # is not repeated in this class.
        super().__init__(
            observation_spec,
            feature_spec=observation_spec,
            action_spec=action_spec,
            reward_spec=reward_spec,
            dynamics_module_ctor=None,
            reward_module=None,
            planner_module_ctor=planner_module_ctor,
            planner_optimizer=planner_optimizer,
            env=env,
            config=config,
            debug_summaries=debug_summaries,
            name=name)

        self._latent_pred_rep_module = None  # set it later

    def set_latent_predictive_representation_module(
            self, latent_pred_rep_module: PredictiveRepresentationLearner):
        """Set the module used for multi-step reward prediction.

        Args:
            latent_pred_rep_module (PredictiveRepresentationLearner): module
                whose ``predict_multi_step`` is called by
                ``_predict_multi_step_cost``.
        """
        self._latent_pred_rep_module = latent_pred_rep_module

    def _trainable_attributes_to_ignore(self):
        """Return the names of attributes to leave out of training.

        The latent predictive representation module is listed here;
        presumably the base ``Algorithm`` uses this list to skip it when
        collecting this algorithm's trainable parameters (confirm against
        ``Algorithm``).
        """
        return ['_latent_pred_rep_module']

    @torch.no_grad()
    def _predict_multi_step_cost(self, init_rep, actions):
        """Compute the total cost by unrolling multiple steps according to
            the given initial observation and multi-step actions.

        Args:
            init_rep: the current observation for predicting quantities of
                future time steps of shape [B, d]
            actions (Tensor): a set of action sequences of shape
                [B, population, unroll_steps, action_dim]
        Returns:
            cost (Tensor): negation of accumulated predicted reward, with
                the shape of [B, population]
        Raises:
            RuntimeError: if the latent predictive representation module has
                not been set.
        """
        if self._latent_pred_rep_module is None:
            raise RuntimeError(
                "The latent predictive representation module is not set; "
                "call set_latent_predictive_representation_module() first.")

        batch_size, population_size, num_unroll_steps = actions.shape[0:3]

        init_rep = self._expand_to_population(init_rep, population_size)

        # merge batch with population
        # [B, population, unroll_steps, ...]
        #   -> [B*population, unroll_steps, ...]
        actions = torch.reshape(actions, (-1, *actions.shape[2:]))

        pred_rewards = self._latent_pred_rep_module.predict_multi_step(
            init_rep, actions, target_field="reward")
        # [unroll_steps + 1, B, population, reward_dim]
        pred_rewards = pred_rewards.view(num_unroll_steps + 1, batch_size,
                                         population_size, -1)

        # [B, population, unroll_steps, reward_dim]
        # here we remove the predicted reward of the current step,
        # which is irrelevant to the optimization of future actions
        pred_rewards = pred_rewards[1:].permute(1, 2, 0, 3)

        # currently assume the first dimension is the overall reward
        # [B, population, unroll_steps]
        pred_rewards = pred_rewards[..., 0]
        cost = -pred_rewards
        # accumulate over the unroll steps: [B, population]
        cost = cost.sum(2)
        return cost

    def _predict_with_planning(self, time_step: TimeStep, state,
                               epsilon_greedy):
        """Plan an action; only the planner state is updated.

        Unlike the base class, no dynamics module is consulted here.
        """
        action, planner_state = self._planner_module.predict_plan(
            time_step, state.planner, epsilon_greedy)

        return AlgStep(
            output=action,
            state=state._replace(planner=planner_state),
            info=MbrlInfo())

    def train_step(self,
                   exp: Experience,
                   state: MbrlState,
                   rollout_info=None):
        """Return the state unchanged with an empty output and info."""
        # overwrite the behavior of base class ``train_step``
        return AlgStep(output=(), state=state, info=MbrlInfo())

    def calc_loss(self, training_info: MbrlInfo):
        """Return an empty ``LossInfo``."""
        # overwrite the behavior of base class ``calc_loss``
        return LossInfo()