Source code for alf.metrics.metrics

# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A set of metrics.
Converted to PyTorch from the TF version.
https://github.com/tensorflow/agents/blob/master/tf_agents/metrics/tf_metrics.py
"""
from typing import List
import numpy as np

import torch

import alf
import alf.utils.data_buffer as db
from alf.data_structures import TimeStep, StepType

from alf.utils import common
from . import metric


class MetricBuffer(torch.nn.Module):
    """Bounded store of scalar metric values that can be averaged.

    Every entry is a scalar; the storage itself is a CPU ``DataBuffer``.
    """

    def __init__(self, max_len, dtype):
        """
        Args:
            max_len (int): capacity of the underlying buffer
            dtype (torch.dtype): dtype of the stored scalars
        """
        super().__init__()
        self._dtype = dtype
        scalar_spec = alf.tensor_specs.TensorSpec((), dtype=dtype)
        self._buf = db.DataBuffer(
            data_spec=scalar_spec, capacity=max_len, device='cpu')

    def append(self, value):
        """Add a batch of values to the buffer.

        Args:
            value (Tensor): a batch of scalars with the shape :math:`[B]`.
        """
        self._buf.add_batch(value)

    def mean(self):
        """Return the mean of the stored values, or 0 if nothing is stored."""
        count = self._buf.current_size
        if count != 0:
            return self._buf.get_all()[:count].mean()
        # An empty slice would average to nan, so report zero instead.
        return torch.tensor(0, dtype=self._dtype)

    def latest(self):
        """Return the value added most recently."""
        assert self._buf.current_size > 0, "no valid latest value!"
        # NOTE(review): assumes ``get_batch_by_indices`` interprets
        # ``current_pos - 1`` as the slot written last -- confirm against
        # ``DataBuffer``.
        last_pos = self._buf.current_pos - 1
        return self._buf.get_batch_by_indices(last_pos)

    def clear(self):
        """Empty the buffer; returns whatever the underlying ``clear()`` returns."""
        return self._buf.clear()
class EnvironmentSteps(metric.StepMetric):
    """Counts the number of steps taken in the environment after FrameSkip.

    If frames are skipped by any of the environment wrappers, a separate
    metric AverageEnvInfoMetric['num_env_frames'] will report the actual
    frame count including skipped ones.
    """

    def __init__(self,
                 name='EnvironmentSteps',
                 prefix='Metrics',
                 dtype=torch.int64):
        super().__init__(name=name, dtype=dtype, prefix=prefix)
        # Scalar counter kept as a module buffer.
        counter = torch.zeros((), dtype=dtype)
        self.register_buffer('_environment_steps', counter)

    def call(self, time_step):
        """Add the number of non-first steps in ``time_step`` to the counter.

        Entries for which ``time_step.is_first()`` is true are not counted,
        since that step is not part of any episode.

        Args:
            time_step (alf.data_structures.TimeStep): batched tensor
        Returns:
            The arguments, for easy chaining.
        """
        not_first = torch.logical_not(time_step.is_first())
        self._environment_steps.add_(not_first.type(self._dtype).sum())
        return time_step

    def result(self):
        """Return the running step count (the buffer tensor itself)."""
        return self._environment_steps

    def reset(self):
        """Set the step count back to zero in place."""
        self._environment_steps.fill_(0)
class NumberOfEpisodes(metric.StepMetric):
    """Counts the number of episodes in the environment."""

    def __init__(self,
                 name='NumberOfEpisodes',
                 prefix='Metrics',
                 dtype=torch.int64):
        super().__init__(name=name, dtype=dtype, prefix=prefix)
        # Scalar counter kept as a module buffer.
        counter = torch.zeros((), dtype=dtype)
        self.register_buffer('_number_episodes', counter)

    def call(self, time_step):
        """Add the number of last steps in ``time_step`` to the counter.

        Every entry for which ``time_step.is_last()`` is true counts as one
        finished episode.

        Args:
            time_step (alf.data_structures.TimeStep): batched tensor
        Returns:
            The arguments, for easy chaining.
        """
        finished = time_step.is_last().type(self._dtype)
        self._number_episodes.add_(finished.sum())
        return time_step

    def result(self):
        """Return the running episode count (the buffer tensor itself)."""
        return self._number_episodes

    def reset(self):
        """Set the episode count back to zero in place."""
        self._number_episodes.fill_(0)
class AverageEpisodicAggregationMetric(metric.StepMetric):
    """A base metric to aggregate quantities over an episode.

    It supports accumulating a nest of scalar values.

    NOTE: normally this class and its sub-classes report metrics by summing
    values over the whole episode. However, there are two special treatments:

    1. If ``_extract_metric_values()`` returns a nested structure in which a
       field's path ends with "@step", the accumulated value is divided by the
       number of valid steps of that field, so that a per-step average value
       is reported.
    2. If a field's path ends with "@max", the aggregated value is the maximum
       (instead of the sum) of the step values across the episode.

    This class supports partial aggregation: if at any step the extracted
    metric value is not finite (inf or nan), that step's value is skipped for
    aggregation. If a field is skipped for an entire episode, its accumulated
    value won't be pushed into the metric buffer.
    """

    def __init__(self,
                 name="AverageEpisodicAggregationMetric",
                 prefix='Metrics',
                 dtype=torch.float32,
                 buffer_size=10,
                 example_time_step=None):
        """
        Args:
            name (str): name of the metric
            prefix (str): a prefix indicating the category of the metric
            dtype (torch.dtype): dtype of metric values. Should be floating
                types in order to be averaged.
            buffer_size (int): number of episodes the metric value will be
                averaged across
            example_time_step (nest): an example of the time step where the
                metric values are extracted from. If ``None``, a zero scalar
                is used as the example metric value.
        """
        super().__init__(name=name, dtype=dtype, prefix=prefix)
        if example_time_step is None:
            example_metric_value = torch.zeros((), device='cpu')
        else:
            example_metric_value = self._extract_metric_values(
                example_time_step.cpu())
        # NOTE(review): the batch size is always read from
        # ``example_time_step``, including when it is ``None`` -- confirm that
        # ``alf.nest.get_nest_batch_size`` supports that case.
        self._batch_size = alf.nest.get_nest_batch_size(example_time_step)
        self._buffer_size = buffer_size
        self._initialize(example_metric_value)
        # ``self._current_step`` will be set to zero for the first step, and is
        # added by one otherwise. Therefore, at the episode end, its value
        # equals to episode length - 1.
        self._current_step = torch.zeros(self._batch_size, device='cpu')

    def _extract_metric_values(self, time_step):
        """Extract metrics from the time step. The return can be a nest."""
        raise NotImplementedError()

    def _initialize(self, example_metric_value):
        """Create per-field buffers and per-episode state.

        Each of ``_buffer``, ``_accumulator``, ``_mask`` and ``_steps`` gets
        the same nested structure as ``example_metric_value``.
        """

        def _init_buf(val):
            return MetricBuffer(max_len=self._buffer_size, dtype=self._dtype)

        def _init_acc(val):
            return torch.zeros(
                self._batch_size, dtype=self._dtype, device='cpu')

        def _init_mask(val):
            return torch.zeros(
                self._batch_size, dtype=torch.bool, device='cpu')

        def _init_step(val):
            return torch.zeros(
                self._batch_size, dtype=self._dtype, device='cpu')

        self._buffer = alf.nest.map_structure(_init_buf, example_metric_value)
        self._accumulator = alf.nest.map_structure(_init_acc,
                                                   example_metric_value)
        # which samples of a batch in ``self._accumulator`` are valid for being
        # put into ``self._buffer`` when step_type==LAST
        self._mask = alf.nest.map_structure(_init_mask, example_metric_value)
        # number of valid (finite) values accumulated so far in the episode
        self._steps = alf.nest.map_structure(_init_step, example_metric_value)

    def call(self, time_step):
        """Accumulate values from the time step.

        The values are defined by subclasses' ``_extract_metric_values()``.
        It will ignore the values of first time steps.

        Args:
            time_step (alf.data_structures.TimeStep): batched tensor
        Returns:
            The arguments, for easy chaining.
        """
        self._current_step = torch.where(time_step.is_first(),
                                         torch.zeros_like(self._current_step),
                                         self._current_step + 1)
        values = self._extract_metric_values(time_step)
        assert all(
            alf.nest.flatten(
                alf.nest.map_structure(
                    lambda val: list(val.shape) == [self._batch_size],
                    values))), ("Value shape is not correct "
                                "(only scalar values are supported).")
        is_first = time_step.is_first()

        def _update_accumulator_(path, mask, step, acc, val):
            """In-place update of the accumulators and mask."""
            val_valid = torch.isfinite(val)
            # If at any step the value is valid, then the acc value becomes
            # valid
            mask[:] = torch.where(is_first, torch.zeros_like(mask),
                                  mask | val_valid)
            # Only step+1 if the value is valid
            step[:] = torch.where(is_first, torch.zeros_like(step),
                                  step + val_valid.to(self._dtype))
            if path.endswith("@max"):
                # Don't max invalid values
                val = torch.where(val_valid, val,
                                  torch.full_like(val, -np.inf))
                acc[:] = torch.where(is_first,
                                     torch.full_like(acc, -float('inf')),
                                     torch.maximum(acc, val.to(self._dtype)))
            else:
                # Don't sum invalid values
                val = torch.where(val_valid, val, torch.zeros_like(val))
                # Zero out batch indices where a new episode is starting.
                # Update with new values; Ignores first step whose reward
                # comes from the boundary transition of the last step from
                # the previous episode.
                acc[:] = torch.where(is_first, torch.zeros_like(acc),
                                     acc + val.to(self._dtype))

        alf.nest.py_map_structure_with_path(_update_accumulator_, self._mask,
                                            self._steps, self._accumulator,
                                            values)

        # Batch entries whose episode ends at this step.
        last_episode_indices = torch.where(time_step.is_last())[0]

        def _episode_end_aggregate_(path, mask, step, buf, acc):
            # Extract the final accumulated value and do customizable
            # processing via ``_extract_and_process_acc_value``.
            value = self._extract_and_process_acc_value(
                acc, last_episode_indices)
            # If the metric's name ends with '@step', the value will
            # be further averaged over episode length so that the
            # result is per-step value.
            if path.endswith('@step'):
                value = value / step[last_episode_indices]
            # Only keep entries that saw at least one valid value.
            value = value[mask[last_episode_indices]]
            if value.numel() > 0:
                buf.append(value)

        if len(last_episode_indices) > 0:
            alf.nest.py_map_structure_with_path(
                _episode_end_aggregate_, self._mask, self._steps,
                self._buffer, self._accumulator)
        return time_step

    def _extract_and_process_acc_value(self, acc, last_episode_indices):
        """Extract the final accumulated value and perform some optional
        customizable processing.

        Args:
            acc (Tensor): batched tensor representing an accumulator
            last_episode_indices (Tensor): indices representing the location
                of the position of the last step
        Returns:
            The value of the accumulator at the episode end.
        """
        return acc[last_episode_indices]

    def result(self):
        """Return the mean of each field's buffer, as a nest."""
        return alf.nest.map_structure(lambda buf: buf.mean(), self._buffer)

    def latest(self):
        """Return the value added most recently."""
        return alf.nest.map_structure(lambda buf: buf.latest(), self._buffer)

    def reset(self):
        """Clear the buffers and all per-episode state.

        Besides emptying the buffers, this restores the accumulators, the
        validity masks, the valid-step counters and ``_current_step`` to the
        values ``call()`` assigns on a first step, so a reset issued in the
        middle of an episode cannot mix stale counts with fresh sums.
        """
        alf.nest.map_structure(lambda buf: buf.clear(), self._buffer)

        def _reset_episode_state_(path, mask, step, acc):
            mask.fill_(False)
            step.fill_(0)
            # "@max" accumulators start from -inf (as in ``call()``) so that
            # negative values are not clamped at zero.
            acc.fill_(-float('inf') if path.endswith("@max") else 0)

        alf.nest.py_map_structure_with_path(_reset_episode_state_, self._mask,
                                            self._steps, self._accumulator)
        self._current_step = torch.zeros_like(self._current_step)
class AverageReturnMetric(AverageEpisodicAggregationMetric):
    """Metric for computing the average return."""

    def __init__(self,
                 example_time_step: TimeStep,
                 name='AverageReturn',
                 prefix='Metrics',
                 dtype=torch.float32,
                 buffer_size=10):
        super().__init__(
            name=name,
            dtype=dtype,
            prefix=prefix,
            buffer_size=buffer_size,
            example_time_step=example_time_step)

    def _extract_metric_values(self, time_step):
        """Return the immediate reward; summed over an episode it is the
        episodic return.

        When the reward has more dimensions than ``step_type``, the extra
        dimensions are flattened and a list with one tensor per reward
        component is returned.
        """
        reward = time_step.reward
        if reward.ndim == time_step.step_type.ndim:
            return reward
        components = reward.reshape(*time_step.step_type.shape, -1)
        return list(components.unbind(dim=-1))
@alf.configurable
class AverageDiscountedReturnMetric(AverageEpisodicAggregationMetric):
    r"""Metric for computing the average discounted episodic return.

    It is calculated according to the following formula:

    .. math::

        \begin{array}{ll}
            R &=\frac{1}{L} (r_1 + (1+\gamma) r_2 + (1+\gamma+\gamma^2) r_3 + \cdots) \\
              &= \frac{1}{L}\sum_{l=1}^L \sum_{k=0}^{l-1} \gamma^k r_l,
        \end{array}

    where :math:`\gamma` is the reward discount, and :math:`r_1` denotes the
    reward due to the first action, which is received at the second time step.
    :math:`L` equals to the episode length - 1.

    Note that if the last step is not due to time limit, the discounted return
    calculated from the formula above is unbiased. If the last step is due to
    time limit, it is a biased estimate and its expectation is lower than the
    ground-truth (when rewards are non-negative).
    """

    def __init__(self,
                 example_time_step: TimeStep,
                 name='AverageDiscountedReturn',
                 prefix='Metrics',
                 dtype=torch.float32,
                 discount=0.99,
                 reward_transformer=None,
                 buffer_size=10):
        """
        Args:
            example_time_step (TimeStep): an example time step, used for the
                batch size and for the structure of the metric values
            name (str): name of the metric
            prefix (str): a prefix indicating the category of the metric
            dtype (torch.dtype): dtype of metric values
            discount (float): the discount factor for calculating the
                discounted return
            reward_transformer (Callable): if provided, will calculate the
                discounted return using the transformed reward. It will be
                called as
                ``transformed_reward = reward_transformer(original_reward)``.
            buffer_size (int): number of episodes the metric value will be
                averaged across
        """
        # These attributes must exist before ``super().__init__()`` because the
        # base constructor calls ``_extract_metric_values()`` on the example.
        self._discount = discount
        batch_size = alf.nest.get_nest_batch_size(example_time_step)
        self._accumulated_discount = torch.zeros(batch_size, device='cpu')
        self._timestep_discount = torch.zeros(batch_size, device='cpu')
        self._reward_transformer = reward_transformer
        self._current_step = torch.zeros(batch_size, device='cpu')
        super().__init__(
            name=name,
            dtype=dtype,
            prefix=prefix,
            buffer_size=buffer_size,
            example_time_step=example_time_step)

    def _extract_metric_values(self, time_step):
        """Accumulate discounted immediate rewards to get discounted episodic
        return. It also updates the accumulated discount.

        Returns:
            A tensor of discounted rewards, or, when the reward has more
            dimensions than ``step_type``, a list with one tensor per reward
            component.
        """
        self._update_discount(time_step)
        ndim = time_step.step_type.ndim
        reward = time_step.reward
        if self._reward_transformer is not None:
            # RewardNormalizer may change its statistics if the exe_mode is
            # ROLLOUT. We don't want that happen for metric calculation.
            old_mode = common.set_exe_mode(common.EXE_MODE_OTHER)
            try:
                reward = self._reward_transformer(reward)
            finally:
                # Restore the mode even if the transformer raises.
                common.set_exe_mode(old_mode)
        if time_step.reward.ndim == ndim:
            return reward * self._accumulated_discount
        reward = reward.reshape(*time_step.step_type.shape, -1)
        weighted = reward * self._accumulated_discount.unsqueeze(-1)
        # One tensor per reward component.
        return list(torch.unbind(weighted, dim=-1))

    def _update_discount(self, time_step):
        """Set/Update the values of ``self._accumulated_discount``.

        If this is the first step, ``self._accumulated_discount`` will be set
        to zero. Otherwise, it is multiplied by ``discount`` and by the
        ``discount`` field of the previous time step, and added by one. The
        updated accumulated discount will be used for computing the
        accumulated contribution of the reward received at the current step to
        the discounted return.

        Args:
            time_step (alf.data_structures.TimeStep): batched tensor
        """
        is_first = time_step.is_first()
        # ``self._timestep_discount`` still holds the previous step's discount
        # here; it is replaced with the current one right after.
        self._accumulated_discount = (
            self._discount * self._timestep_discount *
            self._accumulated_discount + 1)
        self._timestep_discount = time_step.discount
        self._accumulated_discount = torch.where(
            is_first, torch.zeros_like(self._accumulated_discount),
            self._accumulated_discount)

    def _extract_and_process_acc_value(self, acc, last_episode_indices):
        """Extract the final accumulated value and divide by the number of
        steps of an episode.

        Args:
            acc (Tensor): batched tensor representing an accumulator
            last_episode_indices (Tensor): indices representing the location
                of the position of the last step
        Returns:
            The value of the accumulator at the episode end, divided by
            ``self._current_step`` at the same indices.
        """
        return (acc[last_episode_indices] /
                self._current_step[last_episode_indices])
@alf.configurable
class EpisodicStartAverageDiscountedReturnMetric(
        AverageDiscountedReturnMetric):
    r"""Metric for computing the discounted return from episode start states.

    It is calculated according to the following formula:

    .. math::

        \begin{array}{ll}
            R &=r_1 + \gamma r_2 + \gamma^2 r_3 + \cdots \\
              &= \sum_{l=1}^L \gamma^{l-1} r_l,
        \end{array}

    where :math:`\gamma` is the reward discount, and :math:`r_1` denotes the
    reward due to the first action, which is received at the second time step.
    :math:`L` equals to the episode length - 1.

    Note that if the last step is not due to time limit, the discounted return
    calculated from the formula above is unbiased. If the last step is due to
    time limit, it is a biased estimate and its expectation is lower than the
    ground-truth (when rewards are non-negative).
    """

    def __init__(self,
                 example_time_step: TimeStep,
                 name='EpisodicStartAverageDiscountedReturn',
                 prefix='Metrics',
                 buffer_size=10,
                 reward_transformer=None):
        """
        Args:
            example_time_step (TimeStep): an example time step, used for the
                batch size and for the structure of the metric values
            name (str): name of the metric
            prefix (str): a prefix indicating the category of the metric
            buffer_size (int): number of episodes the metric value will be
                averaged across
            reward_transformer (Callable): if provided, the discounted return
                is computed from ``reward_transformer(original_reward)``.
        """
        super().__init__(
            name=name,
            prefix=prefix,
            buffer_size=buffer_size,
            example_time_step=example_time_step,
            reward_transformer=reward_transformer)

    def _extract_metric_values(self, time_step):
        """Weight the immediate reward by the current discount power so that
        the per-episode sum is the discounted return from the episode start.
        It also updates the accumulated discount.

        Returns:
            A tensor of discounted rewards, or, when the reward has more
            dimensions than ``step_type``, a list with one tensor per reward
            component.
        """
        self._update_discount_and_step_count(time_step)
        ndim = time_step.step_type.ndim
        reward = time_step.reward
        if self._reward_transformer is not None:
            # Apply the transformer outside of rollout mode so that a
            # stateful transformer does not update its statistics while the
            # metric is being computed.
            old_mode = common.set_exe_mode(common.EXE_MODE_OTHER)
            try:
                reward = self._reward_transformer(reward)
            finally:
                # Restore the mode even if the transformer raises.
                common.set_exe_mode(old_mode)
        if time_step.reward.ndim == ndim:
            return reward * self._accumulated_discount
        reward = reward.reshape(*time_step.step_type.shape, -1)
        weighted = reward * self._accumulated_discount.unsqueeze(-1)
        # One tensor per reward component.
        return list(torch.unbind(weighted, dim=-1))

    def _update_discount_and_step_count(self, time_step):
        """Update ``self._accumulated_discount`` in place.

        The accumulated discount is multiplied by ``discount``; entries that
        are zero afterwards are set to one, and entries at a first step are
        set to zero. Starting from the zero written at a first step, the
        following steps therefore see 1, ``discount``, ``discount ** 2``, ...

        The step count ``self._current_step`` is not modified here.

        NOTE(review): because zero is used as the "episode just started"
        marker, a product that becomes zero for any other reason (for example
        ``discount == 0``) is also replaced by one.

        Args:
            time_step (alf.data_structures.TimeStep): batched tensor
        """
        is_first = time_step.is_first()
        # update discount for the current time step
        self._accumulated_discount *= self._discount
        self._accumulated_discount.masked_fill_(
            self._accumulated_discount == 0, 1.)
        self._accumulated_discount.masked_fill_(is_first, 0.)

    def _extract_and_process_acc_value(self, acc, last_episode_indices):
        """Extract the final accumulated value.

        Args:
            acc (Tensor): batched tensor representing an accumulator
            last_episode_indices (Tensor): indices representing the location
                of the position of the last step
        Returns:
            The value of the accumulator at the episode end.
        """
        return acc[last_episode_indices]
@alf.configurable
class AverageRewardMetric(AverageDiscountedReturnMetric):
    """Metric for computing the average reward per time step for each
    episode.

    Implemented as the discounted-return metric with ``discount=0``.
    """

    def __init__(self,
                 example_time_step: TimeStep,
                 name='AverageReward',
                 prefix='Metrics',
                 buffer_size=10):
        super().__init__(
            discount=0,
            buffer_size=buffer_size,
            prefix=prefix,
            name=name,
            example_time_step=example_time_step)
class AverageEpisodeLengthMetric(AverageEpisodicAggregationMetric):
    """Metric for computing the average episode length."""

    def __init__(self,
                 example_time_step: TimeStep,
                 name='AverageEpisodeLength',
                 prefix='Metrics',
                 dtype=torch.float32,
                 buffer_size=10):
        super().__init__(
            name=name,
            dtype=dtype,
            prefix=prefix,
            buffer_size=buffer_size,
            example_time_step=example_time_step)

    def _extract_metric_values(self, time_step):
        """Return one for every step except first steps, which yield zero.

        The first time step is the boundary step and needs to be ignored,
        different from ``tf_agents``.
        """
        step_type = time_step.step_type
        zero = torch.zeros_like(step_type)
        one = torch.ones_like(step_type)
        return torch.where(time_step.is_first(), zero, one)
@alf.configurable(whitelist=['fields'])
class AverageEnvInfoMetric(AverageEpisodicAggregationMetric):
    """Metric for computing average quantities contained in the environment
    info.

    An example of env info (which can be a nest) has to be provided when
    constructing an instance in order to initialize the accumulator and buffer
    with the same nested structure.
    """

    def __init__(self,
                 example_time_step: TimeStep,
                 name="AverageEnvInfoMetric",
                 prefix="Metrics",
                 dtype=torch.float32,
                 fields: List[str] = None,
                 buffer_size=10):
        """
        Args:
            example_time_step: an example time step whose ``env_info`` defines
                the structure of the metric values.
            name: name of the metric.
            prefix: a prefix indicating the category of the metric.
            dtype: dtype of metric values.
            fields: a list of fields to include in the average env info
                metric. If None, all fields will be included.
            buffer_size: number of episodes the metric value will be averaged
                across.
        """
        # Must be set before the base constructor runs, which extracts
        # metric values from the example time step.
        self._fields = fields
        super().__init__(
            name=name,
            dtype=dtype,
            prefix=prefix,
            buffer_size=buffer_size,
            example_time_step=example_time_step)

    def _extract_metric_values(self, time_step):
        """Return ``env_info``, restricted to ``fields`` when they are set."""
        if self._fields is not None:
            return {f: time_step.env_info[f] for f in self._fields}
        return time_step.env_info