# Source code for alf.networks.networks

# Copyright (c) 2021 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Various concrete Networks."""

import copy
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import typing
from typing import Callable, Dict, Optional, Tuple

import alf
from alf.initializers import variance_scaling_init
from alf.utils.math_ops import identity
from alf.utils.common import expand_dims_as, is_eval
from .network import Network, wrap_as_network

# Public names exported by this module via ``import *``.
__all__ = [
    'LSTMCell',
    'GRUCell',
    'NoisyFC',
    'Residue',
    'TemporalPool',
    'Delay',
    'AMPWrapper',
]


class LSTMCell(Network):
    r"""A long short-term memory (LSTM) cell.

    .. math::

        \begin{array}{ll}
        i = \sigma(W_{ii} x + b_{ii} + W_{hi} h + b_{hi}) \\
        f = \sigma(W_{if} x + b_{if} + W_{hf} h + b_{hf}) \\
        g = \tanh(W_{ig} x + b_{ig} + W_{hg} h + b_{hg}) \\
        o = \sigma(W_{io} x + b_{io} + W_{ho} h + b_{ho}) \\
        c' = f * c + i * g \\
        h' = o * \tanh(c') \\
        \end{array}

    where :math:`\sigma` is the sigmoid function, and :math:`*` is the
    Hadamard product.

    The network state is the pair ``(h, c)`` and the network output is the
    new hidden state ``h'``.
    """

    def __init__(self, input_size, hidden_size, name='LSTMCell'):
        """
        Args:
            input_size (int): The number of expected features in the input `x`
            hidden_size (int): The number of features in the hidden state `h`
            name (str): name of this network
        """
        # Both the hidden state h and the cell state c have ``hidden_size``
        # features.
        state_spec = tuple(
            alf.TensorSpec((hidden_size, )) for _ in range(2))
        super().__init__(
            input_tensor_spec=alf.TensorSpec((input_size, )),
            state_spec=state_spec,
            name=name)
        self._cell = nn.LSTMCell(
            input_size=input_size, hidden_size=hidden_size)

    def forward(self, input, state):
        """Run one step of the wrapped ``nn.LSTMCell``.

        Args:
            input (Tensor): the input `x` for this step
            state (tuple[Tensor]): the ``(h, c)`` pair from the previous step
        Returns:
            tuple:
            - the new hidden state ``h'``
            - the new state ``(h', c')``
        """
        hidden, cell = self._cell(input, state)
        next_state = (hidden, cell)
        return hidden, next_state
class GRUCell(Network):
    r"""A gated recurrent unit (GRU) cell.

    .. math::

        \begin{array}{ll}
        r = \sigma(W_{ir} x + b_{ir} + W_{hr} h + b_{hr}) \\
        z = \sigma(W_{iz} x + b_{iz} + W_{hz} h + b_{hz}) \\
        n = \tanh(W_{in} x + b_{in} + r * (W_{hn} h + b_{hn})) \\
        h' = (1 - z) * n + z * h
        \end{array}

    where :math:`\sigma` is the sigmoid function, and :math:`*` is the
    Hadamard product.

    The new hidden state ``h'`` is used both as the output and as the state.
    """

    def __init__(self, input_size, hidden_size, name='GRUCell'):
        """
        Args:
            input_size (int): The number of expected features in the input `x`
            hidden_size (int): The number of features in the hidden state `h`
            name (str): name of this network
        """
        observation_spec = alf.TensorSpec((input_size, ))
        hidden_spec = alf.TensorSpec((hidden_size, ))
        super().__init__(
            input_tensor_spec=observation_spec,
            state_spec=hidden_spec,
            name=name)
        self._cell = nn.GRUCell(input_size, hidden_size)

    def forward(self, input, state):
        """Run one step of the wrapped ``nn.GRUCell``.

        Args:
            input (Tensor): the input `x` for this step
            state (Tensor): the hidden state `h` from the previous step
        Returns:
            tuple:
            - the new hidden state ``h'``
            - the new state, which is the same tensor ``h'``
        """
        new_hidden = self._cell(input, state)
        return new_hidden, new_hidden
class Residue(Network):
    """Residue block.

    It performs ``y = activation(x + block(x))``.
    """

    def __init__(self,
                 block,
                 input_tensor_spec=None,
                 activation=torch.relu_,
                 name='Residue'):
        """
        Args:
            block (Callable): the computation whose output is added to the
                input. It is converted to a ``Network`` with
                ``wrap_as_network``. Its output must be addable to its input.
            input_tensor_spec (nested TensorSpec): input tensor spec for
                ``block`` if it cannot be inferred from ``block``
            activation (Callable): activation function applied to
                ``x + block(x)``
            name (str): name of this network
        """
        block = wrap_as_network(block, input_tensor_spec)
        super().__init__(
            input_tensor_spec=block.input_tensor_spec,
            state_spec=block.state_spec,
            # Forward the caller-supplied name instead of a fixed string so
            # that differently named instances can be told apart.
            name=name)
        self._block = block
        self._activation = activation

    def forward(self, x, state=()):
        """Compute ``activation(x + block(x))``.

        Args:
            x (Tensor): input to the block
            state (nested Tensor): state passed to ``block``
        Returns:
            tuple:
            - the activated sum of ``x`` and the output of ``block``
            - the new state returned by ``block``
        """
        y, state = self._block(x, state)
        # ``x + y`` creates a new tensor, so an in-place activation such as
        # the default ``torch.relu_`` does not modify ``x`` or ``y``.
        return self._activation(x + y), state
class TemporalPool(Network):
    """Pool features temporally.

    Suppose input_size=(), stack_size=2, pooling_size=2, the following table
    shows the output of different mode for an input sequence of 1,2,3,4,5
    (ignoring batch dimension)::

                  1,      2,        3,        4,          5
        skip:   [0, 1], [0, 1],   [1, 3],   [1, 3],     [3, 5]
        avg:    [0, 0], [0, 1.5], [0, 1.5], [1.5, 3.5], [1.5, 3.5]
        max:    [0, 0], [0, 2],   [0, 2],   [2, 4],     [2, 4]

    Note that for 'avg' and 'max', the result is zero for the first
    ``pooling_size - 1`` steps because it needs ``pooling_size`` input to
    calculate the result. After that, the output changes every
    ``pooling_size`` steps as the new pooling result available. On the other
    hand, for 'skip', the first input is immediately reflected in the output
    because it is a valid way of skipping.

    Example:

    .. code-block:: python

        # A temporal CNN with progressively large temporal receptive field.
        cnn = alf.networks.Sequential([
            alf.networks.TemporalPool(256, 3, 1),
            torch.nn.Flatten(),
            alf.layers.FC(768, 256, activation=torch.relu_),
            alf.networks.TemporalPool(256, 3, 2),
            torch.nn.Flatten(),
            alf.layers.FC(768, 256, activation=torch.relu_),
            alf.networks.TemporalPool(256, 3, 4),
            torch.nn.Flatten(),
            alf.layers.FC(768, 256, activation=torch.relu_)])

    Note that the output of the above network changes every 4 steps, which
    may make the response too slow for many tasks. So a practical way of using
    ``TemporalPool`` is to combine it with ``Residue`` so that the output will
    not lag:

    .. code-block:: python

        block = alf.networks.Residue(
            alf.networks.Sequential([
                alf.networks.TemporalPool(256, 3, 2),
                torch.nn.Flatten(),
                alf.layers.FC(768, 256, activation=torch.relu_)]))

    """

    def __init__(self,
                 input_size,
                 stack_size,
                 pooling_size=1,
                 dtype=torch.float32,
                 mode='skip',
                 name='TemporalPool'):
        """
        Args:
            input_size (int|tuple[int]): shape of the input
            stack_size (int): stack the features from so many steps
            pooling_size (int): if > 1, perform a pooling first.
                ``pooling_size`` steps of features will be pooled as single
                feature vector according to ``mode``
            dtype (torch.dtype): dtype of the input and of the stacked
                features kept in the state
            mode (str): one of ('skip', 'avg', 'max'), only effective if
                pooling_size > 1 (it is not examined when pooling_size is 1).

                'skip': only keeping features at step ``t * pooling_size``
                'avg': features are averaged for each window of
                ``pooling_size`` steps. The pooling results for first
                ``pooling_size - 1`` steps are 0.
                'max': features are maxed for each window of ``pooling_size``
                steps. The pooling results for first ``pooling_size - 1``
                steps are 0.
            name (str): name of this network
        """
        if isinstance(input_size, typing.Iterable):
            feature_shape = tuple(input_size)
        else:
            feature_shape = (input_size, )
        stacked_shape = (stack_size, ) + feature_shape
        input_tensor_spec = alf.TensorSpec(feature_shape, dtype=dtype)

        self._pooling_size = pooling_size
        if pooling_size == 1:
            # Without pooling, the state only needs to hold the most recent
            # ``stack_size - 1`` inputs.
            state_spec = alf.TensorSpec((stack_size - 1, ) + feature_shape,
                                        dtype)
        else:
            # mode -> (pooling function, spec of its state, the value of the
            # wrapped step counter at which the stacked output is refreshed)
            pool_table = {
                'skip': (self._skip_pool, (), 1),
                'avg': (self._avg_pool, input_tensor_spec, 0),
                'max': (self._max_pool, input_tensor_spec, 0),
            }
            if mode not in pool_table:
                raise ValueError("Unknown mode '%s'" % mode)
            (self._pool_func, pool_state_spec,
             self._update_step) = pool_table[mode]
            if pooling_size > 1:
                # State: (stacked features, pooling state, step counter)
                state_spec = (alf.TensorSpec(stacked_shape,
                                             input_tensor_spec.dtype),
                              pool_state_spec,
                              alf.TensorSpec((), dtype=torch.int64))

        super().__init__(input_tensor_spec, state_spec=state_spec, name=name)

    def forward(self, x, state):
        """Stack (and optionally pool) the features of the recent steps.

        Args:
            x (Tensor): input of the current step, with a leading batch
                dimension
            state (nested Tensor): the state from the previous step
        Returns:
            tuple of:
            - tensor of shape (stack_size, input_size) (ignoring the batch
              dimension)
            - internal states
        """
        if self._pooling_size == 1:
            stacked = torch.cat([state, x.unsqueeze(1)], dim=1)
            # The next state drops the oldest entry.
            return stacked, stacked[:, 1:, ...]

        stacked, pool_state, step = state
        step = step + 1
        pooled, pool_state = self._pool_func(x, pool_state, step)
        # Wrap the counter so that it cycles within one pooling window.
        step = step % self._pooling_size
        # Candidate output: drop the oldest entry and append the new pooling
        # result. It is only taken for the samples whose counter equals the
        # update step of the chosen mode.
        shifted = torch.cat([stacked[:, 1:, ...], pooled.unsqueeze(1)], dim=1)
        refresh = expand_dims_as(step == self._update_step, stacked)
        stacked = torch.where(refresh, shifted, stacked)
        return stacked, (stacked, pool_state, step)

    def _skip_pool(self, x, state, step):
        """Use the current input as the pooling result; no state is kept."""
        return x, ()

    def _avg_pool(self, x, state, step):
        """Running average of the inputs seen in the current window."""
        weight = expand_dims_as(1. / step.to(torch.float32), x)
        window_start = expand_dims_as(step == 1, x)
        # lerp(state, x, 1/step) == state + (x - state) / step
        running_mean = torch.lerp(state, x, weight)
        # A new window starts at step 1, where the average restarts from x.
        new_state = torch.where(window_start, x, running_mean)
        return new_state, new_state

    def _max_pool(self, x, state, step):
        """Running element-wise maximum of the inputs in the current window."""
        window_start = expand_dims_as(step == 1, x)
        running_max = torch.max(x, state)
        # A new window starts at step 1, where the maximum restarts from x.
        new_state = torch.where(window_start, x, running_max)
        return new_state, new_state
class Delay(Network):
    """The output is the input of the ``delay`` step ago.

    Args:
        input_tensor_spec (nested TensorSpec): representing the input
        delay (int): if 0, there is no delay and the output is same as the
            input.
        name (str): name of this network
    """

    def __init__(self, input_tensor_spec, delay=1, name='Delay'):
        # Choose the state layout and the matching step function once, so
        # that ``forward`` does not need to branch on ``delay``.
        if delay == 0:
            state_spec = ()
            step_fn = self._no_delay
        elif delay == 1:
            state_spec = input_tensor_spec
            step_fn = self._one_step_delay
        else:
            state_spec = (input_tensor_spec, ) * delay
            step_fn = self._multi_step_delay
        self._forward = step_fn
        super().__init__(
            input_tensor_spec=input_tensor_spec,
            state_spec=state_spec,
            name=name)

    @staticmethod
    def _no_delay(input, state):
        """Pass the input through; the state is empty."""
        return input, ()

    @staticmethod
    def _one_step_delay(input, state):
        """Output the stored input and store the current one."""
        return state, input

    @staticmethod
    def _multi_step_delay(input, state):
        """Output the oldest stored input and append the current one."""
        return state[0], state[1:] + (input, )

    def forward(self, input, state):
        """Return the delayed input and the updated state.

        Args:
            input (nested Tensor): input of the current step
            state (nested Tensor): the state from the previous step
        Returns:
            tuple:
            - the delayed input
            - the new state
        """
        return self._forward(input, state)
class AMPWrapper(Network):
    """Wrap a network to run in a given AMP context.

    The wrapper takes its input spec, state spec and name from ``net``.

    Args:
        enabled: whether to enable AMP autocast
        net: the wrapped network
    """

    def __init__(self, enabled: bool, net: Network):
        super().__init__(
            net.input_tensor_spec, state_spec=net.state_spec, name=net.name)
        self._net = net
        self._enabled = enabled

    def forward(self, input, state):
        """Run the wrapped network with autocast set to ``enabled``.

        Args:
            input (nested Tensor): input for the wrapped network
            state (nested Tensor): state for the wrapped network
        Returns:
            whatever the wrapped network returns for ``(input, state)``.
        """

        def _to_float32(x):
            # Tensors with a non floating point dtype are left untouched.
            if x.dtype.is_floating_point:
                return x.float()
            return x

        # When autocast is active outside but disabled for the wrapped
        # network, convert floating point inputs to float32 first.
        leaving_autocast = torch.is_autocast_enabled() and not self._enabled
        if leaving_autocast:
            input = alf.nest.map_structure(_to_float32, input)

        with torch.cuda.amp.autocast(self._enabled):
            result = self._net(input, state)
        return result
@alf.configurable
@alf.repr_wrapper
class NoisyFC(Network):
    r"""The Noisy Linear Layer described in

    Fortunato et al. `Noisy Networks for Exploration <https://arxiv.org/abs/1706.10295>`_

    In short, the original weight :math:`w` and bias :math:`b` of FC layer are
    replaced with :math:`w + w_\sigma \odot \epsilon^w` and
    :math:`b + b_\sigma \odot \epsilon^b` where :math:`\epsilon^w` and
    :math:`\epsilon^b` are noise and :math:`w, w_\sigma, b, b_\sigma` are
    trainable parameters.

    Some details:

    1. The noise for each sample in a batch is different.
    2. The noise is maintained as state. It has a probability of
       ``new_noise_prob`` to change to new noise.
    3. Since the initial state is always 0, a new noise will always be
       generated for zero state.
    4. If it is running in eval mode (i.e., common.is_eval() is True), noise
       will be disabled (i.e. same as alf.layers.FC).
    5. The noise is factorized Gaussian noise as described in the paper.

    Args:
        input_size: input size.
        output_size: output size.
        std_init: the scaling factor for the initial value of weight_sigma
            and bias_sigma.
        new_noise_prob: the probability of resample the noise.
        activation: activation function.
        use_bn: whether use batch normalization.
        use_ln: whether use layer normalization
        bn_ctor: will be called as ``bn_ctor(num_features)`` to create the BN
            layer.
        kernel_initializer: initializer for the FC layer kernel. If none is
            provided a ``variance_scaling_initializer`` with gain as
            ``kernel_init_gain`` will be used.
        kernel_init_gain: a scaling factor (gain) applied to the std of
            kernel init distribution. It will be ignored if
            ``kernel_initializer`` is not None.
        bias_init_value: a constant for the initial bias value. This is
            ignored if ``bias_initializer`` is provided.
        bias_initializer: initializer for the bias parameter.
        weight_opt_args: If provided, it will be used as optimizer arguments
            for weight. And it will be combined with zero_mean=False and
            fixed_norm=False as optimizer arguments for weight_sigma.
        bias_opt_args: If provided, it will be used as optimizer arguments
            for bias. And it will be combined with zero_mean=False as
            optimizer arguments for bias_sigma.
    """

    def __init__(self,
                 input_size: int,
                 output_size: int,
                 std_init: float = 0.5,
                 new_noise_prob: float = 0.01,
                 activation: Callable = identity,
                 use_bn: bool = False,
                 use_ln: bool = False,
                 bn_ctor: Callable = nn.BatchNorm1d,
                 kernel_initializer: Optional[Callable] = None,
                 kernel_init_gain: float = 1.0,
                 bias_init_value: float = 0.0,
                 bias_initializer: Optional[Callable] = None,
                 weight_opt_args: Optional[Dict] = None,
                 bias_opt_args: Optional[Dict] = None):
        # The state is the pair (epsilon_in, epsilon_out) of per-sample noise.
        noise_spec = (alf.TensorSpec((input_size, )),
                      alf.TensorSpec((output_size, )))
        super().__init__(
            input_tensor_spec=alf.TensorSpec((input_size, )),
            state_spec=noise_spec)

        self._input_size = input_size
        self._output_size = output_size
        self._activation = activation
        self._std_init = std_init

        # The parameters are only allocated here; their values are set by
        # ``reset_parameters()`` below.
        self._weight = nn.Parameter(torch.empty(output_size, input_size))
        self._weight_sigma = nn.Parameter(
            torch.empty(output_size, input_size))
        self._bias = nn.Parameter(torch.empty(output_size))
        self._bias_sigma = nn.Parameter(torch.empty(output_size))

        self._use_bn = use_bn
        self._use_ln = use_ln
        self._bn = bn_ctor(output_size) if use_bn else None
        self._ln = nn.LayerNorm(output_size) if use_ln else None

        self._new_noise_prob = new_noise_prob
        self._kernel_initializer = kernel_initializer
        self._kernel_init_gain = kernel_init_gain
        self._bias_init_value = bias_init_value
        self._bias_initializer = bias_initializer
        self.reset_parameters()

        if weight_opt_args:
            self._weight.opt_args = weight_opt_args
            # The sigma parameter gets its own copy with the two overrides,
            # so that the caller's dict is not modified.
            sigma_opt_args = copy.copy(weight_opt_args)
            sigma_opt_args.update(zero_mean=False, fixed_norm=False)
            self._weight_sigma.opt_args = sigma_opt_args
        if bias_opt_args and self._bias is not None:
            self._bias.opt_args = bias_opt_args
            sigma_opt_args = copy.copy(bias_opt_args)
            sigma_opt_args.update(zero_mean=False)
            self._bias_sigma.opt_args = sigma_opt_args

    @property
    def input_size(self):
        """The size of the input."""
        return self._input_size

    @property
    def output_size(self):
        """The size of the output."""
        return self._output_size

    @property
    def weight(self):
        """The weight parameter :math:`w` (without the noise part)."""
        return self._weight

    @property
    def bias(self):
        """The bias parameter :math:`b` (without the noise part)."""
        return self._bias

    def reset_parameters(self):
        """Initialize the parameters."""
        if self._kernel_initializer is not None:
            self._kernel_initializer(self._weight.data)
        else:
            variance_scaling_init(
                self._weight.data,
                gain=self._kernel_init_gain,
                nonlinearity=self._activation)
        weight_sigma_init = self._std_init / math.sqrt(self._input_size)
        self._weight_sigma.data.fill_(weight_sigma_init)

        if self._bias_initializer is None:
            nn.init.constant_(self._bias.data, self._bias_init_value)
        else:
            self._bias_initializer(self._bias.data)
        bias_sigma_init = self._std_init / math.sqrt(self._output_size)
        self._bias_sigma.data.fill_(bias_sigma_init)

        if self._use_ln:
            self._ln.reset_parameters()
        if self._use_bn:
            self._bn.reset_parameters()

    def _scale_noise(self, size):
        """Sample ``sign(x) * sqrt(|x|)`` with ``x`` standard normal."""
        gaussian = torch.randn(size)
        magnitude = gaussian.abs().sqrt_()
        return gaussian.sign().mul_(magnitude)

    def _maybe_resample_noise(self, epsilon_in, epsilon_out, batch_size):
        """Replace the noise of the samples selected for resampling.

        A sample gets new noise with probability ``new_noise_prob``, and
        always when both of its current noise vectors are all zeros.
        """
        fresh_in = self._scale_noise((batch_size, self._input_size))
        fresh_out = self._scale_noise((batch_size, self._output_size))
        resample = torch.rand(batch_size) < self._new_noise_prob
        # The initial state is always 0. So we need to generate new noise
        # for initial state.
        is_initial = ((epsilon_in == 0).all(dim=1)
                      & (epsilon_out == 0).all(dim=1))
        resample = (resample | is_initial).unsqueeze(-1)
        epsilon_in = torch.where(resample, fresh_in, epsilon_in)
        epsilon_out = torch.where(resample, fresh_out, epsilon_out)
        return epsilon_in, epsilon_out

    def forward(self, input: torch.Tensor, state: Tuple[torch.Tensor]):
        """Forward computation.

        Args:
            input: its shape should be ``[batch_size, input_size]``
            state: tuple of noise
        Returns:
            tuple:
            - Tensor: with shape as ``[batch_size, output_size]``
            - the tuple of noise used as the new state
        """
        epsilon_in, epsilon_out = state
        # y = bias + input @ weight.t()
        y = torch.addmm(self._bias, input, self._weight.t())

        if not is_eval():
            epsilon_in, epsilon_out = self._maybe_resample_noise(
                epsilon_in, epsilon_out, input.shape[0])
            noisy_input = input * epsilon_in
            # x = bias_sigma + noisy_input @ weight_sigma.t()
            x = torch.addmm(self._bias_sigma, noisy_input,
                            self._weight_sigma.t())
            # Although y.addcmul_(x, epsilon_out) is better, it can have
            # problem of dtype mismatch when AMP is enabled.
            y = y + x * epsilon_out

        if self._use_ln:
            y = self._ln(y)
        if self._use_bn:
            y = self._bn(y)
        return self._activation(y), (epsilon_in, epsilon_out)