Source code for alf.networks.actor_distribution_networks

# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ActorDistributionNetwork and ActorRNNDistributionNetwork."""
from typing import Callable
from functools import partial

import torch
import torch.distributions as td
import torch.nn as nn

import alf
import alf.nest as nest
from .encoding_networks import EncodingNetwork, LSTMEncodingNetwork
from .normalizing_flow_networks import RealNVPNetwork
from .projection_networks import NormalProjectionNetwork, CategoricalProjectionNetwork
from .preprocessor_networks import PreprocessorNetwork
from alf.tensor_specs import BoundedTensorSpec, TensorSpec
from alf.networks.network import Network


[docs]@alf.configurable class ActorDistributionNetworkBase(Network): """A base class for ``ActorDistributionNetwork`` and ``ActorDistributionRNNNetwork``. Can also be used to create customized actor networks by providing different encoding network creators. """ def __init__(self, input_tensor_spec: alf.NestedTensorSpec, action_spec: alf.NestedTensorSpec, encoding_network_ctor: Callable, discrete_projection_net_ctor: Callable, continuous_projection_net_ctor: Callable, name: str = 'ActorDistributionNetworkBase', **encoder_kwargs): """ Args: input_tensor_spec: the tensor spec of the input. action_spec: the tensor spec of the action. encoding_network_ctor: the creator of the encoding network that does the heavy lifting of the actor. discrete_projection_net_ctor (ProjectionNetwork): constructor that generates a discrete projection network that outputs discrete actions. continuous_projection_net_ctor (ProjectionNetwork): constructor that generates a continuous projection network that outputs continuous actions. name: name of the network encoder_kwargs: the extra keyword arguments to the encoding network """ super().__init__(input_tensor_spec, name=name) if encoder_kwargs.get('kernel_initializer', None) is None: encoder_kwargs[ 'kernel_initializer'] = torch.nn.init.xavier_uniform_ self._action_spec = action_spec self._encoding_net = encoding_network_ctor(input_tensor_spec, **encoder_kwargs) self._create_projection_net(discrete_projection_net_ctor, continuous_projection_net_ctor) def _create_projection_net(self, discrete_projection_net_ctor, continuous_projection_net_ctor): """If there are :math:`N` action specs, then create :math:`N` projection networks which can be a mixture of categoricals and normals. """ def _create(spec): if spec.is_discrete: net = discrete_projection_net_ctor( input_size=self._encoding_net.output_spec.shape[0], action_spec=spec) else: net = continuous_projection_net_ctor( input_size=self._encoding_net.output_spec.shape[0], action_spec=spec) return net self._projection_net = nest.map_structure(_create, self._action_spec) if nest.is_nested(self._projection_net): # need this for torch to pickup the parameters of all the modules self._projection_net_module_list = nn.ModuleList( nest.flatten(self._projection_net))
[docs] def forward(self, observation, state=()): """Computes an action distribution given an observation. Args: observation (torch.Tensor): consistent with ``input_tensor_spec`` state: empty for API consistent with ``ActorRNNDistributionNetwork`` Returns: act_dist (torch.distributions): action distribution state: empty """ encoding, state = self._encoding_net(observation, state) act_dist = nest.map_structure(lambda proj: proj(encoding)[0], self._projection_net) return act_dist, state
[docs] def make_parallel(self, n): """Create a ``ParallelActorDistributionNetwork`` using ``n`` replicas of ``self``. The initialized network parameters will be different. """ return ParallelActorDistributionNetwork(self, n, "parallel_" + self._name)
@property def state_spec(self): """Return the state spec of the actor network. It is simply the state spec of the encoding network.""" return self._encoding_net.state_spec
[docs]@alf.configurable class ActorDistributionNetwork(ActorDistributionNetworkBase): """Network which outputs temporally uncorrelated action distributions.""" def __init__(self, input_tensor_spec, action_spec, input_preprocessors=None, preprocessing_combiner=None, conv_layer_params=None, fc_layer_params=None, activation=torch.relu_, kernel_initializer=None, use_fc_bn=False, discrete_projection_net_ctor=CategoricalProjectionNetwork, continuous_projection_net_ctor=NormalProjectionNetwork, name="ActorDistributionNetwork"): """ Args: input_tensor_spec (TensorSpec): the tensor spec of the input action_spec (TensorSpec): the action spec input_preprocessors (nested InputPreprocessor): a nest of `InputPreprocessor`, each of which will be applied to the corresponding input. If not None, then it must have the same structure with ``input_tensor_spec`` (after reshaping). If any element is None, then it will be treated as math_ops.identity. This arg is helpful if you want to have separate preprocessings for different inputs by configuring a gin file without changing the code. For example, embedding a discrete input before concatenating it to another continuous vector. preprocessing_combiner (NestCombiner): preprocessing called on complex inputs. Note that this combiner must also accept `input_tensor_spec` as the input to compute the processed tensor spec. For example, see `alf.nest.utils.NestConcat`. This arg is helpful if you want to combine inputs by configuring a gin file without changing the code. conv_layer_params (tuple[tuple]): a tuple of tuples where each tuple takes a format ``(filters, kernel_size, strides, padding)``, where ``padding`` is optional. fc_layer_params (tuple[int]): a tuple of integers representing hidden FC layer sizes. activation (nn.functional): activation used for hidden layers. kernel_initializer (Callable): initializer for all the layers excluding the projection net. If none is provided a default xavier_uniform will be used. use_fc_bn (bool): whether use Batch Normalization for the internal FC layers (i.e. FC layers except the last one). discrete_projection_net_ctor (ProjectionNetwork): constructor that generates a discrete projection network that outputs discrete actions. continuous_projection_net_ctor (ProjectionNetwork): constructor that generates a continuous projection network that outputs continuous actions. name (str): """ super().__init__( input_tensor_spec=input_tensor_spec, action_spec=action_spec, encoding_network_ctor=EncodingNetwork, discrete_projection_net_ctor=discrete_projection_net_ctor, continuous_projection_net_ctor=continuous_projection_net_ctor, name=name, input_preprocessors=input_preprocessors, preprocessing_combiner=preprocessing_combiner, conv_layer_params=conv_layer_params, fc_layer_params=fc_layer_params, activation=activation, kernel_initializer=kernel_initializer, use_fc_bn=use_fc_bn)
[docs]class ParallelActorDistributionNetwork(Network): """Perform ``n`` actor distribution computations in parallel.""" def __init__(self, actor_network: ActorDistributionNetwork, n: int, name="ParallelActorDistributionNetwork"): """ It creates a parallelized version of ``actor_network``. Args: actor_network (ActorDistributionNetwork): non-parallelized actor network n (int): make ``n`` replicas from ``actor_network`` with different initialization. name (str): """ super().__init__( input_tensor_spec=actor_network.input_tensor_spec, name=name) self._encoding_net = actor_network._encoding_net.make_parallel(n) self._projection_net = actor_network._projection_net.make_parallel(n) self._output_spec = self._projection_net.output_spec
[docs] def forward(self, observation, state=()): """Computes action distribution given a batch of observations. Args: inputs (tuple): A tuple of Tensors consistent with `input_tensor_spec``. state (tuple): Empty for API consistent with ``ActorDistributionRNNNetwork``. """ encoding, state = self._encoding_net(observation, state) act_dist = nest.map_structure(lambda proj: proj(encoding)[0], self._projection_net) return act_dist, state
@property def state_spec(self): """Return the state spec of the actor network. It is simply the state spec of the encoding network.""" return self._encoding_net.state_spec
[docs]@alf.configurable class ActorDistributionRNNNetwork(ActorDistributionNetworkBase): """Network which outputs temporally correlated action distributions.""" def __init__(self, input_tensor_spec, action_spec, input_preprocessors=None, preprocessing_combiner=None, conv_layer_params=None, fc_layer_params=None, lstm_hidden_size=100, actor_fc_layer_params=None, activation=torch.relu_, kernel_initializer=None, discrete_projection_net_ctor=CategoricalProjectionNetwork, continuous_projection_net_ctor=NormalProjectionNetwork, name="ActorRNNDistributionNetwork"): """ Args: input_tensor_spec (TensorSpec): the tensor spec of the input action_spec (TensorSpec): the action spec input_preprocessors (nested InputPreprocessor): a nest of ``InputPreprocessor``, each of which will be applied to the corresponding input. If not None, then it must have the same structure with ``input_tensor_spec`` (after reshaping). If any element is None, then it will be treated as math_ops.identity. This arg is helpful if you want to have separate preprocessings for different inputs by configuring a gin file without changing the code. For example, embedding a discrete input before concatenating it to another continuous vector. preprocessing_combiner (NestCombiner): preprocessing called on complex inputs. Note that this combiner must also accept ``input_tensor_spec`` as the input to compute the processed tensor spec. For example, see `alf.nest.utils.NestConcat`. This arg is helpful if you want to combine inputs by configuring a gin file without changing the code. conv_layer_params (tuple[tuple]): a tuple of tuples where each tuple takes a format ``(filters, kernel_size, strides, padding)``, where ``padding`` is optional. fc_layer_params (tuple[int]): a tuple of integers representing hidden FC layers for encoding the observation. lstm_hidden_size (int or tuple[int]): the hidden size(s) of the LSTM cell(s). Each size corresponds to a cell. If there are multiple sizes, then lstm cells are stacked. actor_fc_layer_params (tuple[int]): a tuple of integers representing hidden FC layers that are applied after the lstm cell's output. activation (nn.functional): activation used for hidden layers. kernel_initializer (Callable): initializer for all the layers excluding the projection net. If none is provided a default xavier_uniform will be used. discrete_projection_net_ctor (ProjectionNetwork): constructor that generates a discrete projection network that outputs discrete actions. continuous_projection_net_ctor (ProjectionNetwork): constructor that generates a continuous projection network that outputs continuous actions. name (str): """ super().__init__( input_tensor_spec=input_tensor_spec, action_spec=action_spec, encoding_network_ctor=LSTMEncodingNetwork, discrete_projection_net_ctor=discrete_projection_net_ctor, continuous_projection_net_ctor=continuous_projection_net_ctor, name=name, input_preprocessors=input_preprocessors, preprocessing_combiner=preprocessing_combiner, conv_layer_params=conv_layer_params, pre_fc_layer_params=fc_layer_params, hidden_size=lstm_hidden_size, post_fc_layer_params=actor_fc_layer_params, activation=activation, kernel_initializer=kernel_initializer)
[docs]class UnitNormalActorDistributionNetwork(Network): """Outputs a constant unit normal regardless of the inputs. """ def __init__(self, input_tensor_spec, action_spec, name="UnitNormalActorDistributionNetwork"): super().__init__(input_tensor_spec, name=name) self._action_spec = action_spec
[docs] def forward(self, inputs, state=()): outer_rank = alf.nest.utils.get_outer_rank(inputs, self._input_tensor_spec) outer_dims = alf.nest.get_nest_shape(inputs)[:outer_rank] means = self._action_spec.zeros(outer_dims) stds = self._action_spec.ones(outer_dims) normal_dist = alf.utils.dist_utils.DiagMultivariateNormal( loc=means, scale=stds) return normal_dist, state
[docs]@alf.configurable class LatentActorDistributionNetwork(Network): """Generating an actor distribution by transforming a prior action distribution (e.g., standard Normal noise :math:`\mathcal{N}(0,1)`) with a normalizing flow network. The resulting distribution might have an arbitrary shape. .. warning:: Like some invertible transform such as ``StableTanh``, the inverse computation of a normalizing flow transform might cause numerical issues. For policy gradient methods like AC and PPO, transform caches are usually invalidated because of detaching actions for PG loss. So ``LatentActorDistributionNetwork`` is best suitable for non PG algorithms like DDPG and SAC. See ``alf/docs/notes/compute_probs_of_transformed_dist.rst`` for details. """ def __init__(self, input_tensor_spec: alf.NestedTensorSpec, action_spec: alf.NestedTensorSpec, prior_actor_distribution_network_ctor: Callable = UnitNormalActorDistributionNetwork, normalizing_flow_network_ctor: Callable = RealNVPNetwork, conditional_flow: bool = True, scale_distribution: bool = False, dist_squashing_transform: td.Transform = alf.utils.dist_utils. StableTanh(), name: str = "LatentActorDistributionNetwork"): """ Args: input_tensor_spec: the tensor spec of the input action_spec: the action spec prior_actor_distribution_network_ctor: a constructor that creates any actor distribution network. The only requirement is that this class returns an action distribution (could be transformed) for ``forward()``. normalizing_flow_network_ctor: a constructor that creates a normalizing flow network which is used to transform the prior action distribution. conditional_flow: whether to make the normalizing flow network use inputs to condition its transformations. Only valid for normalizing flow nets that support this option. scale_distribution: Whether or not to scale the output distribution to ensure that the output aciton fits within the ``action_spec``. dist_squashing_transform: A distribution Transform which transforms values into :math:`(-1, 1)`. Default to ``dist_utils.StableTanh()`` name: name of the network """ super().__init__(input_tensor_spec, name=name) self._prior_actor_network = prior_actor_distribution_network_ctor( input_tensor_spec=input_tensor_spec, action_spec=action_spec) self._nf_network = normalizing_flow_network_ctor( input_tensor_spec=action_spec, conditional_input_tensor_spec=(input_tensor_spec if conditional_flow else None)) self._conditional_flow = conditional_flow self._scale_distribution = scale_distribution if scale_distribution: assert isinstance(action_spec, BoundedTensorSpec), \ ("When squashing the mean or scaling the distribution, bounds " + "are required for the action spec!") means, magnitudes = alf.utils.spec_utils.spec_means_and_magnitudes( action_spec) self._squash_transforms = [ dist_squashing_transform, alf.utils.dist_utils.AffineTransform( loc=means, scale=magnitudes) ]
[docs] def forward(self, inputs, state=()): distribution, state = self._prior_actor_network(inputs, state) if not self._conditional_flow: inputs = None nf_transform = self._nf_network.make_invertible_transform(inputs) transforms = [nf_transform] if self._scale_distribution: transforms = transforms + self._squash_transforms transformed_dist = td.TransformedDistribution(distribution, transforms) return transformed_dist, state