# Copyright (c) 2020 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DynamicsNetwork"""
import functools
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import alf
import alf.utils.math_ops as math_ops
import alf.nest as nest
from alf.initializers import variance_scaling_init
from alf.tensor_specs import TensorSpec
from .network import Network
from .encoding_networks import EncodingNetwork
from .projection_networks import NormalProjectionNetwork
[docs]@alf.configurable
class DynamicsNetwork(Network):
"""Create an instance of DynamicsNetwork."""
def __init__(self,
input_tensor_spec,
output_tensor_spec,
joint_fc_layer_params=None,
activation=torch.relu_,
kernel_initializer=None,
prob=False,
continuous_projection_net_ctor=NormalProjectionNetwork,
name="DynamicsNetwork"):
"""Creates an instance of `DynamicsNetwork` for predicting the next
observation given current observation and action.
Args:
input_tensor_spec: A tuple of TensorSpecs (observation_spec, action_spec)
representing the inputs.
joint_fc_layer_params (tuple[int]): a tuple of integers representing
hidden FC layer sizes FC layers after merging observations and
actions.
activation (nn.functional): activation used for hidden layers. The
last layer will not be activated.
kernel_initializer (Callable): initializer for all the layers but
the last layer. If none is provided a variance_scaling_initializer
with uniform distribution will be used.
prob (bool): If True, use the probabistic mode of network; otherwise,
use the determinstic mode of network.
continuous_projection_net_ctor (ProjectionNetwork): constructor that
generates a continuous projection network that outputs
a distribution.
name (str):
"""
super().__init__(input_tensor_spec, name=name)
observation_spec, action_spec = input_tensor_spec
out_size = output_tensor_spec.shape[0]
flat_action_spec = nest.flatten(action_spec)
if len(flat_action_spec) > 1:
raise ValueError(
'Only a single action is supported by this network')
if kernel_initializer is None:
kernel_initializer = functools.partial(
variance_scaling_init,
gain=1.0 / 2.0,
mode='fan_in',
distribution='truncated_normal',
nonlinearity=math_ops.identity)
self._single_action_spec = flat_action_spec[0]
self._prob = prob
if self._prob:
self._joint_encoder = EncodingNetwork(
TensorSpec(
(observation_spec.shape[0] + action_spec.shape[0], )),
fc_layer_params=joint_fc_layer_params,
activation=activation,
kernel_initializer=kernel_initializer)
# the output spec is named as ``action_spec`` in projection_net
self._projection_net = continuous_projection_net_ctor(
# note that in the case of multi-replica, should use [-1]
input_size=self._joint_encoder.output_spec.shape[-1],
action_spec=output_tensor_spec,
squash_mean=False,
scale_distribution=False,
state_dependent_std=True)
else:
self._joint_encoder = EncodingNetwork(
TensorSpec(
(observation_spec.shape[0] + action_spec.shape[0], )),
fc_layer_params=joint_fc_layer_params,
activation=activation,
kernel_initializer=kernel_initializer,
last_activation=math_ops.identity,
last_layer_size=out_size)
self._projection_net = None
self._output_spec = TensorSpec((out_size, ))
[docs] def forward(self, inputs, state=()):
"""Computes prediction given inputs.
Args:
inputs: A tuple of Tensors consistent with `input_tensor_spec`
state: empty for API consistency
Returns:
out: a tensor of the size [B, n, d] if self._prob is False
and a distribution if self._prob is True.
state: empty
"""
observations, actions = inputs
encoded_obs = observations
encoded_action = actions
joint = torch.cat([encoded_obs, encoded_action], -1)
out, _ = self._joint_encoder(joint)
if self._projection_net is not None:
out, _ = self._projection_net(out)
return out, state
[docs] def make_parallel(self, n):
"""Create a ``ParallelCriticNetwork`` using ``n`` replicas of ``self``.
The initialized network parameters will be different.
"""
return ParallelDynamicsNetwork(self, n, "parallel_" + self._name)
[docs]class ParallelDynamicsNetwork(Network):
"""Create ``n`` DynamicsNetwork in parallel."""
def __init__(self,
dynamics_network: DynamicsNetwork,
n: int,
name="ParallelDynamicsNetwork"):
"""
It create a parallelized version of ``DynamicsNetwork``.
Args:
dynamics_network (DynamicsNetwork): non-parallelized dynamics network
n (int): make ``n`` replicas from ``dynamics_network`` with different
initializations.
name (str):
"""
super().__init__(
input_tensor_spec=dynamics_network.input_tensor_spec, name=name)
self._joint_encoder = dynamics_network._joint_encoder.make_parallel(
n, True)
self._prob = dynamics_network._prob
if self._prob:
self._projection_net = \
dynamics_network._projection_net.make_parallel(n)
else:
self._projection_net = None
self._output_spec = TensorSpec((n, ) +
dynamics_network.output_spec.shape)
[docs] def forward(self, inputs, state=()):
"""Computes prediction given inputs.
Args:
inputs: A tuple of Tensors consistent with `input_tensor_spec`
state: empty for API consistency
Returns:
out: a tensor of the size [B, n, d] if self._prob is False
and a distribution if self._prob is True.
state: empty
"""
observations, actions = inputs
encoded_obs = observations
encoded_action = actions
joint = torch.cat([encoded_obs, encoded_action], -1)
out, _ = self._joint_encoder(joint)
if self._projection_net is not None:
out, _ = self._projection_net(out)
return out, state