# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod
import torch
import torch.nn as nn
import alf
from alf.nest.utils import get_outer_rank
from alf.tensor_specs import TensorSpec
from alf.utils import common, math_ops
from alf.utils.averager import WindowAverager, EMAverager, AdaptiveAverager
[docs]@alf.configurable(whitelist=['max_dims_to_summarize'])
class Normalizer(nn.Module):
def __init__(self,
tensor_spec,
auto_update=True,
zero_mean=True,
unit_std=False,
variance_epsilon=1e-10,
debug_summaries=False,
max_dims_to_summarize=10,
name="Normalizer"):
r"""Create a base normalizer using a first-moment and a second-moment
averagers.
Given weights :math:`w_i` and samples :math:`x_i, i = 1 \cdots n`, let
.. math::
\begin{array}{lll}
m & = \sum_i w_i * x_i \; & \mbox{(first moment)} \\
m2 & = \sum_i w_i * x_i^2 \; & \mbox{(second moment)}
\end{array}
then
.. math::
\begin{array}{ll}
var & = \sum_i w_i * (x_i - m)^2 \\
& = \sum_i w_i * (x_i^2 + m^2 - 2*x_i*m) \\
& = m2 + m^2 - 2m^2 \\
& = m2 - m^2
\end{array}
which is the same result with the case when :math:`w_1=w_2=...=w_n=(1/n)`
NOTE: tf_agents' normalizer maintains a running average of variance which
is not correct mathematically, because the estimated variance contains
early components that don't measure all the current samples.
Args:
tensor_spec (nested TensorSpec): specs of the mean of tensors to be
normalized.
auto_update (bool): If True, automatically update mean and variance
for each call to ``normalize()``. Otherwise, the user needs to call
``update()``
zero_mean (bool): whether to make the normalized value be zero-mean
unit_std (bool): whether assume a unit std or not when normalizing.
If True, then the rewards are just subtracted by the mean.
variance_epsilon (float): a small value added to std for normalizing
debug_summaries (bool): True if debug summaries should be created.
max_dims_to_summarize (int): when ``debug_summaries=True``, the max
number of dims of the normalizer's statistics will be summarized.
Note that a large number could potentially dump a lot of TB plots,
consume much disk space, and slow down training. Default: 10.
name (str):
"""
super().__init__()
self._name = name
self._auto_update = auto_update
self._variance_epsilon = variance_epsilon
self._tensor_spec = tensor_spec
assert zero_mean or not unit_std, (
"Must at least subtract mean or divide std!")
if zero_mean:
self._mean_averager = self._create_averager()
else:
self._mean_averager = None
if not unit_std:
self._m2_averager = self._create_averager()
else:
self._m2_averager = None
self._debug_summaries = debug_summaries
self._max_dims_to_summarize = max_dims_to_summarize
@abstractmethod
def _create_averager(self):
"""
Create an averager. Derived classes must specify what averager to use.
"""
pass
[docs] def update(self, tensor):
"""Update the statistics given a new tensor.
"""
if self._mean_averager:
self._mean_averager.update(tensor)
if self._m2_averager:
sqr_tensor = alf.nest.map_structure(math_ops.square, tensor)
self._m2_averager.update(sqr_tensor)
if self._debug_summaries and alf.summary.should_record_summaries():
suffix = common.exe_mode_name()
def _reduce_along_batch_dims(x, spec, op):
bs = alf.layers.BatchSquash(get_outer_rank(x, spec))
x = bs.flatten(x)
x = op(x, dim=0)[0]
return x
def _summary(name, val):
with alf.summary.scope(self._name):
if val.ndim == 0:
alf.summary.scalar(name + "." + suffix, val)
elif val.numel() < self._max_dims_to_summarize:
val = val.reshape(-1) # val might be multi-rank
for i in range(val.numel()):
alf.summary.scalar(
name + "_" + str(i) + "." + suffix, val[i])
else:
alf.summary.scalar(name + ".min." + suffix, val.min())
alf.summary.scalar(name + ".max." + suffix, val.max())
def _summarize_all(path, t, m2, m):
if path:
path += "."
spec = TensorSpec.from_tensor(m if m2 is None else m2)
_summary(path + "tensor.batch_min",
_reduce_along_batch_dims(t, spec, torch.min))
_summary(path + "tensor.batch_max",
_reduce_along_batch_dims(t, spec, torch.max))
if m is not None:
_summary(path + "mean", m)
if m2 is not None:
_summary(path + "var", m2 - math_ops.square(m))
elif m2 is not None:
_summary(path + "second_moment", m2)
m2 = (self._m2_averager.get() if self._m2_averager else None)
m = (self._mean_averager.get() if self._mean_averager else None)
alf.nest.py_map_structure_with_path(_summarize_all, tensor, m2, m)
[docs] def normalize(self, tensor, clip_value=-1.0):
"""
Normalize a tensor with mean and variance
Args:
tensor (nested Tensor): each leaf can have arbitrary outer dims
with shape [B1, B2,...] + tensor_spec.shape.
clip_value (float): if positive, normalized values will be clipped to
[-clip_value, clip_value].
Returns:
normalized tensor
"""
if self._auto_update:
self.update(tensor)
return self._normalize(tensor, clip_value)
def _normalize(self, tensor, clip_value=-1.0):
def _normalize(m2, t, m):
# in some extreme cases, due to floating errors, var might be a very
# large negative value (close to 0)
if m2 is not None:
if m is not None:
var = torch.relu(m2 - math_ops.square(m))
else:
var = m2
m = torch.zeros_like(m2)
else:
var = torch.ones_like(m)
t = alf.layers.normalize_along_batch_dims(
t, m, var, variance_epsilon=self._variance_epsilon)
if clip_value > 0:
t = torch.clamp(t, -clip_value, clip_value)
return t
m2 = (self._m2_averager.get() if self._m2_averager else None)
m = (self._mean_averager.get() if self._mean_averager else None)
return alf.nest.map_structure(_normalize, m2, tensor, m)
[docs] def forward(self, input):
if self.training:
self.update(input)
return self._normalize(input)
[docs]@alf.configurable
class WindowNormalizer(Normalizer):
"""Normalization according to a recent window of samples.
"""
def __init__(self,
tensor_spec,
window_size=1000,
auto_update=True,
zero_mean=True,
unit_std=False,
variance_epsilon=1e-10,
debug_summaries=False,
name="WindowNormalizer"):
"""
Args:
tensor_spec (nested TensorSpec): specs of the mean of tensors to be
normalized.
window_size (int): the size of the recent window
auto_update (bool): If True, automatically update mean and variance
for each call to `normalize()`. Otherwise, the user needs to call
`update()`
zero_mean (bool): whether to make the normalized value be zero-mean
unit_std (bool): whether assume a unit std or not when normalizing.
If True, then the rewards are just subtracted by the mean.
variance_epislon (float): a small value added to std for normalizing
debug_summaries (bool): whether to generate debug summaries
name (str):
"""
self._window_size = window_size
super(WindowNormalizer, self).__init__(
tensor_spec=tensor_spec,
auto_update=auto_update,
zero_mean=zero_mean,
unit_std=unit_std,
variance_epsilon=variance_epsilon,
debug_summaries=debug_summaries,
name=name)
def _create_averager(self):
"""Returns a window averager."""
return WindowAverager(
tensor_spec=self._tensor_spec, window_size=self._window_size)
[docs]@alf.configurable
class ScalarWindowNormalizer(WindowNormalizer):
def __init__(self,
window_size=1000,
auto_update=True,
zero_mean=True,
unit_std=False,
variance_epsilon=1e-10,
debug_summaries=False,
name="ScalarWindowNormalizer"):
super(ScalarWindowNormalizer, self).__init__(
tensor_spec=TensorSpec((), dtype='float32'),
window_size=window_size,
auto_update=auto_update,
zero_mean=zero_mean,
unit_std=unit_std,
variance_epsilon=variance_epsilon,
debug_summaries=debug_summaries,
name=name)
[docs]@alf.configurable
class EMNormalizer(Normalizer):
"""Exponential moving normalizer: the normalization assigns exponentially
decayed weights to history samples.
"""
def __init__(self,
tensor_spec,
update_rate=1e-3,
auto_update=True,
zero_mean=True,
unit_std=False,
variance_epsilon=1e-10,
debug_summaries=False,
name="EMNormalizer"):
"""
Args:
tensor_spec (nested TensorSpec): specs of the mean of tensors to be
normalized.
update_rate (float): the update rate
auto_update (bool): If True, automatically update mean and variance
for each call to `normalize()`. Otherwise, the user needs to call
`update()`
zero_mean (bool): whether to make the normalized value be zero-mean
unit_std (bool): whether assume a unit std or not when normalizing.
If True, then the rewards are just subtracted by the mean.
variance_epislon (float): a small value added to std for normalizing
debug_summaries (bool): whether to generate debug summaries
name (str):
"""
self._update_rate = update_rate
super(EMNormalizer, self).__init__(
tensor_spec=tensor_spec,
auto_update=auto_update,
zero_mean=zero_mean,
unit_std=unit_std,
variance_epsilon=variance_epsilon,
debug_summaries=debug_summaries,
name=name)
def _create_averager(self):
"""Returns an exponential moving averager."""
return EMAverager(self._tensor_spec, self._update_rate)
[docs]@alf.configurable
class ScalarEMNormalizer(EMNormalizer):
def __init__(self,
update_rate=1e-3,
auto_update=True,
variance_epsilon=1e-10,
zero_mean=True,
unit_std=False,
debug_summaries=False,
name="ScalarEMNormalizer"):
super(ScalarEMNormalizer, self).__init__(
tensor_spec=TensorSpec((), dtype='float32'),
update_rate=update_rate,
auto_update=auto_update,
zero_mean=zero_mean,
unit_std=unit_std,
variance_epsilon=variance_epsilon,
debug_summaries=debug_summaries,
name=name)
[docs]@alf.configurable
class AdaptiveNormalizer(Normalizer):
def __init__(self,
tensor_spec,
speed=8.0,
auto_update=True,
zero_mean=True,
unit_std=False,
variance_epsilon=1e-10,
debug_summaries=False,
name="AdaptiveNormalizer"):
"""This normalizer gives higher weight to more recent samples for
calculating mean and variance. Roughly speaking, the weight for each
sample at time t is proportional to (t/T)^(speed-1), where T is the
current time step. See docs/streaming_averaging_amd_sampling.py for
detail.
Args:
tensor_spec (nested TensorSpec): specs of the mean of tensors to be
normalized.
speed (float): speed of updating mean and variance.
auto_update (bool): If True, automatically update mean and variance
for each call to `normalize()`. Otherwise, the user needs to call
`update()`
zero_mean (bool): whether to make the normalized value be zero-mean
unit_std (bool): whether assume a unit std or not when normalizing.
If True, then the rewards are just subtracted by the mean.
variance_epislon (float): a small value added to std for normalizing
debug_summaries (bool): whether to generate debug summaries
name (str):
"""
self._speed = speed
super(AdaptiveNormalizer, self).__init__(
tensor_spec=tensor_spec,
auto_update=auto_update,
variance_epsilon=variance_epsilon,
zero_mean=zero_mean,
unit_std=unit_std,
debug_summaries=debug_summaries,
name=name)
def _create_averager(self):
"""Create an adaptive averager."""
return AdaptiveAverager(
tensor_spec=self._tensor_spec, speed=self._speed)
[docs]@alf.configurable
class ScalarAdaptiveNormalizer(AdaptiveNormalizer):
def __init__(self,
speed=8.0,
auto_update=True,
zero_mean=True,
unit_std=False,
variance_epsilon=1e-10,
debug_summaries=False,
name="ScalarAdaptiveNormalizer"):
super(ScalarAdaptiveNormalizer, self).__init__(
tensor_spec=TensorSpec((), dtype='float32'),
speed=speed,
auto_update=auto_update,
zero_mean=zero_mean,
unit_std=unit_std,
variance_epsilon=variance_epsilon,
debug_summaries=debug_summaries,
name=name)