# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for doing moving average."""
import torch
import torch.nn as nn
import alf
from alf.tensor_specs import TensorSpec
from alf.utils.data_buffer import DataBuffer
from alf.nest.utils import get_outer_rank
[docs]def average_outer_dims(tensor, spec):
"""
Args:
tensor (Tensor): a single Tensor
spec (TensorSpec):
Returns:
the average tensor across outer dims
"""
outer_dims = get_outer_rank(tensor, spec)
return tensor.mean(dim=list(range(outer_dims)))
[docs]@alf.configurable
class WindowAverager(nn.Module):
def __init__(self,
tensor_spec: TensorSpec,
window_size,
name="WindowAverager"):
"""
WindowAverager calculate the average of the past ``window_size`` samples.
Args:
tensor_spec (nested TensorSpec): the ``TensorSpec`` for the value to be
averaged
window_size (int): the size of the window
name (str): name of this averager
"""
super().__init__()
self._name = name
self._buf = alf.nest.map_structure(
# Should put data on the default device instead of "cpu"
lambda spec: DataBuffer(spec, window_size, alf.get_default_device(
)),
tensor_spec)
self._tensor_spec = tensor_spec
[docs] def update(self, tensor):
"""Update the average.
Args:
tensor (nested Tensor): value for updating the average; outer dims
will be averaged first before being added.
Returns:
None
"""
alf.nest.map_structure(
lambda buf, t, spec: buf.add_batch(
average_outer_dims(t.detach(), spec).unsqueeze(0)), self._buf,
tensor, self._tensor_spec)
[docs] def get(self):
"""Get the current average.
Returns:
Tensor: the current average
"""
def _get(buf):
n = torch.max(buf.current_size,
torch.ones_like(buf.current_size)).to(torch.float32)
return torch.sum(buf.get_all(), dim=0) * (1. / n)
return alf.nest.map_structure(_get, self._buf)
[docs] def average(self, tensor):
"""Combines ``self.update`` and ``self.get`` in one step. Can be handy
in practice.
Args:
tensor (nested Tensor): a value for updating the average; outer dims
will be averaged first before being added
Returns:
Tensor: the current average
"""
self.update(tensor)
return self.get()
[docs]@alf.configurable
class ScalarWindowAverager(WindowAverager):
"""WindowAverager for scalar value"""
def __init__(self,
window_size,
dtype=torch.float32,
name="ScalarWindowAverager"):
"""
Args:
window_size (int): the size of the window
dtype (torch.dtype): dtype of the scalar
name (str): name of this averager
"""
super().__init__(
tensor_spec=TensorSpec(shape=(), dtype=dtype),
window_size=window_size,
name=name)
[docs]@alf.configurable
class EMAverager(nn.Module):
r"""Class for exponential moving average. Suppose the update rate is
:math:`\alpha`, and the quantity to be averaged is denoted as :math:`x`, then
.. math::
x_t = (1-\alpha)x_{t-1} + \alpha x
The average is corrected by a mass :math:`w_t` as :math:`\frac{x_t}{w_t}``,
and the mass is calculated as:
.. math::
w_t = (1-\alpha) * w_{t-1} + \alpha
Note that update rate can be a fixed floating number or a variable. If it is
a variable, the update rate can be changed by the user.
"""
def __init__(self, tensor_spec: TensorSpec, update_rate,
name="EMAverager"):
"""
Args:
tensor_spec (nested TensorSpec): the ``TensorSpec`` for the value to be
averaged
update_rate (float|Variable): the update rate
name (str): name of this averager
"""
super().__init__()
self._name = name
self._tensor_spec = tensor_spec
self._update_rate = update_rate
var_id = [0]
def _create_variable(tensor_spec):
var = tensor_spec.zeros()
self.register_buffer("_var%s" % var_id[0], var)
var_id[0] += 1
return var
self._average = alf.nest.map_structure(_create_variable, tensor_spec)
# mass can be shared by different structure elements
self.register_buffer("_mass", torch.zeros((), dtype=torch.float64))
[docs] def update(self, tensor):
"""Update the average.
Args:
tensor (nested Tensor): value for updating the average; outer dims
will be first averaged before being added to the average
Returns:
None
"""
alf.nest.map_structure(
lambda average, t, spec: average.add_(
torch.as_tensor(self._update_rate, dtype=t.dtype) * (
average_outer_dims(t.detach(), spec) - average)),
self._average, tensor, self._tensor_spec)
self._mass.add_(
torch.as_tensor(self._update_rate, dtype=torch.float64) *
(1 - self._mass))
[docs] def get(self):
"""Get the current average.
Returns:
Tensor: the current average
"""
return alf.nest.map_structure(
lambda average: average / self._mass.clamp(min=self._update_rate).
to(average.dtype), self._average)
[docs] def average(self, tensor):
"""Combines ``self.update`` and ``self.get`` in one step. Can be handy
in practice.
Args:
tensor (nested Tensor): a value for updating the average; outer dims
will be first averaged before being added to the average
Returns:
Tensor: the current average
"""
self.update(tensor)
return self.get()
[docs]@alf.configurable
class ScalarEMAverager(EMAverager):
"""EMAverager for scalar value"""
def __init__(self,
update_rate,
dtype=torch.float32,
name="ScalarEMAverager"):
"""
Args:
udpate_rate (float|Variable): update rate
dtype (torch.dtype): dtype of the scalar
name (str): name of this averager
"""
super().__init__(
tensor_spec=TensorSpec(shape=(), dtype=dtype),
update_rate=update_rate,
name=name)
[docs]@alf.configurable
class AdaptiveAverager(EMAverager):
"""Averager with adaptive update_rate.
This averager gives higher weight to more recent samples for calculating the
average. Roughly speaking, the weight for each sample at time :math:`t` is
roughly proportional to :math:`(t/T)^{speed-1}`, where :math:`T` is the
current time step. See ``notes/streaming_averaging_amd_sampling.py`` for
detail.
"""
def __init__(self,
tensor_spec: TensorSpec,
speed=10.,
name="AdaptiveAverager"):
"""
Args:
tensor_spec (nested TensorSpec): the ``TensorSpec`` for the value to be
averaged
speed (float): speed of updating mean and variance.
name (str): name of this averager
"""
update_rate = torch.ones((), dtype=torch.float64)
super().__init__(tensor_spec, update_rate)
self.register_buffer("_update_ema_rate", update_rate)
self.register_buffer("_total_steps",
torch.as_tensor(speed, dtype=torch.int64))
self._speed = speed
[docs] def update(self, tensor):
"""Update the average.
Args:
tensor (nested Tensor): a value for updating the average; outer dims
will be first averaged before being added to the average
"""
self._update_ema_rate.fill_(
self._speed / self._total_steps.to(torch.float64))
self._total_steps.add_(1)
super().update(tensor)
[docs]@alf.configurable
class ScalarAdaptiveAverager(AdaptiveAverager):
"""AdaptiveAverager for scalar value."""
def __init__(self,
speed=10,
dtype=torch.float32,
name="ScalarAdaptiveAverager"):
"""
Args:
speed (float): speed of updating mean and variance.
dtype (torch.dtype): dtype of the scalar
name (str): name of this averager
"""
super().__init__(
tensor_spec=TensorSpec(shape=(), dtype=dtype),
speed=speed,
name=name)