Source code for alf.utils.plot_tb_curves

# Copyright (c) 2020 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
import numpy as np
import os
import glob
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter

import matplotlib
import matplotlib.pyplot as plt
# Style gallery: https://tonysyu.github.io/raw_content/matplotlib-style-gallery/gallery.html
plt.style.use('seaborn-dark')

import alf.nest as nest
from alf.data_structures import namedtuple


def _compute_y_interval(interval_mode, ys):
    """Given several aligned y curves, compute a y value interval at each x.

    ``interval_mode`` should be one of the four options: 1) "std", 2) "minmax",
    and 3) "CI_X". The last one means confidence interval, where 'X'
    should be one of ``(80,85,90,95,99)`` indicating the confidence level (percentage).

    Returns:
        tuple - a triplet of mean of y, lower interval bound, and upper interval bound
    """
    CI_Z = {"80": 1.282, "85": 1.440, "90": 1.645, "95": 1.960, "99": 2.576}

    def _get_ci_z(interval_mode):
        """Get the corresponding Z value used in confidence interval computation."""
        # mode -> "CI_X" where X is the percentage
        ci_level = interval_mode.split("_")[1]
        assert ci_level in CI_Z, "Invalid level value %s!" % ci_level
        return CI_Z[ci_level]

    y = np.array(list(map(np.mean, zip(*ys))))
    std = np.array(list(map(np.std, zip(*ys))))
    if interval_mode == "std":
        min_y, max_y = y - std, y + std
    elif interval_mode == "minmax":
        min_y = np.array(list(map(np.min, zip(*ys))))
        max_y = np.array(list(map(np.max, zip(*ys))))
    elif interval_mode.startswith("CI_"):
        z = _get_ci_z(interval_mode)
        margin_err = std / np.sqrt(len(ys)) * z
        min_y, max_y = y - margin_err, y + margin_err
    else:
        raise ValueError("Invalid interval mode: %s" % interval_mode)

    return y, min_y, max_y


[docs]class MeanCurve( namedtuple( "MeanCurve", ['x', 'y', 'min_y', 'max_y', 'ay', 'min_ay', 'max_ay', 'name'], default_value=None)):
[docs] @classmethod def from_curves(cls, x, ys, interval_mode="std", name="MeanCurve"): """Compute various curve statistics from a set of individual curves ``ys`` and a common ``x``, and create a class instance. Args: x (np.array): x steps ys (list[np.array]): a list of curves interval_mode (str): mode for computing error margin around the mean y curve. Should be one of the four options: 1) "std", 2) "minmax", and 3) "CI_X". The last one means confidence interval, where 'X' should be one of ``(80,85,90,95,99)`` indicating the confidence level (percentage). name (str): """ # mean curve, lower and upper curve y, min_y, max_y = _compute_y_interval(interval_mode, ys) ays = [np.mean(y, keepdims=True) for y in ys] # mean average_y, lower and upper average_y # average_y can be used to indicate the changing trend of y ay, min_ay, max_ay = map(lambda z: z.squeeze(-1), _compute_y_interval(interval_mode, ays)) return cls( x=x, y=y, min_y=min_y, max_y=max_y, ay=ay, min_ay=min_ay, max_ay=max_ay, name=name)
[docs] def final_y(self, N=1): return tuple( map(lambda y: np.mean(y[-N:]), (self.y, self.min_y, self.max_y)))
[docs]class MeanCurveReader(object): """Read and compute a ``MeanCurve`` from one or multiple TB event files. A ``MeanCurveReader`` is suitable for one method on one task with multiple runs. """ def _get_metric_name(self): raise NotImplementedError() @property def x_label(self): raise NotImplementedError() @property def y_label(self): raise NotImplementedError() def __init__(self, event_file, x_steps=None, name="MeanCurveReader", smoothing=None, interval_mode="std"): """ Args: event_file (str|list[str]): a string or a list of strings where each should point to a valid TB dir, e.g., ending with "eval/" or "train/". The curves of these files will be averaged. It's the user's responsibility to ensure that it's meaningful to group these event files and show their mean and variance. x_steps (list[int]): we support merging curves that have different :math:`x` into a ``MeanCurve``. For example, if there are three curves: .. code-block:: python curve1 x: (1, 9), curve2 x: (0, 10), curve3 x: (0, 8), then the merged ``MeanCurve`` will have :math:`(1, 8)` as the final :math:`x` range. Each curve's new :math:`y` values will be interpolated w.r.t. this common :math:`x` range approperiately given their original :math:`y=f(x)` curve. The common :math:`x` range will be automatically determined as in the example if this argument ``x_steps==None``. Alternatively, the user can specify a pre-defined list of integers for interpolation. name (str): name of the mean curve. smoothing (int | float): if None, no smoothing is applied; if int, it's the window width of a Savitzky-Golay filter; if float, it's the smoothing weight of a running average (higher -> smoother). interval_mode (str): should be one of the four options: 1) "std", 2) "minmax", and 3) "CI_X". The last one means confidence interval, where 'X' should be one of ``(80,85,90,95,99)`` indicating the confidence level (percentage). Returns: MeanCurve: a mean curve structure. """ if not isinstance(event_file, list): event_file = [event_file] else: assert len(event_file) > 0, "Empty event file list!" ys = [] scalar_events_list = [] for ef in event_file: event_acc = EventAccumulator(ef) event_acc.Reload() # 'scalar_events' is a list of ScalarEvent(wall_time, step, value) scalar_events = event_acc.Scalars(self._get_metric_name()) scalar_events_list.append(scalar_events) if x_steps is None: max_x, min_x, num_steps = int(1e15), 0, 0 for scalar_events in scalar_events_list: steps = [se.step for se in scalar_events] max_x = min(max_x, steps[-1]) min_x = max(min_x, steps[0]) # In case we always summarize every step in the first interval, # len(steps) is much bigger than we expected. So we need to calculate. num_steps = max( num_steps, (steps[-1] - steps[0]) // (steps[-1] - steps[-2])) # calcuate x_steps by evenly dividing (min_x, max_x) assert max_x > min_x and num_steps > 1 delta_x = (max_x - min_x) / (num_steps - 1) x_steps = np.arange(num_steps) * delta_x + min_x for scalar_events in scalar_events_list: steps, values = zip(*[(se.step, se.value) for se in scalar_events]) y = self._interpolate_and_smooth_if_necessary( steps, values, x_steps, smoothing) ys.append(np.array(y)) x = x_steps self._mean_curve = MeanCurve.from_curves( x=x, ys=ys, interval_mode=interval_mode, name=name) self._name = name @property def name(self): return self._name def __call__(self): return self._mean_curve def _interpolate_and_smooth_if_necessary(self, steps, values, output_x, smoothing=None, kind="linear"): """First interpolate the ``(steps, values)`` pair to get a function. Then for the range ``(min_step, max_step)``, compute the values using the fitted function. Lastly apply a smoothing to the curve if a smoothing factor is specified. The reason why we have the interpolation is that the x steps are not always the same for multiple random runs (e.g., environment steps). So we need to first adjust x steps according to some reference minmax steps. Args: steps (list[int]): x values values (list[float]): y values output_x (list[int]): x values for the output curve smoothing (int | float): if None, no smoothing is applied; if int, it's the window width of a Savitzky-Golay filter; if float, it's the smoothing weight of a running average (higher -> smoother). kind (str): Interpolation type. Common options: "linear" (default), "nearest", "cubic", "quadratic", etc. For a complete list, see ``scipy.interpolate.interp1d()``. Returns: tuple: the first is the adjusted x values and the second is the interpolated and smoothed y values. """ # a rouch check to make sure the interpolation won't be too much assert abs(steps[-1] - output_x[-1]) / output_x[-1] < 0.05, ( "Inconsistent final steps! actual %d output %d" % (steps[-1], output_x[-1])) func = interp1d(steps, values, kind=kind, fill_value='extrapolate') new_values = func(output_x) if isinstance(smoothing, int): new_values = savgol_filter(new_values, smoothing, polyorder=1) elif smoothing is not None: assert 0 < smoothing < 1 new_values = ema_smooth(new_values, weight=smoothing) return new_values
[docs]def ema_smooth(scalars, weight=0.6, speed=64., adaptive=False, mode="forward"): r"""EMA smoothing, following TB's official implementation: https://github.com/tensorflow/tensorboard/blob/master/tensorboard/components/vz_line_chart2/line-chart.ts#L695 For adaptive EMA, the incoming weight decreases as the time increases. Args: scalars (list[float]): an array of floats to be smoothed, where the array index represents incoming time steps. weight (float): the weight of history. The history is updated as ``history * weight + scalar * (1 - weight)``. Only useful when ``adaptive=False``. speed (int): an integer number specifying the adpative weight. Only useful when ``adaptive=True``. A higher speed means a smaller average window. adaptive (bool): whether use adaptive weighting or not. If True, then later scalars will have smaller incoming weights (proportional to the inverse of array index). mode (str): "forward" | "both". For "forward" mode, the moving average goes from the array beginning to end. For "both" mode, the moving average has an additional backward pass, and the final smoothed value is an average of forward and backward passes. """ def _smooth_one_pass(scalars): last = 0 debias_w = 0 smoothed = [] w = weight for i, point in enumerate(scalars): if adaptive: w = 1 - speed / (i + speed) last = last * w + (1 - w) * point # Calculate smoothed value debias_w = debias_w * w + (1 - w) smoothed.append(last / debias_w) return smoothed smoothed_forward = _smooth_one_pass(scalars) if mode != "forward": smoothed_backward = _smooth_one_pass(scalars[::-1]) smoothed = np.mean( np.array([smoothed_forward, smoothed_backward[::-1]]), axis=0) else: smoothed = smoothed_forward return smoothed
[docs]class EnvironmentStepsReturnReader(MeanCurveReader): """Create a mean curve reader that reads AverageReturn values.""" def _get_metric_name(self): return "Metrics_vs_EnvironmentSteps/AverageReturn" @property def x_label(self): return "Environment Steps" @property def y_label(self): return "Average Episodic Return"
[docs]class EnvironmentStepsSuccessReader(MeanCurveReader): """Create a mean curve reader that reads Success rates.""" def _get_metric_name(self): return "Metrics_vs_EnvironmentSteps/success" @property def x_label(self): return "Environment Steps" @property def y_label(self): return "Success Rate"
[docs]class IterationsReturnReader(MeanCurveReader): """Create a mean curve reader that reads AverageReturn values.""" def _get_metric_name(self): return "Metrics/AverageReturn" @property def x_label(self): return "Training Iterations" @property def y_label(self): return "Average Episodic Return"
[docs]class IterationsSuccessReader(MeanCurveReader): """Create a mean curve reader that reads Success rates.""" def _get_metric_name(self): return "Metrics/success" @property def x_label(self): return "Training Iterations" @property def y_label(self): return "Success Rate"
[docs]class MeanCurveGroupReader(object): r"""Group several ``MeanCurveReader`` results. A ``MeanCurveGroupReader`` is suitable for one method on multiple tasks, each task with multiple runs. To aggregate across tasks, each task must be provided with a performance range :math:`(y_0, y_1)` that will be used to normalize performance for that task as :math:`\frac{y - y_0}{y_1 - y_0}`. If the ranges are not provided, no normalization will be done. The aggregation is simply averaging the statistics of individual ``MeanCurve``. """ def __init__(self, mean_curve_readers, task_performance_ranges=None, name="MeanCurveGroupReader"): """ Args: mean_curve_readers (list[MeanCurveReader]): a list of ``MeanCurveReader`` of multiple tasks for one method. It's the user's responsibility to ensure that it's meaningful to group these task event files and show their mean and variance. task_performance_ranges (list[tuple(float)]): a list of tuples, where each tuple is a pair of floats used for normalizing the corresponding task. If None, no normalization will be performed. name (str): name of the method """ def _normalize(y, y0, y1): return (y - y0) / (y1 - y0) if task_performance_ranges is None: task_performance_ranges = [(0., 1.)] * len(mean_curve_readers) assert len(mean_curve_readers) == len(task_performance_ranges) curves = [reader() for reader in mean_curve_readers] agg_vals = dict(y=[], min_y=[], max_y=[], ay=[], min_ay=[], max_ay=[]) for c, (y0, y1) in zip(curves, task_performance_ranges): assert len(c.x) == len(curves[0].x) for key in agg_vals.keys(): agg_vals[key].append(_normalize(getattr(c, key), y0, y1)) for key, val in agg_vals.items(): agg_vals[key] = np.mean(val, axis=0) self._mean_curve = MeanCurve( x=curves[0].x, name=curves[0].name, **agg_vals) self._x_label = mean_curve_readers[0].x_label self._name = name @property def x_label(self): return self._x_label @property def y_label(self): return "Normalized Score" @property def name(self): return self._name def __call__(self): return self._mean_curve
[docs]class CurvesPlotter(object): """Plot several ``MeanCurve``s in a figure. The curve colors will form a cycle over 10 default colors. The user should make sure that the ``MeanCurve``s to plot are meaningful to be compared in one figure. For each ``MeanCurve``, its ``y`` field will be plotted as the mean, its ``min_y`` and ``max_y`` will be plotted by a shaded area around ``y``, and its ``x`` determines the x-axis range. """ def __init__(self, mean_curves, y_clipping=None, x_range=None, y_range=None, x_ticks=None, x_label=None, y_label=None, x_scaled_and_aligned=False, figsize=(4, 4), dpi=100, linestyle='-', linewidth=2, std_alpha=0.2, colors=None, markers=None, bg_color='white', grid_color='#e6e5e3', plot_mean_only=False, legend_kwargs=dict(loc="best"), title=None): r""" Args: mean_curves (MeanCurve|list[MeanCurve]): each ``MeanCurve`` should correspond to a different method. x_range (tuple[float]): a tuple of ``(min_x, max_x)`` for showing on the figure. If None, then ``(0, 1)`` will be used. This argument is only used when ``x_scaled_and_aligned==True``. y_range (tuple[float]): a tuple of ``(min_y, max_y)`` for showing on the figure. If None, then it will be decided according to the ``y`` values. Note that this range won't change ``y`` data; it's only used by matplotlib for drawing ``y`` limits. x_ticks (list[float]): x ticks shown along x axis y_clipping (tuple[float]): the y values will be clipped to this range if not None. Because of smoothing in ``MeanCurveReader`` and/or std region, the input y values might be out of this range. x_label (str): shown besides x-axis y_label (str): shown besides y-axis x_scaled_and_aligned (bool): If True, the x axes of all ``MeanCurve`` will be scaled and aligned so that the lower and upper :math:`x` bounds of all curves will be ``x_range``, and each curve's :math:`x` axix will be proportionally scaled. If False, the :math:`x` axis will be plotted according to :math:`x` of each ``MeanCurve`` as it is. Note that this process only involves :math:`x` scaling and no interpolation of :math:`y` values will ever be performed. For example, we have three ``MeanCurves`` to be plotted in a figure: .. code-block:: python mean_curve1 x: (0, 100) mean_curve2 x: (20, 80) mean_curve3 x: (100, 200) with ``x_range==(0,1)``. Then in the plotted figure, the :math:`x` range (not x-ticks which can be specified differently!) will be .. code-block:: python mean_curve1 x: (0, 0.5) mean_curve2 x: (0.1, 0.4) mean_curve3 x: (0.5, 1) figsize (tuple[int]): a tuple of ints determining the size of the figure in inches. A larger figure size will allow for longer texts, more axes or more ticklabels to be shown. dpi (int): Dots per inches. How many pixels each inch contains. A ``figsize`` of ``(w,h)`` consists of ``w*h*dpi**2`` pixels. linestyle (str|list[str]): the line style to plot. Possible values: '-' ('solid'), '--' ('dashed'), '-.' (dashdot), and ':' ('dotted'). If a string, then all curves will have the same style; otherwise each option will apply to the corresponding curve. linewidth (int): the thickness of lines to plot. Default: 2. std_alpha (float): the transparency value for plotting shaded area around a curve. bg_color (str): the background color of the figure grid_color (str): color of the dashed grid lines plot_mean_only (bool): Whether only plot the mean curve without shaded regions. legend_kwargs (dict): kwargs for plotting the legend. If None, then no legend will be plotted. title (str): title of the figure """ self._fig, ax = plt.subplots(1, figsize=figsize, dpi=dpi) if not isinstance(mean_curves, list): mean_curves = [mean_curves] if colors is None: colors = ['C%d' % i for i in range(10)] if markers is None: markers = [''] * len(mean_curves) if x_scaled_and_aligned: if x_range is None: x_range = (0., 1.) scaled_x = [] # determine the lower and upper bounds of actual x min_x, max_x = int(1e15), 0 for mc in mean_curves: max_x = max(max_x, mc.x[-1]) min_x = min(min_x, mc.x[0]) def _scale(x): # compute a scaled x according to the bounds return ((x - min_x) / (max_x - min_x) * (x_range[-1] - x_range[0]) + x_range[0]) for mc in mean_curves: x0, x1 = _scale(mc.x[0]), _scale(mc.x[-1]) delta_x = (x1 - x0) / (len(mc.y) - 1) scaled_x.append(np.arange(len(mc.y)) * delta_x + x0) def _clip_y(y): return np.clip(y, y_clipping[0], y_clipping[1]) if y_clipping else y if not isinstance(linestyle, list): linestyle = [linestyle] * len(mean_curves) elif len(linestyle) < len(mean_curves): linestyle += linestyle[-1:] * (len(mean_curves) - len(linestyle)) for i, c in enumerate(mean_curves): color = colors[i % len(colors)] x = (scaled_x[i] if x_scaled_and_aligned else c.x) ax.plot( x, _clip_y(c.y), color=color, marker=markers[i], lw=linewidth, linestyle=linestyle[i], label=c.name) if not plot_mean_only: ax.fill_between( x, _clip_y(c.max_y), _clip_y(c.min_y), facecolor=color, alpha=std_alpha) if legend_kwargs is not None: ax.legend(**legend_kwargs) if bg_color is not None: ax.set_facecolor(bg_color) if grid_color is not None: ax.grid(linestyle='--', color=grid_color) else: ax.grid(linestyle='-') if x_ticks is not None: ax.set_xticks(x_ticks) ax.ticklabel_format(axis="x", style="sci", scilimits=(0, 0)) if y_range: ax.set_ylim(y_range) if x_label: ax.set_xlabel(x_label) if y_label: ax.set_ylabel(y_label) if title: ax.set_title(title)
[docs] def plot(self, output_path, dpi=200, transparent=False, close_fig=True): """Plot curves and save the figure to disk. Args: output_path (str): the output file path dpi (int): dpi for the figure. A higher value results in higher resolution. transparent (bool): If True, then the figure has a transparent background. close_fig (bool): whether to close/release this figure after plotting. If ``False``, the user has to close it manually. """ self._fig.savefig( output_path, dpi=dpi, transparent=transparent, bbox_inches='tight') if close_fig: plt.close(self._fig)
def _get_curve_path(dir): return os.path.join(os.getenv("HOME"), "tensorboard_curves", dir) if __name__ == "__main__": """Plotting examples.""" methods = ["sac", "ddpg"] tasks = ["kickball", "navigation"] curve_readers = [[ EnvironmentStepsReturnReader( event_file=glob.glob(_get_curve_path("%s_%s/*/eval" % (m, t))), x_steps=np.arange(0, 5000000, 10000), name="%s_%s" % (m, t), smoothing=3) for t in tasks ] for m in methods] # Scale and align x-axis of SAC and DDPG on task "kickball" plotter = CurvesPlotter([cr[0]() for cr in curve_readers], x_label=curve_readers[0][0].x_label, y_label=curve_readers[0][0].y_label, y_range=(0, 1.0), x_range=(0, 5000000)) plotter.plot(output_path="/tmp/kickball.pdf") # Now, to compare SAC with DDPG on navigation and kickball at the same time, # we use the normalized score. # [kickball, navigation] random_return = [0., -10.] # obtained by evaluating a random policy sac_trained_return = [100., 50.] # obtained by evaluating trained SAC task_performance_ranges = list(zip(random_return, sac_trained_return)) curve_group_readers = [ MeanCurveGroupReader(cr, task_performance_ranges, m) for m, cr in zip(methods, curve_readers) ] plotter = CurvesPlotter([cgr() for cgr in curve_group_readers], x_range=(0, 5000000), x_label=curve_group_readers[0].x_label, y_label=curve_group_readers[0].y_label) plotter.plot(output_path="/tmp/normalized_score.pdf")