# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ActorNetworks"""
import functools
import math
from typing import Callable
import torch
import torch.nn as nn
import alf
from .encoding_networks import EncodingNetwork, LSTMEncodingNetwork
from .preprocessor_networks import PreprocessorNetwork
import alf.layers as layers
import alf.nest as nest
from alf.initializers import variance_scaling_init
from alf.networks import Network
from alf.tensor_specs import TensorSpec, BoundedTensorSpec
from alf.utils import common, math_ops, spec_utils
@alf.configurable
class ActorNetworkBase(Network):
    """A base class for ``ActorNetwork`` and ``ActorRNNNetwork``.

    Can also be used to create customized actor networks by providing
    different encoding network creators.
    """

    def __init__(self,
                 input_tensor_spec: alf.NestedTensorSpec,
                 action_spec: alf.NestedTensorSpec,
                 encoding_network_ctor: Callable = EncodingNetwork,
                 squashing_func=torch.tanh,
                 name="ActorNetworkBase",
                 **encoder_kwargs):
        """
        Args:
            input_tensor_spec: the tensor spec of the input.
            action_spec: the tensor spec of the action. Every spec in the
                (flattened) nest must be continuous.
            encoding_network_ctor: the creator of the encoding network that does
                the heavy lifting of the actor. It is called as
                ``encoding_network_ctor(input_tensor_spec, name=..., **encoder_kwargs)``.
            squashing_func: the activation function used to squash
                the output to the range :math:`(-1, 1)`. Default to ``tanh``.
            name: name of the network
            encoder_kwargs: the extra keyword arguments to the encoding network.
                If ``kernel_initializer`` is missing or ``None``, a
                ``variance_scaling_init`` with uniform distribution is used.
        """
        super().__init__(input_tensor_spec, name=name)

        # ``encoder_kwargs`` is a fresh dict created by ``**``, so filling in
        # the default initializer here does not touch any caller-owned object.
        if encoder_kwargs.get('kernel_initializer', None) is None:
            encoder_kwargs['kernel_initializer'] = functools.partial(
                variance_scaling_init,
                gain=math.sqrt(1.0 / 3),
                mode='fan_in',
                distribution='uniform')

        self._action_spec = action_spec
        flat_action_spec = nest.flatten(action_spec)
        self._flat_action_spec = flat_action_spec

        assert all(spec.is_continuous for spec in flat_action_spec), (
            "only continuous action is supported")

        self._encoding_net = encoding_network_ctor(
            input_tensor_spec,
            name=self.name + '.encoding_net',
            **encoder_kwargs)

        # The output layers get a small uniform initialization that is
        # independent of the initializer used for the encoder.
        last_kernel_initializer = functools.partial(
            torch.nn.init.uniform_, a=-0.003, b=0.003)

        # One FC head per flattened action spec, all fed by the same encoding.
        encoding_size = self._encoding_net.output_spec.shape[0]
        self._action_layers = nn.ModuleList([
            layers.FC(
                encoding_size,
                single_action_spec.shape[0],
                kernel_initializer=last_kernel_initializer)
            for single_action_spec in flat_action_spec
        ])
        self._squashing_func = squashing_func

    def forward(self, observation, state=()):
        """Computes action given an observation.

        Args:
            observation: A tensor consistent with ``input_tensor_spec``
            state: the state passed to the encoding network; empty for the
                non-recurrent case, kept for API consistency with
                ``ActorRNNNetwork``
        Returns:
            tuple:
            - action (torch.Tensor): a tensor consistent with ``action_spec``
            - state: the state returned by the encoding network
        """
        encoded_obs, state = self._encoding_net(observation, state)

        actions = []
        for i, (layer, spec) in enumerate(
                zip(self._action_layers, self._flat_action_spec)):
            pre_activation = layer(encoded_obs)
            action = self._squashing_func(pre_activation)
            # Map the squashed value onto the bounds given by ``spec``.
            action = spec_utils.scale_to_spec(action, spec)

            if alf.summary.should_summarize_output():
                self._summarize_output_norm(i, 'pre_activation',
                                            pre_activation)
                self._summarize_output_norm(i, 'action', action)

            actions.append(action)

        output_actions = nest.pack_sequence_as(self._action_spec, actions)
        return output_actions, state

    def _summarize_output_norm(self, index, tag, tensor):
        """Write a scalar summary of the norm of one action-layer tensor.

        The norm is taken over all dimensions except the first and then
        averaged over the remaining one.

        Args:
            index (int): index of the action layer in the flattened action spec.
            tag (str): which tensor is summarized, e.g. ``'pre_activation'``
                or ``'action'``.
            tensor (torch.Tensor): the tensor to summarize.
        """
        summary_name = (
            'summarize_output/' + self.name + '.action_layer.' + str(index) +
            '.' + tag + '.output_norm.' + common.exe_mode_name())
        alf.summary.scalar(
            name=summary_name,
            data=torch.mean(tensor.norm(dim=list(range(1, tensor.ndim)))))

    @property
    def state_spec(self):
        """Return the state spec of the actor network. It is simply the state spec
        of the encoding network."""
        return self._encoding_net.state_spec
@alf.configurable
class ActorNetwork(ActorNetworkBase):
    """An actor network whose encoder is an ``EncodingNetwork``."""

    def __init__(self,
                 input_tensor_spec: TensorSpec,
                 action_spec: BoundedTensorSpec,
                 input_preprocessors=None,
                 preprocessing_combiner=None,
                 conv_layer_params=None,
                 fc_layer_params=None,
                 activation=torch.relu_,
                 squashing_func=torch.tanh,
                 kernel_initializer=None,
                 name="ActorNetwork"):
        """Creates an instance of ``ActorNetwork``, which maps the inputs to
        actions (single or nested) through a sequence of deterministic layers.

        Args:
            input_tensor_spec (TensorSpec): the tensor spec of the input.
            action_spec (BoundedTensorSpec): the tensor spec of the action.
            input_preprocessors (nested Network|nn.Module|None): a nest of
                input preprocessors, each of which will be applied to the
                corresponding input. If not None, then it must
                have the same structure with ``input_tensor_spec`` (after reshaping).
                If any element is None, then it will be treated as ``math_ops.identity``.
                This arg is helpful if you want to have separate preprocessings
                for different inputs by configuring a gin file without changing
                the code. For example, embedding a discrete input before concatenating
                it to another continuous vector.
            preprocessing_combiner (NestCombiner): preprocessing called on
                complex inputs. Note that this combiner must also accept
                ``input_tensor_spec`` as the input to compute the processed
                tensor spec. For example, see ``alf.nest.utils.NestConcat``. This
                arg is helpful if you want to combine inputs by configuring a
                gin file without changing the code.
            conv_layer_params (tuple[tuple]): a tuple of tuples where each
                tuple takes a format ``(filters, kernel_size, strides, padding)``,
                where ``padding`` is optional.
            fc_layer_params (tuple[int]): a tuple of integers representing hidden
                FC layer sizes.
            activation (nn.functional): activation used for hidden layers. The
                last layer will not be activated.
            squashing_func (Callable): the activation function used to squash
                the output to the range :math:`(-1, 1)`. Default to ``tanh``.
            kernel_initializer (Callable): initializer for all the layers but
                the last layer. If none is provided a ``variance_scaling_init``
                with uniform distribution will be used.
            name (str): name of the network
        """
        # Everything after ``name`` is forwarded untouched to
        # ``EncodingNetwork`` through the base class's ``**encoder_kwargs``.
        super().__init__(
            input_tensor_spec=input_tensor_spec,
            action_spec=action_spec,
            encoding_network_ctor=EncodingNetwork,
            squashing_func=squashing_func,
            name=name,
            input_preprocessors=input_preprocessors,
            preprocessing_combiner=preprocessing_combiner,
            conv_layer_params=conv_layer_params,
            fc_layer_params=fc_layer_params,
            activation=activation,
            kernel_initializer=kernel_initializer)
@alf.configurable
class ActorRNNNetwork(ActorNetworkBase):
    """An actor network whose encoder is an ``LSTMEncodingNetwork``."""

    def __init__(self,
                 input_tensor_spec: TensorSpec,
                 action_spec: BoundedTensorSpec,
                 input_preprocessors=None,
                 preprocessing_combiner=None,
                 conv_layer_params=None,
                 fc_layer_params=None,
                 lstm_hidden_size=100,
                 actor_fc_layer_params=None,
                 activation=torch.relu_,
                 squashing_func=torch.tanh,
                 kernel_initializer=None,
                 name="ActorRNNNetwork"):
        """Creates an instance of ``ActorRNNNetwork``, which maps the inputs
        (observation and states) to actions (single or nested) through a
        sequence of deterministic layers.

        Args:
            input_tensor_spec (TensorSpec): the tensor spec of the input.
            action_spec (BoundedTensorSpec): the tensor spec of the action.
            input_preprocessors (nested Network|nn.Module|None): a nest of
                input preprocessors, each of which will be applied to the
                corresponding input. If not None, then it must
                have the same structure with ``input_tensor_spec`` (after reshaping).
                If any element is None, then it will be treated as ``math_ops.identity``.
                This arg is helpful if you want to have separate preprocessings
                for different inputs by configuring a gin file without changing
                the code. For example, embedding a discrete input before concatenating
                it to another continuous vector.
            preprocessing_combiner (NestCombiner): preprocessing called on
                complex inputs. Note that this combiner must also accept
                ``input_tensor_spec`` as the input to compute the processed
                tensor spec. For example, see ``alf.nest.utils.NestConcat``. This
                arg is helpful if you want to combine inputs by configuring a
                gin file without changing the code.
            conv_layer_params (tuple[tuple]): a tuple of tuples where each
                tuple takes a format ``(filters, kernel_size, strides, padding)``,
                where ``padding`` is optional.
            fc_layer_params (tuple[int]): a tuple of integers representing hidden
                FC layer sizes. Passed to the encoder as ``pre_fc_layer_params``.
            lstm_hidden_size (int or tuple[int]): the hidden size(s)
                of the LSTM cell(s). Each size corresponds to a cell. If there
                are multiple sizes, then lstm cells are stacked. Passed to the
                encoder as ``hidden_size``.
            actor_fc_layer_params (tuple[int]): a tuple of integers representing
                hidden FC layers that are applied after the lstm cell's output.
                Passed to the encoder as ``post_fc_layer_params``.
            activation (nn.functional): activation used for hidden layers. The
                last layer will not be activated.
            squashing_func (Callable): the activation function used to squash
                the output to the range :math:`(-1, 1)`. Default to ``tanh``.
            kernel_initializer (Callable): initializer for all the layers but
                the last layer. If none is provided a ``variance_scaling_init``
                with uniform distribution will be used.
            name (str): name of the network
        """
        # The public argument names are translated here into the keyword
        # names that this constructor hands to ``LSTMEncodingNetwork``.
        super().__init__(
            input_tensor_spec=input_tensor_spec,
            action_spec=action_spec,
            encoding_network_ctor=LSTMEncodingNetwork,
            squashing_func=squashing_func,
            name=name,
            input_preprocessors=input_preprocessors,
            preprocessing_combiner=preprocessing_combiner,
            conv_layer_params=conv_layer_params,
            pre_fc_layer_params=fc_layer_params,
            hidden_size=lstm_hidden_size,
            post_fc_layer_params=actor_fc_layer_params,
            activation=activation,
            kernel_initializer=kernel_initializer)