# Copyright (c) 2019 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for generate summary."""
from absl import logging
import functools
import numpy as np
import os
import time
import torch
import torch.distributions as td
import alf
from alf.data_structures import LossInfo
from alf.nest import is_namedtuple, is_nested, py_map_structure_with_path, map_structure
from alf.utils import dist_utils
from alf.summary import should_record_summaries, get_global_counter
from typing import List, Optional
DEFAULT_BUCKET_COUNT = 30
def _summary_wrapper(summary_func):
"""Summary wrapper
Wrapper summary function to reduce cost for data computation
"""
@functools.wraps(summary_func)
def wrapper(*args, **kwargs):
if should_record_summaries():
summary_func(*args, **kwargs)
return wrapper
[docs]@_summary_wrapper
def histogram_discrete(name, data, bucket_min, bucket_max, step=None):
"""histogram for discrete data.
Args:
name (str): name for this summary
data (Tensor): A ``Tensor`` integers of any shape.
bucket_min (int): represent bucket min value
bucket_max (int): represent bucket max value
bucket count is calculate as ``bucket_max - bucket_min + 1``
and output will have this many buckets.
step (None|Tensor): step value for this summary. this defaults to
``alf.summary.get_global_counter()``
"""
bins = torch.arange(bucket_min, bucket_max + 1).cpu()
# For N bins, there should be N+1 bin edges
bin_edges = bins.to(torch.float32) - 0.5
bin_edges = torch.cat([bin_edges, bin_edges[-1:] + 1.])
alf.summary.histogram(name, data, step=step, bins=bin_edges)
[docs]@_summary_wrapper
def histogram_continuous(name,
data,
bucket_min=None,
bucket_max=None,
bucket_count=DEFAULT_BUCKET_COUNT,
step=None):
"""histogram for continuous data.
Args:
name (str): name for this summary
data (Tensor): A ``Tensor`` of any shape.
bucket_min (float|None): represent bucket min value,
if None value, ``data.min()`` will be used
bucket_max (float|None): represent bucket max value,
if None value, ``data.max()`` will be used
bucket_count (int): positive ``int``. The output will have this many buckets.
step (None|Tensor): step value for this summary. this defaults to
``alf.summary.get_global_counter()``
"""
data = data.to(torch.float64)
if bucket_min is None:
bucket_min = data.min()
else:
bucket_min = torch.as_tensor(bucket_min)
if bucket_max is None:
bucket_max = data.max()
else:
bucket_max = torch.as_tensor(bucket_max)
bins = (
bucket_min +
(torch.arange(bucket_count + 1, dtype=torch.float64) / bucket_count) *
(bucket_max - bucket_min))
data = data.clamp(bucket_min, bucket_max)
alf.summary.histogram(name, data, step=step, bins=bins.cpu())
[docs]@_summary_wrapper
@alf.configurable
def summarize_variables(name_and_params, with_histogram=True):
"""Add summaries for variables.
Args:
name_and_params (list[(str, Parameter)]): A list of ``(name, Parameter)``
tuples.
with_histogram (bool): If True, generate histogram.
"""
for var_name, var in name_and_params:
var_values = var
if with_histogram and torch.all(torch.isfinite(var_values)):
# Need to make sure all values are finite to avoid the histogram range
# error
alf.summary.histogram(
name='summarize_vars/' + var_name + '_value', data=var_values)
alf.summary.scalar(
name='summarize_vars/' + var_name + '_value_norm',
data=var_values.norm())
[docs]@_summary_wrapper
@alf.configurable
def summarize_gradients(name_and_params, with_histogram=True):
"""Add summaries for gradients.
Args:
name_and_params (list[(str, Parameter)]): A list of ``(name, Parameter)``
tuples.
with_histogram (bool): If True, generate histogram.
"""
for var_name, var in name_and_params:
if var.grad is None:
continue
grad_values = var.grad
if with_histogram:
if torch.all(grad_values.isfinite()):
alf.summary.histogram(
name='summarize_grads/' + var_name + '_gradient',
data=grad_values)
alf.summary.scalar(
name='summarize_grads/' + var_name + '_gradient_norm',
data=grad_values.norm())
alf.summary.histogram = _summary_wrapper(alf.summary.histogram)
[docs]@_summary_wrapper
def add_nested_summaries(prefix, data):
"""Add summary of a nest of data.
Args:
prefix (str): the prefix of the names of the summaries
data (dict or namedtuple): data to be summarized
"""
def _summarize(path, x):
if isinstance(x, torch.Tensor):
alf.summary.scalar(prefix + '/' + path, x)
py_map_structure_with_path(_summarize, data)
[docs]@_summary_wrapper
@alf.configurable
def summarize_per_category_loss(loss_info: LossInfo,
summarize_count: bool = False,
label_names: Optional[List[str]] = None):
"""Add summary about each category of the unaggregated ``loss_info.loss``
of the shape (T, B), or (B, ) by partitioning it according to
``loss_info.batch_label``, which has the same shape as ``loss_info.loss``.
It also creates summarization of the number of samples encountered
for each category.
Args:
loss_info: do per-category summarization if
``loss_info.batch_label`` is present, and skip otherwise
summarize_count: whether to summarize the number of samples
for each category as well
label_names: the names of each category to be used
in tensorboard summary. The category number will be used if
``label_names`` is None.
"""
if loss_info.batch_label != ():
assert loss_info.batch_label.shape == loss_info.loss.shape, (
"shape mis-match between batch_label shape {} and loss "
"shape {}".format(loss_info.batch_label.shape,
loss_info.loss.shape))
# (T, B) -> (T * B, )
loss = loss_info.loss.reshape(-1)
batch_label = loss_info.batch_label.int().reshape(-1)
labels = torch.unique(batch_label)
labels = labels.tolist()
for label in labels:
subset_indices = (batch_label == label)
subset_loss = loss[subset_indices]
if label_names is None:
label_str = label
else:
label_str = label_names[label]
alf.summary.scalar(
'loss/loss_for_category_{}'.format(label_str),
data=subset_loss.mean())
if summarize_count:
alf.summary.scalar(
'loss/sample_count_for_category_{}'.format(label_str),
data=subset_indices.sum())
else:
return
[docs]@_summary_wrapper
def summarize_loss(loss_info: LossInfo):
"""Add summary about ``loss_info``
Args:
loss_info (LossInfo): ``loss_info.extra`` must be a namedtuple
"""
if not isinstance(loss_info.loss, tuple):
alf.summary.scalar('loss', data=loss_info.loss)
if loss_info.gns != ():
alf.summary.scalar('gradient_noise_scale', data=loss_info.gns)
if not loss_info.extra:
return
# Support extra as namedtuple or dict (more flexible)
if is_namedtuple(loss_info.extra) or isinstance(loss_info.extra, dict):
add_nested_summaries('loss', loss_info.extra)
[docs]@_summary_wrapper
def summarize_nest(prefix, nest):
def _summarize(path, tensor):
add_mean_hist_summary(prefix + "/" + path, tensor)
alf.nest.py_map_structure_with_path(_summarize, nest)
[docs]@_summary_wrapper
def summarize_action(actions, action_specs, name="action"):
"""Generate histogram summaries for actions.
Actions whose rank is more than 1 will be skipped.
Args:
actions (nested Tensor): actions to be summarized
action_specs (nested TensorSpec): spec for the actions
name (str): name of the summary
"""
action_specs = alf.nest.flatten(action_specs)
actions = alf.nest.flatten(actions)
for i, (action, action_spec) in enumerate(zip(actions, action_specs)):
if len(action_spec.shape) > 1:
continue
if action_spec.is_discrete:
histogram_discrete(
name="%s/%s" % (name, i),
data=action,
bucket_min=int(action_spec.minimum),
bucket_max=int(action_spec.maximum))
else:
if len(action_spec.shape) == 0:
action_dim = 1
else:
action_dim = action_spec.shape[-1]
action = torch.reshape(action, (-1, action_dim))
def _get_val(a, i):
return a if len(a.shape) == 0 else a[i]
for a in range(action_dim):
histogram_continuous(
name="%s/%s/%s/value" % (name, i, a),
data=action[:, a],
bucket_min=_get_val(action_spec.minimum, a),
bucket_max=_get_val(action_spec.maximum, a))
alf.summary.scalar("%s/%s/%s/mean" % (name, i, a),
action[:, a].mean())
[docs]@_summary_wrapper
def summarize_distribution(name, distributions):
"""Generate summary for distributions.
Currently the following types of distributions are supported:
* Normal, StableCauchy, Beta: mean and std of each dimension will be summarized
* Above distribution wrapped by Independent and TransformedDistribution:
the base distribution is summarized
* Tensor: each dimenstion dist[..., a] will be summarized
Note that unsupported distributions will be ignored (no error reported).
Args:
name (str): name of the summary
distributions (nested td.distribuation.Distribution): distributions to
be summarized.
"""
actions = alf.nest.flatten(distributions)
for i, dist in enumerate(actions):
if isinstance(dist, torch.Tensor):
# dist might be a Tensor
action_dim = dist.shape[-1]
for a in range(action_dim):
add_mean_hist_summary("%s_loc/%s/%s" % (name, i, a),
dist[..., a])
else:
dist = dist_utils.get_base_dist(dist)
if isinstance(dist, (td.Normal, dist_utils.StableCauchy,
dist_utils.TruncatedDistribution)):
loc = dist.loc
log_scale = dist.scale.log()
elif isinstance(dist, td.Beta):
loc = dist.mean
log_scale = 0.5 * dist.variance.log()
else:
continue
action_dim = loc.shape[-1]
for a in range(action_dim):
add_mean_hist_summary("%s_log_scale/%s/%s" % (name, i, a),
log_scale[..., a])
add_mean_hist_summary("%s_loc/%s/%s" % (name, i, a),
loc[..., a])
[docs]def add_mean_hist_summary(name, value):
"""Generate mean and histogram summary of ``value``.
Args:
name (str): name of the summary
value (Tensor): tensor to be summarized
"""
alf.summary.histogram(name + "/value", value)
add_mean_summary(name + "/mean", value)
[docs]def safe_mean_hist_summary(name, value, mask=None):
"""Generate mean and histogram summary of ``value``.
It skips the summary if ``value`` is empty.
Args:
name (str): name of the summary
value (Tensor): tensor to be summarized
mask (bool Tensor): optional mask to indicate which element of value
to use. Its shape needs to be same as that of ``value``
"""
if mask is not None:
value = value[mask]
if np.prod(value.shape) > 0:
add_mean_hist_summary(name, value)
[docs]def add_mean_summary(name, value):
"""Generate mean summary of ``value``.
Args:
name (str): name of the summary
value (Tensor): tensor to be summarized
"""
if not value.dtype.is_floating_point:
value = value.to(torch.float32)
alf.summary.scalar(name, value.mean())
[docs]def safe_mean_summary(name, value, mask=None):
"""Generate mean summary of ``value``.
It skips the summary if ``value`` is empty.
Args:
name (str): name of the summary
value (Tensor): tensor to be summarized
mask (bool Tensor): optional mask to indicate which element of value
to use. Its shape needs to be same as that of ``value``
"""
if mask is not None:
value = value[mask]
if np.prod(value.shape) > 0:
add_mean_summary(name, value)
_contexts = {}
[docs]class record_time(object):
"""A context manager for record the time.
It records the average time spent under the context between
two summaries.
Example:
.. code-block:: python
with record_time("time/calc"):
long_function()
"""
def __init__(self, tag):
"""Create a context object for recording time.
By default, record_time will do cuda.synchronize() before entering and
after leaving the context to measure the time accurately. This behavior
can be disabled by setting environment variable ALF_RECORD_TIME_SYNC to 0
if you suspect synchronization slow down your code. See
https://pytorch.org/docs/stable/notes/cuda.html#asynchronous-execution.
Args:
tag (str): the summary tag for the the time.
"""
sync = os.environ.get("ALF_RECORD_TIME_SYNC", "1") != "0"
self._tag = tag
self._sync = sync
caller = logging.get_absl_logger().findCaller()
# token is a string of filename:lineno:tag
token = caller[0] + ':' + str(caller[1]) + ':' + tag
if token not in _contexts:
_contexts[token] = {'time': 0., 'c0': int(get_global_counter())}
self._counter = _contexts[token]
def __enter__(self):
if self._sync and torch.cuda.is_available():
torch.cuda.synchronize()
self._t0 = time.time()
def __exit__(self, type, value, traceback):
if self._sync and torch.cuda.is_available():
torch.cuda.synchronize()
self._counter['time'] += time.time() - self._t0
if should_record_summaries():
c0 = self._counter['c0']
c1 = int(get_global_counter())
if c1 > c0:
alf.summary.scalar(self._tag,
self._counter['time'] / (c1 - c0))
self._counter['time'] = .0
self._counter['c0'] = c1
[docs]def summarize_tensor_gradients(name, tensor, batch_dims=1, clone=False):
"""Summarize the gradient of ``tensor`` during backward.
Args:
name (str): name of the summary
tensor (nested Tensor): tensor of which the gradient is to be summarized.
batch_dims (int): first so many dimensions are treated as batch dimensions
clone (bool): If True, ``tensor`` will first be cloned. This is useful
if ``tensor`` is used in multiple places and you only want to summarize
the gradient from one place. If False, the gradient will be the sum
from all gradients backpropped to ``tensor``.
Returns:
``tensor`` or cloned ``tensor``: the cloned ``tensor`` should be used for
the downstream calculations.
"""
def _hook(grad, name):
norm = grad.reshape(*grad.shape[0:batch_dims], -1).norm(dim=-1)
alf.summary.scalar(name + '/max_norm', norm.max())
alf.summary.scalar(name + '/avg_norm', norm.mean())
def _register_hook1(tensor, name):
if tensor.requires_grad:
if clone:
tensor = tensor.clone()
tensor.register_hook(functools.partial(_hook, name=name))
return tensor
name = '/' + alf.summary.scope_name() + name
if not is_nested(tensor):
return _register_hook1(tensor, name)
else:
def _register_hook(path, x):
return _register_hook1(x, name + '/' + path)
tensor = py_map_structure_with_path(_register_hook, tensor)
return tensor
[docs]def summarize_distribution_gradient(name,
distribution,
batch_dims=1,
clone=False):
"""Summarize the gradient of the parameters of ``distribution`` during backward.
Args:
name (str): name of the summary
distribution (nested Distribution): distribution of which the gradient is to be summarized.
batch_dims (int): first so many dimensions are treated as batch dimensions
clone (bool): If True, ``distribution`` will first be cloned. This is useful
if ``distribution`` is used in multiple places and you only want to summarize
the gradient from one place. If False, the gradient will be the sum
from all gradients backpropped to ``distribution``.
Returns:
``distribution`` or cloned ``distribution``: the cloned ``distribution``
should be used for the downstream calculations.
"""
dist_params = dist_utils.distributions_to_params(distribution)
if clone:
spec = dist_utils.extract_spec(distribution)
dist_params = map_structure(torch.clone, dist_params)
distribution = dist_utils.params_to_distributions(dist_params, spec)
summarize_tensor_gradients(
name, dist_params, batch_dims=batch_dims, clone=False)
return distribution