Source code for alf.algorithms.lagrangian_reward_weight_algorithm

# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""LagrangianRewardWeightAlgorithm."""

from functools import partial

import torch
import torch.nn as nn
from torch.nn import functional as F

import alf
from alf.algorithms.algorithm import Algorithm
from alf.data_structures import namedtuple, AlgStep, LossInfo
from alf.tensor_specs import TensorSpec
from alf.utils import tensor_utils
from alf.utils.averager import EMAverager

# Info structure emitted by ``rollout_step`` and consumed by ``_calc_loss``:
# ``rollout_reward`` carries the *untransformed* reward observed at rollout.
# NOTE(review): ``default_value=()`` presumably makes unset fields default to
# an empty tuple -- confirm against ``alf.data_structures.namedtuple``.
LagInfo = namedtuple("LagInfo", ["rollout_reward"], default_value=())


def _inv_softplus(tensor):
    """Element-wise inverse of ``F.softplus`` (with its default settings).

    For entries above 20 the input is returned unchanged, mirroring the
    linear regime ``F.softplus`` switches to past its default threshold;
    elsewhere the exact inverse ``log(exp(x) - 1)`` is used.

    Args:
        tensor (Tensor): values in softplus output space (expected ``>= 0``;
            an entry of exactly 0 maps to ``-inf``).

    Returns:
        Tensor: same shape as ``tensor``, in softplus input space.
    """
    # ``expm1`` may overflow to ``inf`` for very large entries, but those
    # entries are the ones replaced by the identity branch below.
    exact_inverse = torch.log(torch.expm1(tensor))
    in_linear_regime = tensor > 20.
    return torch.where(in_linear_regime, tensor, exact_inverse)


@alf.configurable(blacklist=["reward_spec"])
class LagrangianRewardWeightAlgorithm(Algorithm):
    """An algorithm that adjusts reward weights according to untransformed
    rollout rewards. The adjustment is expected to be performed after every
    training iteration.

    Generally speaking, for each reward dimension, the algorithm compares an
    individual reward per step to an average expected threshold, and if the
    reward is greater than the threshold (requirement satisfied) then it
    decreases the reward weight; otherwise it increases the weight.

    .. note::

        This algorithm doesn't put a constraint on per-step basis since it
        only learns a single, state-independent weight for each reward dim.
        Also, a reward is always assumed to be the higher the better.
    """

    def __init__(self,
                 reward_spec,
                 reward_thresholds,
                 optimizer,
                 init_weights=1.,
                 max_weight=None,
                 reward_weight_normalization=True,
                 lambda_transform=F.softplus,
                 debug_summaries=False,
                 name="LagrangianRewardWeightAlgorithm"):
        """
        Args:
            reward_spec (TensorSpec): a rank-1 tensor spec representing
                multi-dim rewards.
            reward_thresholds (list[float|None]): a list of floating numbers,
                each representing a desired minimum reward threshold in
                expectation. If any entry is None, then the corresponding
                reward weight won't be tuned; either its init value or its
                normalized init value (if ``reward_weight_normalization=True``)
                will be used.
            optimizer (optimizer): optimizer for learning the reward weights.
            init_weights (float|list[float]): the initial reward weights.
                Integer values are accepted and converted to floating point.
            max_weight (float): the reward weights will be clipped up to this
                value. ``None`` disables clipping.
            reward_weight_normalization (bool): whether project the weights to
                a simplex (sum-to-one normalization)
            lambda_transform (Callable): the transform function to make sure
                all lambdas (reward weights) are positive. Currently only
                support ``F.softplus`` and ``torch.exp``; any other value
                raises ``KeyError``.
            debug_summaries (bool): whether to write summaries in
                ``after_train_iter``.
            name (str): name of this algorithm.
        """
        super(LagrangianRewardWeightAlgorithm, self).__init__(
            debug_summaries=debug_summaries, name=name)

        def _as_float_tensor(value):
            # Integer inputs (e.g. ``init_weights=1``) would otherwise yield
            # an int64 tensor, which cannot back an ``nn.Parameter`` and may
            # break the inverse transform. Floating inputs are left untouched.
            converted = torch.tensor(value)
            if not converted.is_floating_point():
                converted = converted.to(torch.get_default_dtype())
            return converted

        self._reward_spec = reward_spec
        assert reward_spec.numel > 1, (
            "Only multi-dim reward needs this algorithm!")
        # Checked separately so that a non-sequence argument fails with the
        # assertion itself rather than with ``len()`` inside the message.
        assert isinstance(reward_thresholds, (list, tuple)), (
            "reward_thresholds must be a list or tuple, got %s" %
            type(reward_thresholds))
        assert len(reward_thresholds) == reward_spec.numel, (
            "Mismatch between len(reward_thresholds)=%s and reward_dim=%s" %
            (len(reward_thresholds), reward_spec.numel))

        # 1 for reward dims whose weight is tuned, 0 for dims whose threshold
        # is None (their term is zeroed out of the loss in ``_calc_loss``).
        self._reward_training_mask = torch.tensor(
            [t is not None for t in reward_thresholds], dtype=torch.float32)
        # None thresholds are stored as 0; they are masked out of the loss.
        self._reward_thresholds = torch.tensor(
            [0. if t is None else t for t in reward_thresholds])
        self._reward_weight_normalization = reward_weight_normalization

        lambda_init = _as_float_tensor(init_weights)
        if lambda_init.ndim == 0:
            # A scalar initial weight is expanded to one entry per reward dim.
            lambda_init = tensor_utils.tensor_extend_new_dim(
                lambda_init, 0, reward_spec.numel)
        assert torch.all(
            lambda_init >= 0.), "Initial weights must be non-negative!"

        # Supported positivity transforms and their inverses.
        inv_mapping = {
            F.softplus: _inv_softplus,
            torch.exp: torch.log,
        }
        self._lambda_transform = lambda_transform
        self._inv_lambda_transform = inv_mapping[lambda_transform]
        # The learned parameter lives in the transform's *input* space, so
        # that ``lambda_transform(self._lambdas)`` gives the actual weights.
        self._lambdas = nn.Parameter(self._inv_lambda_transform(lambda_init))
        if max_weight is not None:
            # The cap is stored in the same (inverse-transformed) space as
            # ``self._lambdas`` so the two can be compared directly.
            self._max_lambda = self._inv_lambda_transform(
                _as_float_tensor(max_weight))
        else:
            self._max_lambda = None

        self._optimizer = optimizer
        self._optimizer.add_param_group({'params': self._lambdas})

    @property
    def reward_weights(self):
        """Return the detached reward weights. These weights are expected not
        to be changed by external code.

        If ``reward_weight_normalization`` is on, the returned weights are
        divided by their sum.
        """
        weights = self._lambda_transform(self._lambdas).detach().clone()
        if self._reward_weight_normalization:
            weights = weights / weights.sum()
        return weights

    def _trainable_attributes_to_ignore(self):
        # NOTE(review): presumably tells the parent ``Algorithm`` not to
        # manage ``_lambdas`` itself, since it is registered with
        # ``self._optimizer`` in ``__init__`` -- confirm against ``Algorithm``.
        return ["_lambdas"]

    def predict_step(self, inputs, state=None):
        """Return an empty ``AlgStep``; nothing is computed at prediction."""
        return AlgStep()

    def rollout_step(self, inputs, state=None):
        """Record the untransformed rollout reward in ``LagInfo``."""
        return AlgStep(
            info=LagInfo(rollout_reward=inputs.untransformed.reward))

    def _calc_loss(self, train_info: LagInfo):
        """Retrieve *untransformed* rollout rewards from ``train_info`` and
        compute the loss for training lambdas.

        Returns:
            LossInfo: ``scalar_loss`` is the loss; ``extra`` holds the
            (unnormalized) reward weights used to compute it.
        """
        reward_weights = self._lambda_transform(self._lambdas)
        # rollout_reward: [T, B, reward_dim]
        # The reward margin is detached so gradients only flow to the
        # weights: a positive margin (reward above threshold) pushes the
        # weight down, a negative one pushes it up.
        margin = (train_info.rollout_reward - self._reward_thresholds).detach()
        loss = margin * (reward_weights * self._reward_training_mask)
        loss = loss.sum(dim=-1).mean()
        return LossInfo(scalar_loss=loss, extra=reward_weights)

    def after_train_iter(self, root_inputs, train_info: LagInfo):
        """Perform one gradient step of updating lambdas."""
        loss_info = self._calc_loss(train_info)
        loss, reward_weights = loss_info.scalar_loss, loss_info.extra
        self._optimizer.zero_grad()
        loss.backward()
        self._optimizer.step()

        # capped at the upper limit
        if self._max_lambda is not None:
            self._lambdas.data.copy_(
                torch.minimum(self._lambdas, self._max_lambda))

        if self._debug_summaries:
            with alf.summary.scope(self._name):
                alf.summary.scalar("cost", loss)
                # ``reward_weights`` are the weights used for this loss, i.e.
                # the values before the optimizer step above.
                for i, threshold in enumerate(self._reward_thresholds):
                    alf.summary.scalar("reward_threshold/%d" % i, threshold)
                    alf.summary.scalar("lambda/%d" % i, reward_weights[i])
@alf.configurable(blacklist=["reward_spec"])
class LagrangianPredRewardWeightAlgorithm(LagrangianRewardWeightAlgorithm):
    """Similar to ``LagrangianRewardWeightAlgorithm``, except that the rewards
    used to compare with the thresholds are collected by prediction steps
    instead of by rollout steps. For harsh target constraints, it is important
    to remove the rollout stochasticity otherwise the agent's constraint
    satisfaction ability will usually be under-estimated.

    Because prediction output is not directly passed to training, in order to
    use the rewards from prediction to train the weights, here we use an
    ``Averager`` to maintain the reward statistics. Inside every
    ``after_train_iter`` we perform a gradient step by querying the current
    averager value.

    .. note::

        This algorithm asserts ``TrainerConfig.evaluate=True``.
    """

    def __init__(self,
                 reward_spec,
                 reward_thresholds,
                 optimizer,
                 init_weights=1.,
                 max_weight=None,
                 reward_weight_normalization=True,
                 pred_rewards_averager_ctor=partial(
                     EMAverager, update_rate=1e-4),
                 debug_summaries=False,
                 name="LagrangianPredRewardWeightAlgorithm"):
        """
        Args:
            reward_spec (TensorSpec): a rank-1 tensor spec representing
                multi-dim rewards.
            reward_thresholds (list[float|None]): a list of floating numbers,
                each representing a desired minimum reward threshold in
                expectation. If any entry is None, then the corresponding
                reward weight won't be tuned; either its init value or its
                normalized init value (if ``reward_weight_normalization=True``)
                will be used.
            optimizer (optimizer): optimizer for learning the reward weights.
            init_weights (float|list[float]): the initial reward weights.
            max_weight (float): the reward weights will be clipped up to this
                value
            reward_weight_normalization (bool): whether project the weights to
                a simplex (sum-to-one normalization)
            pred_rewards_averager_ctor (Callable): callable for creating an
                averager to maintain a moving average of prediction rewards.
                It is called with ``reward_spec`` as its only argument. If
                None, ``EMAverager`` with an update rate of ``1e-4`` will be
                used.
            debug_summaries (bool): whether to write summaries.
            name (str): name of this algorithm.
        """
        assert alf.get_config_value('TrainerConfig.evaluate'), (
            "This algorithm must have the evaluation mode turned on!")
        super(LagrangianPredRewardWeightAlgorithm, self).__init__(
            reward_spec=reward_spec,
            reward_thresholds=reward_thresholds,
            optimizer=optimizer,
            init_weights=init_weights,
            max_weight=max_weight,
            reward_weight_normalization=reward_weight_normalization,
            debug_summaries=debug_summaries,
            name=name)
        if pred_rewards_averager_ctor is None:
            # Honor the documented fallback instead of failing on ``None(...)``.
            pred_rewards_averager_ctor = partial(EMAverager, update_rate=1e-4)
        self._pred_rewards_averager = pred_rewards_averager_ctor(reward_spec)

    def predict_step(self, inputs, state=None):
        """Feed the untransformed prediction reward into the averager and
        return an empty ``AlgStep``."""
        self._pred_rewards_averager.update(inputs.untransformed.reward)
        return AlgStep()

    def _calc_loss(self, train_info: LagInfo):
        """Retrieve *untransformed* prediction rewards from the averager and
        train lambdas. ``train_info`` is not used here.

        Returns:
            LossInfo: ``scalar_loss`` is the loss; ``extra`` holds the
            (unnormalized) reward weights used to compute it.
        """
        reward_weights = self._lambda_transform(self._lambdas)
        # NOTE(review): presumably shape [reward_dim] (it is indexed per
        # reward dim below) -- confirm against the averager implementation.
        pred_rewards = self._pred_rewards_averager.get()
        # The reward margin is detached so gradients only reach the weights.
        margin = (pred_rewards - self._reward_thresholds).detach()
        loss = margin * (reward_weights * self._reward_training_mask)
        loss = loss.sum()
        if self._debug_summaries:
            with alf.summary.scope(self._name):
                for i, _ in enumerate(self._reward_thresholds):
                    alf.summary.scalar("average_pred_reward/%d" % i,
                                       pred_rewards[i])
        return LossInfo(scalar_loss=loss, extra=reward_weights)