Source code for alf.algorithms.icm_algorithm

# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch

import alf
from alf.algorithms.algorithm import Algorithm
from alf.data_structures import TimeStep, namedtuple, AlgStep, LossInfo, StepType
from alf.networks import EncodingNetwork
from alf.nest.utils import NestConcat
from alf.tensor_specs import TensorSpec
from alf.utils import math_ops
from alf.utils.normalizers import ScalarAdaptiveNormalizer, AdaptiveNormalizer

# Per-step info produced by ``ICMAlgorithm._step`` and consumed by
# ``ICMAlgorithm.calc_loss``: the step type plus the forward-model and
# inverse-model losses of that step.
ICMInfo = namedtuple(
    "ICMInfo",
    [
        "step_type",
        "forward_loss",
        "inverse_loss",
    ],
)


@alf.configurable
class ICMAlgorithm(Algorithm):
    """Intrinsic Curiosity Module

    This module generates the intrinsic reward based on the prediction error
    of the (encoded) observation.

    See Pathak et al "Curiosity-driven Exploration by Self-supervised Prediction"
    """

    def __init__(self,
                 action_spec,
                 observation_spec=None,
                 hidden_size=256,
                 reward_adapt_speed=8.0,
                 encoding_net: EncodingNetwork = None,
                 forward_net: EncodingNetwork = None,
                 inverse_net: EncodingNetwork = None,
                 activation=torch.relu_,
                 optimizer=None,
                 name="ICMAlgorithm"):
        """Create an ICMAlgorithm.

        Args:
            action_spec (nested TensorSpec): agent's action spec. It must
                flatten to exactly one spec.
            observation_spec (nested TensorSpec): agent's observation spec. If
                not None, then a normalizer will be used to normalize the
                observation. It is also used as the feature spec when
                ``encoding_net`` is None, so at least one of
                ``observation_spec`` and ``encoding_net`` must be provided.
            hidden_size (int or tuple[int]): size of hidden layer(s)
            reward_adapt_speed (float): how fast to adapt the reward
                normalizer. Roughly speaking, the statistics for the
                normalization is calculated mostly based on the most recent
                T/speed samples, where T is the total number of samples.
            encoding_net (Network): network for encoding observation into a
                latent feature. Its input is same as the input of this
                algorithm.
            forward_net (Network): network for predicting next feature based
                on previous feature and action. It should accept input with
                spec [feature_spec, encoded_action_spec] and output a tensor
                of shape feature_spec. For discrete action, encoded_action is
                an one-hot representation of the action. For continuous
                action, encoded action is same as the original action.
            inverse_net (Network): network for predicting previous action
                given the previous feature and current feature. It should
                accept input with spec [feature_spec, feature_spec] and output
                tensor of shape (num_actions,).
            activation (torch.nn.functional): activation used for constructing
                any of the forward net and inverse net, if not provided.
            optimizer (torch.optim.Optimizer): The optimizer for training
            name (str): name of this algorithm.
        """
        if encoding_net is not None:
            feature_spec = encoding_net.output_spec
        else:
            feature_spec = observation_spec

        # Without a feature spec the networks below cannot be sized; fail
        # early with a clear message instead of an obscure error later on.
        assert feature_spec is not None, (
            "ICM needs encoding_net or observation_spec to determine the "
            "feature spec")

        super(ICMAlgorithm, self).__init__(
            train_state_spec=feature_spec,
            predict_state_spec=(),
            optimizer=optimizer,
            name=name)

        flat_action_spec = alf.nest.flatten(action_spec)
        assert len(
            flat_action_spec) == 1, "ICM doesn't suport nested action_spec"

        flat_feature_spec = alf.nest.flatten(feature_spec)
        assert len(
            flat_feature_spec) == 1, "ICM doesn't support nested feature_spec"

        action_spec = flat_action_spec[0]

        if action_spec.is_discrete:
            # NOTE(review): actions are later used directly as class indices
            # (one-hot encoding and cross-entropy target), so this presumably
            # expects action_spec.minimum == 0 -- confirm against callers.
            self._num_actions = int(action_spec.maximum - action_spec.minimum +
                                    1)
        else:
            self._num_actions = action_spec.shape[-1]

        self._action_spec = action_spec

        self._observation_normalizer = None
        if observation_spec is not None:
            self._observation_normalizer = AdaptiveNormalizer(
                tensor_spec=observation_spec)

        feature_dim = flat_feature_spec[0].shape[-1]

        self._encoding_net = encoding_net

        if isinstance(hidden_size, int):
            hidden_size = (hidden_size, )

        if forward_net is None:
            encoded_action_spec = TensorSpec((self._num_actions, ),
                                             dtype=torch.float32)
            forward_net = EncodingNetwork(
                name="forward_net",
                input_tensor_spec=[feature_spec, encoded_action_spec],
                preprocessing_combiner=NestConcat(),
                fc_layer_params=hidden_size,
                activation=activation,
                last_layer_size=feature_dim,
                last_activation=math_ops.identity)

        self._forward_net = forward_net

        if inverse_net is None:
            inverse_net = EncodingNetwork(
                name="inverse_net",
                input_tensor_spec=[feature_spec, feature_spec],
                preprocessing_combiner=NestConcat(),
                fc_layer_params=hidden_size,
                activation=activation,
                last_layer_size=self._num_actions,
                last_activation=math_ops.identity,
                last_kernel_initializer=torch.nn.init.zeros_)

        self._inverse_net = inverse_net

        self._reward_normalizer = ScalarAdaptiveNormalizer(
            speed=reward_adapt_speed)

    def _encode_action(self, action):
        """Encode an action as input for the forward net.

        Args:
            action (Tensor): the action to encode.
        Returns:
            Tensor: a float32 one-hot encoding with ``num_actions`` classes
            for a discrete action spec; ``action`` itself otherwise.
        """
        if self._action_spec.is_discrete:
            # ``one_hot`` only accepts int64 index tensors, so cast here the
            # same way the inverse loss casts its cross-entropy target.
            return torch.nn.functional.one_hot(
                action.to(torch.int64), self._num_actions).to(torch.float32)
        else:
            return action

    def _step(self, time_step: TimeStep, state, calc_rewards=True):
        """This step is for both `rollout_step` and `train_step`.

        Args:
            time_step (TimeStep): input time_step data for ICM
            state (Tensor): state for ICM (the feature returned as state by
                the previous step)
            calc_rewards (bool): whether calculate rewards

        Returns:
            AlgStep:
                output: the normalized intrinsic reward if ``calc_rewards``
                    is True, otherwise an empty tuple ()
                state: feature of the current observation (after the optional
                    normalization and encoding)
                info (ICMInfo): step type, forward loss and inverse loss
        """
        feature = time_step.observation
        prev_action = time_step.prev_action.detach()

        # normalize observation for easier prediction
        if self._observation_normalizer is not None:
            feature = self._observation_normalizer.normalize(feature)

        if self._encoding_net is not None:
            feature, _ = self._encoding_net(feature)
        prev_feature = state

        # Both the input feature and the target are detached, so the forward
        # loss does not back-propagate into whatever produced the features.
        forward_pred, _ = self._forward_net(
            [prev_feature.detach(),
             self._encode_action(prev_action)])
        # nn.MSELoss doesn't support reducing along a dim
        forward_loss = 0.5 * torch.mean(
            math_ops.square(forward_pred - feature.detach()), dim=-1)

        # The features are NOT detached here, so the inverse loss does
        # back-propagate through them.
        action_pred, _ = self._inverse_net([prev_feature, feature])

        if self._action_spec.is_discrete:
            inverse_loss = torch.nn.CrossEntropyLoss(reduction='none')(
                input=action_pred, target=prev_action.to(torch.int64))
        else:
            # nn.MSELoss doesn't support reducing along a dim
            inverse_loss = 0.5 * torch.mean(
                math_ops.square(action_pred - prev_action), dim=-1)

        intrinsic_reward = ()
        if calc_rewards:
            # The reward is the detached forward prediction error, passed
            # through the adaptive reward normalizer.
            intrinsic_reward = forward_loss.detach()
            intrinsic_reward = self._reward_normalizer.normalize(
                intrinsic_reward)

        return AlgStep(
            output=intrinsic_reward,
            state=feature,
            info=ICMInfo(
                step_type=time_step.step_type,
                forward_loss=forward_loss,
                inverse_loss=inverse_loss))

    def predict_step(self, inputs: TimeStep, state):
        """Run ``_step`` with reward calculation enabled.

        NOTE(review): ``predict_state_spec`` is declared as ``()`` in
        ``__init__`` while ``_step`` calls ``state.detach()``; confirm that
        callers pass a tensor state here.
        """
        return self._step(inputs, state)

    def rollout_step(self, inputs: TimeStep, state):
        """Run ``_step`` with reward calculation enabled."""
        return self._step(inputs, state)

    def train_step(self, inputs: TimeStep, state, rollout_info=None):
        """Run ``_step`` without calculating the intrinsic reward.

        ``rollout_info`` is accepted but not used.
        """
        return self._step(inputs, state, calc_rewards=False)

    def calc_loss(self, info: ICMInfo):
        """Combine the forward and inverse losses into a scalar loss.

        Entries whose step type is ``StepType.FIRST`` are zeroed out before
        averaging; the mean is still taken over all entries.

        Args:
            info (ICMInfo): info collected from ``train_step``.
        Returns:
            LossInfo: ``scalar_loss`` is the sum of the two averaged losses,
            which are also reported individually in ``extra``.
        """
        mask = (info.step_type != StepType.FIRST).to(torch.float32)
        forward_loss = (info.forward_loss * mask).mean()
        inverse_loss = (info.inverse_loss * mask).mean()
        return LossInfo(
            scalar_loss=forward_loss + inverse_loss,
            extra=dict(forward_loss=forward_loss, inverse_loss=inverse_loss))