# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Phasic Policy Gradient Algorithm."""
from __future__ import annotations
import torch
from typing import Callable, Optional
import alf
from alf.algorithms.ppg import DisjointPolicyValueNetwork, PPGRolloutInfo, PPGTrainInfo, PPGAuxAlgorithm, PPGAuxOptions, ppg_network_forward
from alf.algorithms.off_policy_algorithm import OffPolicyAlgorithm
from alf.algorithms.config import TrainerConfig
from alf.algorithms.ppo_loss import PPOLoss
from alf.networks import Network, EncodingNetwork
from alf.data_structures import TimeStep, AlgStep, LossInfo, make_experience
from alf.tensor_specs import TensorSpec
# TODO(breakds): When needed, implement the support for multi-dimensional reward.
@alf.configurable
class PPGAlgorithm(OffPolicyAlgorithm):
    """PPG Algorithm.

    Implementation of the paper: https://arxiv.org/abs/2009.04416

    PPG can be viewed as a variant of PPO, with two differences:

    1. It uses a special network structure (DisjointPolicyValueNetwork) that
       has an extra auxiliary value head in addition to the policy head and
       value head. In the current implementation, the auxiliary value head
       also tries to estimate the value function, similar to the (actual)
       value head.

    2. It does PPO update in normal iterations. However, after every
       specified number of iterations, it will perform auxiliary phase
       updates based on auxiliary phase losses (different from PPO loss, see
       algorithms/ppg/ppg_aux_phase_loss.py for details). Auxiliary phase
       updates do not require new rollouts. Instead they are performed on
       all of the experience collected since the last auxiliary phase update.
    """

    def __init__(
            self,
            observation_spec,
            action_spec,
            reward_spec=TensorSpec(()),
            env=None,
            config: Optional[TrainerConfig] = None,
            aux_options: PPGAuxOptions = PPGAuxOptions(),
            encoding_network_ctor: Callable[..., Network] = EncodingNetwork,
            policy_optimizer: Optional[torch.optim.Optimizer] = None,
            aux_optimizer: Optional[torch.optim.Optimizer] = None,
            epsilon_greedy=None,
            checkpoint: Optional[str] = None,
            debug_summaries: bool = False,
            name: str = "PPGAlgorithm"):
        """Args:
            observation_spec (nested TensorSpec): representing the
                observations.
            action_spec (nested BoundedTensorSpec): representing the actions.
            reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec
                representing the reward(s).
            env (Environment): The environment to interact with. env is a
                batched environment, which means that it runs multiple
                simulations simultaneously. env only needs to be provided to
                the root Algorithm. NOTE: env will default to None if
                PPGAlgorithm is run via Agent.
            config (TrainerConfig): config for training. config only needs to
                be provided to the algorithm which performs ``train_iter()``
                by itself.
            aux_options: Options that control the auxiliary phase training.
                The default instance is created once, when the class is
                defined, and is shared by every call that omits this
                argument.
            encoding_network_ctor (Callable[[TensorSpec], Network]): Function
                to construct the encoding network from an input tensor spec.
                The constructed network will be called with
                ``forward(observation, state)``.
            policy_optimizer (torch.optim.Optimizer): The optimizer for
                training the policy phase of PPG.
            aux_optimizer (torch.optim.Optimizer): The optimizer for training
                the auxiliary phase of PPG.
            epsilon_greedy (float): a floating value in [0,1], representing
                the chance of action sampling instead of taking argmax. This
                can help prevent a dead loop in some deterministic
                environment like Breakout. Only used for evaluation. If None,
                its value is taken from ``config.epsilon_greedy`` and then
                ``alf.get_config_value(TrainerConfig.epsilon_greedy)``.
                It is used in ``predict_step()`` during evaluation.
            checkpoint (None|str): a string in the format of "prefix@path",
                where the "prefix" is the multi-step path to the contents in
                the checkpoint to be loaded. "path" is the full path to the
                checkpoint file saved by ALF. Refer to ``Algorithm`` for more
                details.
            debug_summaries (bool): True if debug summaries should be created.
            name (str): Name of this algorithm.
        """
        dual_actor_value_network = DisjointPolicyValueNetwork(
            observation_spec=observation_spec,
            action_spec=action_spec,
            encoding_network_ctor=encoding_network_ctor)

        # ``debug_summaries`` and ``name`` are forwarded so that the
        # documented arguments actually take effect on the base algorithm
        # instead of being silently dropped.
        super().__init__(
            config=config,
            env=env,
            observation_spec=observation_spec,
            action_spec=action_spec,
            reward_spec=reward_spec,
            predict_state_spec=dual_actor_value_network.state_spec,
            train_state_spec=dual_actor_value_network.state_spec,
            checkpoint=checkpoint,
            optimizer=policy_optimizer,
            debug_summaries=debug_summaries,
            name=name)

        # When aux phase update is enabled, a sub algorithm named
        # "PPGAuxAlgorithm" will be created. The sub algorithm shares the same
        # network as the main algorithm, but updates the parameters with a
        # different loss and optimizer. ``_trainable_attributes_to_ignore()``
        # is defined to prevent the network parameters being managed by two
        # different optimizers.
        if aux_options.enabled:
            self._aux_algorithm = PPGAuxAlgorithm(
                observation_spec=observation_spec,
                action_spec=action_spec,
                reward_spec=reward_spec,
                config=config,
                optimizer=aux_optimizer,
                dual_actor_value_network=dual_actor_value_network,
                aux_options=aux_options,
                debug_summaries=debug_summaries)
        else:
            # A None ``_aux_algorithm`` means not performing aux phase
            # update at all.
            self._aux_algorithm = None

        self._network = dual_actor_value_network
        self._loss = PPOLoss(debug_summaries=debug_summaries)

        # An explicitly passed ``epsilon_greedy`` wins; otherwise fall back to
        # the value resolved from ``config``.
        if epsilon_greedy is None:
            epsilon_greedy = alf.utils.common.get_epsilon_greedy(config)
        self._predict_step_epsilon_greedy = epsilon_greedy

        # Used in ``after_train_iter()``: ticked every iteration and entered
        # as a context manager around the auxiliary phase update.
        self._ensure_summary = alf.summary.EnsureSummary()

    def _trainable_attributes_to_ignore(self):
        """Return the attribute names to exclude from this algorithm's
        trainable attributes.

        ``_aux_algorithm`` shares the network with this algorithm but has its
        own optimizer, so it is excluded here to keep the network parameters
        from being managed by two different optimizers.
        """
        return ['_aux_algorithm']

    def rollout_step(self, inputs: TimeStep, state) -> AlgStep:
        """Rollout step for PPG algorithm.

        Besides running the network prediction, it does one extra thing to
        store the experience in the auxiliary replay buffer so that it can be
        consumed by the auxiliary phase updates.

        Args:
            inputs (TimeStep): the current time step from the environment.
            state: the network state passed to the forward pass.

        Returns:
            AlgStep: the result of ``ppg_network_forward()``.
        """
        policy_step = ppg_network_forward(self._network, inputs, state)

        # Only record experience when the auxiliary phase is enabled.
        if self._aux_algorithm is not None:
            exp = make_experience(inputs.cpu(), policy_step, state)
            self._aux_algorithm.observe_for_aux_replay(exp)

        return policy_step

    def train_step(self, inputs: TimeStep, state,
                   plain_rollout_info: PPGRolloutInfo) -> AlgStep:
        """Run the network on a batch of experience for the policy phase.

        The forward pass is done with ``require_aux=False``. The returned
        info is a ``PPGTrainInfo`` that carries the action, log probability,
        value and action distribution recorded at rollout time, combined with
        the info of the fresh forward pass via ``absorbed()``.

        Args:
            inputs (TimeStep): the experience to train on.
            state: the network state passed to the forward pass.
            plain_rollout_info (PPGRolloutInfo): the info recorded by
                ``rollout_step()`` for this experience.

        Returns:
            AlgStep: the forward result with ``info`` replaced by the
            ``PPGTrainInfo``.
        """
        alg_step = ppg_network_forward(
            self._network, inputs, state, require_aux=False)

        rollout_part = PPGTrainInfo(
            action=plain_rollout_info.action,
            rollout_log_prob=plain_rollout_info.log_prob,
            rollout_value=plain_rollout_info.value,
            rollout_action_distribution=plain_rollout_info.action_distribution)
        train_info = rollout_part.absorbed(alg_step.info)

        return alg_step._replace(info=train_info)

    def calc_loss(self, info: PPGTrainInfo) -> LossInfo:
        """Compute the policy phase loss by applying the PPO loss to ``info``.
        """
        return self._loss(info)

    def predict_step(self, inputs: TimeStep, state):
        """Run the network forward pass for prediction, using the
        ``epsilon_greedy`` value resolved in ``__init__()``.
        """
        return ppg_network_forward(
            self._network,
            inputs,
            state,
            epsilon_greedy=self._predict_step_epsilon_greedy)

    def after_train_iter(self, experience, info: PPGTrainInfo):
        """Run auxiliary update if conditions are met.

        PPG requires running auxiliary update after certain number of
        iterations policy update. This is checked and performed at the
        after_train_iter() hook currently.

        Args:
            experience: unused here.
            info (PPGTrainInfo): unused here.
        """
        if self._aux_algorithm is None:
            return

        self._ensure_summary.tick()

        # The auxiliary phase runs whenever the global counter is a multiple
        # of the configured interval.
        if alf.summary.get_global_counter() % self._aux_algorithm.interval == 0:
            with self._ensure_summary:
                with alf.summary.scope(self._aux_algorithm.name):
                    # The global counter is left untouched by the auxiliary
                    # phase update.
                    self._aux_algorithm.train_from_replay_buffer(
                        update_global_counter=False)