Source code for alf.algorithms.agent

# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Agent for integrating multiple algorithms."""

import copy
from typing import Callable

import alf
from alf.algorithms.actor_critic_algorithm import ActorCriticAlgorithm
from alf.algorithms.agent_helpers import AgentHelper
from alf.algorithms.config import TrainerConfig
from alf.algorithms.entropy_target_algorithm import (
    EntropyTargetAlgorithm, NestedEntropyTargetAlgorithm)
from alf.algorithms.icm_algorithm import ICMAlgorithm
from alf.algorithms.mbrl_algorithm import LatentMbrlAlgorithm
from alf.algorithms.predictive_representation_learner import \
                            PredictiveRepresentationLearner
from alf.algorithms.rl_algorithm import RLAlgorithm
from alf.data_structures import AlgStep, Experience
from alf.data_structures import TimeStep, namedtuple
from alf.tensor_specs import TensorSpec

AgentState = namedtuple(
    "AgentState", ["rl", "irm", "goal_generator", "repr", "rw"],
    default_value=())

AgentInfo = namedtuple(
    "AgentInfo",
    ["rl", "irm", "goal_generator", "entropy_target", "repr", "rw", "rewards"],
    default_value=())


[docs]@alf.configurable class Agent(RLAlgorithm): """Agent is a master algorithm that integrates different algorithms together. """ def __init__(self, observation_spec, action_spec, reward_spec=TensorSpec(()), env=None, config: TrainerConfig = None, rl_algorithm_cls=ActorCriticAlgorithm, reward_weight_algorithm_cls=None, representation_learner_cls=None, representation_use_rl_state: bool = False, goal_generator=None, intrinsic_reward_module=None, intrinsic_reward_coef=1.0, extrinsic_reward_coef=1.0, enforce_entropy_target=False, entropy_target_cls=None, optimizer=None, debug_summaries=False, name="AgentAlgorithm"): """Args: observation_spec (nested TensorSpec): representing the observations. action_spec (nested BoundedTensorSpec): representing the actions. reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec representing the reward(s). env (Environment): The environment to interact with. ``env`` is a batched environment, which means that it runs multiple simulations simultaneously. Running multiple environments in parallel is crucial to on-policy algorithms as it increases the diversity of data and decreases temporal correlation. ``env`` only needs to be provided to the root ``Algorithm``. config (TrainerConfig): config for training. config only needs to be provided to the algorithm which performs ``train_iter()`` by itself. rl_algorithm_cls (type): The algorithm class for learning the policy. It will be called as ``rl_algorithm_cls(observation_spec=?, action_spec=?, reward_spec=?, config=?, debug_summaries=?)``. reward_weight_algorithm_cls (type): The algorithm class for adjusting reward weights when multi-dim rewards are used. If provided, the the default ``reward_weights`` of ``rl_algorithm`` will be overwritten by this algorithm. representation_learner_cls (type): The algorithm class for learning the representation. If provided, the constructed learner will calculate the representation from the original observation as the observation for downstream algorithms such as ``rl_algorithm``. Similar to rl_algorithm_cls, it will be called as ``rl_algorithm_cls(observation_spec=?, action_spec=?, reward_spec=?, config=?, debug_summaries=?)``. representation_use_rl_state: When set to True, representation learner will receive (previous) state from the RL algorithm as input instead of its own state for ``rollout_step()`` and ``predict_step()``. This is particularly useful for algorithm such as MuZero representation learner, whose reanalyze component requires access to the RL algorithm's state. intrinsic_reward_module (Algorithm): an algorithm whose outputs is a scalar intrinsic reward. goal_generator (Algorithm): an algorithm which outputs a tuple of goal vector and a reward. The reward can be ``()`` if no reward is given. intrinsic_reward_coef (float): Coefficient for intrinsic reward extrinsic_reward_coef (float): Coefficient for extrinsic reward enforce_entropy_target (bool): If True, use ``(Nested)EntropyTargetAlgorithm`` to dynamically adjust entropy regularization so that entropy is not smaller than ``entropy_target`` supplied for constructing ``(Nested)EntropyTargetAlgorithm``. If this is enabled, make sure you don't use ``entropy_regularization`` for loss (see ``ActorCriticLoss`` or ``PPOLoss``). In order to use this, The ``AlgStep.info`` from ``rl_algorithm_cls.train_step()`` and ``rl_algorithm_cls.rollout_step()`` needs to contain ``action_distribution``. entropy_target_cls (type): If provided, will be used to dynamically adjust entropy regularization. optimizer (optimizer): The optimizer for training debug_summaries (bool): True if debug summaries should be created. name (str): Name of this algorithm. """ agent_helper = AgentHelper(AgentState) rl_observation_spec = observation_spec ## 0. representation learner representation_learner = None if representation_learner_cls is not None: representation_learner = representation_learner_cls( observation_spec=rl_observation_spec, action_spec=action_spec, reward_spec=reward_spec, config=config, debug_summaries=debug_summaries) rl_observation_spec = representation_learner.output_spec agent_helper.register_algorithm(representation_learner, "repr") self._representation_use_rl_state = representation_use_rl_state ## 1. goal generator if goal_generator is not None: agent_helper.register_algorithm(goal_generator, "goal_generator") rl_observation_spec = [ rl_observation_spec, goal_generator.action_spec ] ## 2. rl algorithm rl_algorithm = rl_algorithm_cls( observation_spec=rl_observation_spec, action_spec=action_spec, reward_spec=reward_spec, config=config, debug_summaries=debug_summaries) agent_helper.register_algorithm(rl_algorithm, "rl") if isinstance(rl_algorithm, LatentMbrlAlgorithm): assert isinstance(representation_learner, PredictiveRepresentationLearner), ( "need to use " "PredictiveRepresentationLearner") rl_algorithm.set_latent_predictive_representation_module( representation_learner) ## 3. intrinsic motivation module if intrinsic_reward_module is not None: agent_helper.register_algorithm(intrinsic_reward_module, "irm") ## 4. entropy target entropy_target_algorithm = None if entropy_target_cls or enforce_entropy_target: if entropy_target_cls is None: if alf.nest.is_nested(action_spec): entropy_target_cls = NestedEntropyTargetAlgorithm else: entropy_target_cls = EntropyTargetAlgorithm entropy_target_algorithm = entropy_target_cls( action_spec, debug_summaries=debug_summaries) agent_helper.register_algorithm(entropy_target_algorithm, "entropy_target") # 5. reward weight algorithm reward_weight_algorithm = None if reward_weight_algorithm_cls is not None: reward_weight_algorithm = reward_weight_algorithm_cls( reward_spec=reward_spec, debug_summaries=debug_summaries) agent_helper.register_algorithm(reward_weight_algorithm, "rw") # Initialize the reward weights of the rl algorithm rl_algorithm.set_reward_weights( reward_weight_algorithm.reward_weights) super().__init__( observation_spec=observation_spec, action_spec=action_spec, reward_spec=reward_spec, optimizer=optimizer, is_on_policy=rl_algorithm.on_policy, env=env, config=config, debug_summaries=debug_summaries, name=name, **agent_helper.state_specs()) for alg in (representation_learner, goal_generator, intrinsic_reward_module, entropy_target_algorithm, reward_weight_algorithm): if alg is not None: alg.set_on_policy(self.on_policy) self._representation_learner = representation_learner self._rl_algorithm = rl_algorithm self._reward_weight_algorithm = reward_weight_algorithm self._entropy_target_algorithm = entropy_target_algorithm self._intrinsic_reward_coef = intrinsic_reward_coef self._extrinsic_reward_coef = extrinsic_reward_coef self._irm = intrinsic_reward_module self._goal_generator = goal_generator self._agent_helper = agent_helper # Set ``use_rollout_state``` for all submodules using the setter. # Need to make sure that no submodules use ``self._use_rollout_state`` # before this line. self.use_rollout_state = self.use_rollout_state
[docs] def set_path(self, path): super().set_path(path) self._agent_helper.set_path(path)
[docs] def predict_step(self, time_step: TimeStep, state: AgentState): """Predict for one step.""" new_state = AgentState() observation = time_step.observation info = AgentInfo() if self._representation_learner is not None: input_state = state.rl if self._representation_use_rl_state else state.repr repr_step = self._representation_learner.predict_step( time_step, input_state) new_state = new_state._replace(repr=repr_step.state) info = info._replace(repr=repr_step.info) observation = repr_step.output if self._goal_generator is not None: goal_step = self._goal_generator.predict_step( time_step._replace(observation=observation), state.goal_generator) goal, goal_reward = goal_step.output new_state = new_state._replace(goal_generator=goal_step.state) info = info._replace(goal_generator=goal_step.info) observation = [observation, goal] rl_step = self._rl_algorithm.predict_step( time_step._replace(observation=observation), state.rl) new_state = new_state._replace(rl=rl_step.state) info = info._replace(rl=rl_step.info) return AlgStep(output=rl_step.output, state=new_state, info=info)
[docs] def rollout_step(self, time_step: TimeStep, state: AgentState): """Rollout for one step.""" new_state = AgentState() info = AgentInfo() observation = time_step.observation if self._representation_learner is not None: input_state = state.rl if self._representation_use_rl_state else state.repr repr_step = self._representation_learner.rollout_step( time_step, input_state) new_state = new_state._replace(repr=repr_step.state) info = info._replace(repr=repr_step.info) observation = repr_step.output rewards = {} if self._goal_generator is not None: goal_step = self._goal_generator.rollout_step( time_step._replace(observation=observation), state.goal_generator) new_state = new_state._replace(goal_generator=goal_step.state) info = info._replace(goal_generator=goal_step.info) goal, goal_reward = goal_step.output observation = [observation, goal] if goal_reward != (): rewards['goal_generator'] = goal_reward if self._irm is not None: irm_step = self._irm.rollout_step( time_step._replace(observation=observation), state=state.irm) info = info._replace(irm=irm_step.info) new_state = new_state._replace(irm=irm_step.state) rewards['irm'] = irm_step.output if rewards: info = info._replace(rewards=rewards) overall_reward = self._calc_overall_reward(time_step.reward, rewards) else: overall_reward = time_step.reward rl_time_step = time_step._replace( observation=observation, reward=overall_reward) rl_step = self._rl_algorithm.rollout_step(rl_time_step, state.rl) new_state = new_state._replace(rl=rl_step.state) info = info._replace(rl=rl_step.info) if self._entropy_target_algorithm: assert 'action_distribution' in rl_step.info._fields, ( "AlgStep from rl_algorithm.rollout() does not contain " "`action_distribution`, which is required by " "`enforce_entropy_target`") et_step = self._entropy_target_algorithm.rollout_step( (rl_step.info.action_distribution, time_step.step_type)) info = info._replace(entropy_target=et_step.info) if self._reward_weight_algorithm: rw_step = self._reward_weight_algorithm.rollout_step( time_step, state.rw) info = info._replace(rw=rw_step.info) return AlgStep(output=rl_step.output, state=new_state, info=info)
[docs] def train_step(self, time_step: TimeStep, state, rollout_info): new_state = AgentState() info = AgentInfo(rewards=rollout_info.rewards) observation = time_step.observation if self._representation_learner is not None: repr_step = self._representation_learner.train_step( time_step, state.repr, rollout_info.repr) new_state = new_state._replace(repr=repr_step.state) info = info._replace(repr=repr_step.info) observation = repr_step.output if self._goal_generator is not None: goal_step = self._goal_generator.train_step( time_step._replace(observation=observation), state.goal_generator, rollout_info.goal_generator) goal, goal_reward = goal_step.output info = info._replace(goal_generator=goal_step.info) new_state = new_state._replace(goal_generator=goal_step.state) observation = [observation, goal] if self._irm is not None: irm_step = self._irm.train_step( time_step._replace(observation=observation), state=state.irm) info = info._replace(irm=irm_step.info) new_state = new_state._replace(irm=irm_step.state) rl_step = self._rl_algorithm.train_step( time_step._replace(observation=observation), state.rl, rollout_info.rl) new_state = new_state._replace(rl=rl_step.state) info = info._replace(rl=rl_step.info) if self._entropy_target_algorithm: assert 'action_distribution' in rl_step.info._fields, ( "PolicyStep from rl_algorithm.train_step() does not contain " "`action_distribution`, which is required by " "`enforce_entropy_target`") et_step = self._entropy_target_algorithm.train_step( (rl_step.info.action_distribution, time_step.step_type)) info = info._replace(entropy_target=et_step.info) return AlgStep(output=rl_step.output, state=new_state, info=info)
[docs] def train_step_offline(self, time_step: TimeStep, state, rollout_info, pre_train): new_state = AgentState() info = AgentInfo(rewards=rollout_info.rewards) observation = time_step.observation if self._representation_learner is not None: repr_step = self._representation_learner.train_step_offline( time_step, state.repr, rollout_info.repr) new_state = new_state._replace(repr=repr_step.state) info = info._replace(repr=repr_step.info) observation = repr_step.output if self._goal_generator is not None: goal_step = self._goal_generator.train_step_offline( time_step._replace(observation=observation), state.goal_generator, rollout_info.goal_generator) goal, goal_reward = goal_step.output info = info._replace(goal_generator=goal_step.info) new_state = new_state._replace(goal_generator=goal_step.state) observation = [observation, goal] if self._irm is not None: irm_step = self._irm.train_step_offline( time_step._replace(observation=observation), state=state.irm) info = info._replace(irm=irm_step.info) new_state = new_state._replace(irm=irm_step.state) rl_step = self._rl_algorithm.train_step_offline( time_step._replace(observation=observation), state.rl, rollout_info.rl, pre_train) new_state = new_state._replace(rl=rl_step.state) info = info._replace(rl=rl_step.info) if self._entropy_target_algorithm: assert 'action_distribution' in rl_step.info._fields, ( "PolicyStep from rl_algorithm.train_step() does not contain " "`action_distribution`, which is required by " "`enforce_entropy_target`") et_step = self._entropy_target_algorithm.train_step_offline( (rl_step.info.action_distribution, time_step.step_type)) info = info._replace(entropy_target=et_step.info) return AlgStep(output=rl_step.output, state=new_state, info=info)
def _calc_overall_reward(self, extrinsic_reward, intrinsic_rewards): overall_reward = extrinsic_reward if self._extrinsic_reward_coef != 1: overall_reward *= self._extrinsic_reward_coef if 'irm' in intrinsic_rewards: overall_reward += self._intrinsic_reward_coef * intrinsic_rewards[ 'irm'] if 'goal_generator' in intrinsic_rewards: overall_reward += intrinsic_rewards['goal_generator'] return overall_reward
[docs] def calc_loss(self, info: AgentInfo): """Calculate loss.""" if info.rewards != (): for name, reward in info.rewards.items(): self.summarize_reward("reward/%s" % name, reward) algorithms = [ self._representation_learner, self._rl_algorithm, self._irm, self._goal_generator, self._entropy_target_algorithm ] algorithms = list(filter(lambda a: a is not None, algorithms)) return self._agent_helper.accumulate_loss_info(algorithms, info)
[docs] def calc_loss_offline(self, info, pre_train): """Calculate loss for the offline RL branch.""" if info.rewards != (): for name, reward in info.rewards.items(): self.summarize_reward("reward_offline/%s" % name, reward) algorithms = [ self._representation_learner, self._rl_algorithm, self._irm, self._goal_generator, self._entropy_target_algorithm ] algorithms = list(filter(lambda a: a is not None, algorithms)) return self._agent_helper.accumulate_loss_info(algorithms, info, True, pre_train)
[docs] def after_update(self, experience, train_info: AgentInfo): """Call ``after_update()`` of the RL algorithm and goal generator, respectively. """ algorithms = [ self._rl_algorithm, self._representation_learner, self._goal_generator ] algorithms = list(filter(lambda a: a is not None, algorithms)) self._agent_helper.after_update(algorithms, experience, train_info)
[docs] def after_train_iter(self, experience, info: AgentInfo): """Call ``after_train_iter()`` of the RL algorithm and goal generator, respectively. """ algorithms = [ self._rl_algorithm, self._representation_learner, self._goal_generator, self._reward_weight_algorithm ] algorithms = list(filter(lambda a: a is not None, algorithms)) self._agent_helper.after_train_iter(algorithms, experience, info) if self._reward_weight_algorithm: self._rl_algorithm.set_reward_weights( self._reward_weight_algorithm.reward_weights)
[docs] def preprocess_experience(self, root_inputs, rollout_info, batch_info): """Add intrinsic rewards to extrinsic rewards if there is an intrinsic reward module. Also call ``preprocess_experience()`` of the rl algorithm. """ exp = root_inputs rewards = rollout_info.rewards if rewards != (): rewards = copy.copy(rewards) rewards['overall'] = self._calc_overall_reward( root_inputs.reward, rewards) exp = exp._replace(reward=rewards['overall']) if self._representation_learner: exp, repr_info = self._representation_learner.preprocess_experience( exp, rollout_info.repr, batch_info) rollout_info = rollout_info._replace(repr=repr_info) exp, rl_info = self._rl_algorithm.preprocess_experience( exp, rollout_info.rl, batch_info) # Expand discounted_return in batch_info to the correct shape, and # populate to rl_info. if hasattr(rl_info, "discounted_return") and batch_info.discounted_return != (): discounted_return = batch_info.discounted_return.unsqueeze( 1).expand(exp.reward.shape[:2]) rl_info = rl_info._replace(discounted_return=discounted_return) return exp, rollout_info._replace(rl=rl_info)
[docs] def summarize_rollout(self, experience): """First call ``RLAlgorithm.summarize_rollout()`` to summarize basic rollout statisics. If the rl algorithm has overridden this function, then also call its customized version. """ super(Agent, self).summarize_rollout(experience) if (super(Agent, self).summarize_rollout.__func__ != self._rl_algorithm.summarize_rollout.__func__): self._rl_algorithm.summarize_rollout( experience._replace(rollout_info=experience.rollout_info.rl))