Source code for alf.algorithms.bc_algorithm

# Copyright (c) 2022 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Behavior Cloning (BC) Algorithm."""

import alf
from alf.algorithms.config import TrainerConfig
from alf.algorithms.off_policy_algorithm import OffPolicyAlgorithm
from alf.data_structures import TimeStep, LossInfo, namedtuple
from alf.data_structures import AlgStep
from alf.networks import ActorNetwork
from alf.tensor_specs import TensorSpec, BoundedTensorSpec
from alf.utils import dist_utils

BcState = namedtuple("BcState", ["actor"], default_value=())

BcInfo = namedtuple("BcInfo", ["actor"], default_value=())

BcLossInfo = namedtuple("LossInfo", ["actor"], default_value=())


[docs]@alf.configurable class BcAlgorithm(OffPolicyAlgorithm): r"""Behavior cloning algorithm. Behavior cloning is an offline approach to learn a policy :math:`\pi_{\theta}(a|s)`, which is a function that maps an input observation :math:`s` to an action :math:`a`. The paramerates (:math:`\theta`) of this policy is learned by using the expert action as supervision for training, e.g., by maximizing the probability of the expert actions on the training data :math:`D`: :math:`\max_{\theta} E_{(s,a)~D}\log \pi_{\theta}(a|s)` Reference: :: Pomerleau ALVINN: An Autonomous Land Vehicle in a Neural Network, NeurIPS 1988. """ def __init__(self, observation_spec, action_spec: BoundedTensorSpec, reward_spec=TensorSpec(()), actor_network_cls=ActorNetwork, actor_optimizer=None, env=None, config: TrainerConfig = None, checkpoint=None, debug_summaries=False, epsilon_greedy=None, name="BcAlgorithm"): """ Args: observation_spec (nested TensorSpec): representing the observations. action_spec (nested BoundedTensorSpec): representing the actions; can be a mixture of discrete and continuous actions. The number of continuous actions can be arbitrary while only one discrete action is allowed currently. If it's a mixture, then it must be a tuple/list ``(discrete_action_spec, continuous_action_spec)``. reward_spec (Callable): a rank-1 or rank-0 tensor spec representing the reward(s). For interface compatiblity purpose. Not actually used in BcAlgorithm. actor_network_cls (Callable): is used to construct the actor network. The constructed actor network is a determinstic network and will be used to generate continuous actions. actor_optimizer (torch.optim.optimizer): The optimizer for actor. env (Environment): The environment to interact with. ``env`` is a batched environment, which means that it runs multiple simulations simultateously. ``env` only needs to be provided to the root algorithm. config (TrainerConfig): config for training. It only needs to be provided to the algorithm which performs ``train_iter()`` by itself. checkpoint (None|str): a string in the format of "prefix@path", where the "prefix" is the multi-step path to the contents in the checkpoint to be loaded. "path" is the full path to the checkpoint file saved by ALF. Refer to ``Algorithm`` for more details. debug_summaries (bool): True if debug summaries should be created. epsilon_greedy (float): a floating value in [0,1], representing the chance of action sampling instead of taking argmax. This can help prevent a dead loop in some deterministic environment like Breakout. Only used for evaluation. If None, its value is taken from ``config.epsilon_greedy`` and then ``alf.get_config_value(TrainerConfig.epsilon_greedy)``. name (str): The name of this algorithm. """ if epsilon_greedy is None: epsilon_greedy = alf.utils.common.get_epsilon_greedy(config) self._epsilon_greedy = epsilon_greedy actor_network = actor_network_cls( input_tensor_spec=observation_spec, action_spec=action_spec) action_state_spec = actor_network.state_spec super().__init__( observation_spec=observation_spec, action_spec=action_spec, reward_spec=reward_spec, train_state_spec=BcState(actor=action_state_spec), predict_state_spec=BcState(actor=action_state_spec), reward_weights=None, env=env, config=config, checkpoint=checkpoint, debug_summaries=debug_summaries, name=name) self._actor_network = actor_network if actor_optimizer is not None and actor_network is not None: self.add_optimizer(actor_optimizer, [actor_network]) self._actor_optimizer = actor_optimizer def _predict_action(self, observation, state): action_dist, actor_network_state = self._actor_network( observation, state=state) return action_dist, actor_network_state
[docs] def predict_step(self, inputs: TimeStep, state: BcState): action_dist, new_state = self._predict_action( inputs.observation, state=state.actor) action = dist_utils.epsilon_greedy_sample(action_dist, self._epsilon_greedy) return AlgStep(output=action, state=BcState(actor=new_state))
def _actor_train_step_imitation(self, inputs: TimeStep, rollout_info, action_dist): exp_action = rollout_info.action im_loss = -action_dist.log_prob(exp_action) actor_info = LossInfo(loss=im_loss, extra=BcLossInfo(actor=im_loss)) return actor_info
[docs] def train_step_offline(self, inputs: TimeStep, state, rollout_info, pre_train=False): action_dist, new_state = self._predict_action( inputs.observation, state=state.actor) actor_loss = self._actor_train_step_imitation(inputs, rollout_info, action_dist) if self._debug_summaries and alf.summary.should_record_summaries(): with alf.summary.scope(self._name): alf.summary.scalar("imitation_loss", actor_loss.loss.mean()) info = BcInfo(actor=actor_loss) return AlgStep( rollout_info.action, state=BcState(actor=new_state), info=info)
[docs] def calc_loss_offline(self, info, pre_train=False): actor_loss = info.actor return LossInfo( loss=actor_loss.loss, extra=BcLossInfo(actor=actor_loss.extra))