# Source code for alf.algorithms.functional_particle_vi_algorithm

# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ParticleVI algorithm on parameterized functions."""

from absl import logging
import functools
import math
import numpy as np
import torch
import torch.nn.functional as F
from typing import Callable

import alf
from alf.algorithms.algorithm import Algorithm
from alf.algorithms.config import TrainerConfig
from alf.algorithms.particle_vi_algorithm import ParVIAlgorithm
from alf.data_structures import AlgStep, LossInfo, namedtuple
from alf.networks import EncodingNetwork, ParamNetwork
from alf.tensor_specs import TensorSpec
from alf.nest.utils import get_outer_rank
from alf.utils import common, math_ops, summary_utils
from alf.utils.summary_utils import record_time
from alf.utils.sl_utils import classification_loss, regression_loss, auc_score
from alf.utils.sl_utils import predict_dataset


def _expand_to_replica(inputs, replicas, spec):
    """Broadcast ``inputs`` along a new replica axis when needed.

    Inputs of shape ``[B, ...]`` become ``[B, n, ...]`` where ``n`` is
    ``replicas``. The inputs are returned untouched when ``n`` is not greater
    than 1, or when their outer rank (as reported by ``get_outer_rank``) is
    not 1, which is taken to mean they are already expanded.

    Args:
        inputs (Tensor): the input tensor to be expanded.
        replicas (int): the number of replicas ``n``.
        spec (TensorSpec): the spec of the unexpanded inputs. It is used to
            determine whether the inputs is already an expanded one.
    Returns:
        Tensor: the expanded inputs or the original inputs.
    """
    rank = get_outer_rank(inputs, spec)
    if replicas <= 1 or rank != 1:
        return inputs
    # ``expand`` creates a broadcast view; no data is copied.
    target_shape = (-1, replicas) + tuple(inputs.shape[1:])
    return inputs.unsqueeze(1).expand(*target_shape)


@alf.configurable
class FuncParVIAlgorithm(ParVIAlgorithm):
    """Functional ParVI Algorithm

    Functional ParVI algorithm maintains a set of functional particles,
    where each particle is a neural network. All particles are updated
    using particle-based VI approaches.

    There are two ways of treating a neural network as a particle:

    * All the weights of the neural network as a particle.

    * Outputs of the neural network for an input mini-batch as a particle.
    """

    def __init__(self,
                 data_creator=None,
                 data_creator_outlier=None,
                 input_tensor_spec=None,
                 output_dim=None,
                 param_net: ParamNetwork = None,
                 conv_layer_params=None,
                 fc_layer_params=None,
                 use_conv_bias=False,
                 use_conv_ln=False,
                 use_fc_bias=True,
                 use_fc_ln=False,
                 activation=torch.relu_,
                 last_activation=math_ops.identity,
                 last_use_bias=True,
                 last_use_ln=False,
                 num_particles=10,
                 entropy_regularization=1.,
                 loss_type="classification",
                 voting="soft",
                 par_vi="svgd",
                 function_vi=False,
                 function_bs=None,
                 function_extra_bs_ratio=0.1,
                 function_extra_bs_sampler='uniform',
                 function_extra_bs_std=1.,
                 critic_hidden_layers=(100, 100),
                 critic_iter_num=2,
                 critic_l2_weight=10.,
                 critic_use_bn=True,
                 num_train_classes=10,
                 optimizer=None,
                 critic_optimizer=None,
                 logging_network=False,
                 logging_training=False,
                 logging_evaluate=False,
                 config: TrainerConfig = None,
                 debug_summaries=False,
                 name="FuncParVIAlgorithm"):
        """
        Args:
            data_creator (Callable): called as ``data_creator()`` to get a tuple
                of ``(train_dataloader, test_dataloader)``
            data_creator_outlier (Callable): called as
                ``data_creator_outlier()`` to get a tuple of
                ``(outlier_train_dataloader, outlier_test_dataloader)``. Only
                used when ``data_creator`` is provided.
            input_tensor_spec (nested TensorSpec): the (nested) tensor spec of
                the input. If nested, then ``preprocessing_combiner`` must not
                be None. It must be provided if ``data_creator`` is not
                provided.
            output_dim (int): dimension of the output of the generated network.
                It must be provided if ``data_creator`` is not provided.
            param_net (ParamNetwork): input parametric network. If None, one
                is built from the layer arguments below.
            conv_layer_params (tuple[tuple]): a tuple of tuples where each
                tuple takes a format
                ``(filters, kernel_size, strides, padding, pooling_kernel)``,
                where ``padding`` and ``pooling_kernel`` are optional.
            fc_layer_params (tuple[tuple]): a tuple of tuples where each tuple
                takes a format ``(FC layer sizes, use_bias)``, where
                ``use_bias`` is optional.
            use_conv_bias (bool|None): whether use bias for conv layers. If
                None, will use ``not use_bn`` for conv layers.
            use_conv_ln (bool): whether use layer normalization for conv layers.
            use_fc_bias (bool): whether use bias for fc layers.
            use_fc_ln (bool): whether use layer normalization for fc layers.
            activation (Callable): activation used for all the layers but
                the last layer.
            last_activation (Callable): activation function of the last layer.
            last_use_bias (bool): whether use bias for the last layer
            last_use_ln (bool): whether use normalization for the last layer.
            num_particles (int): number of sampling particles
            entropy_regularization (float): weight of the repulsive term in
                par_vi.
            loss_type (str): loglikelihood type for the generated functions,
                types are [``classification``, ``regression``]
            voting (str): types of voting results from sampled functions,
                types are [``soft``, ``hard``]
            par_vi (str): types of particle-based methods for variational
                inference, types are [``svgd``, ``gfsf``, ``minmax``]

                * svgd: empirical expectation of SVGD is evaluated by reusing
                  the same batch of particles.
                * gfsf: wasserstein gradient flow with smoothed functions. It
                  involves a kernel matrix inversion, so computationally more
                  expensive, but in some cases the convergence seems faster
                  than svgd approaches.
            function_vi (bool): whether to use function value based par_vi,
                current supported by [``svgd2``, ``svgd3``, ``gfsf``].
            function_bs (int): mini batch size for par_vi training.
                Needed for critic initialization when function_vi is True.
            function_extra_bs_ratio (float): ratio of extra sampled batch size
                w.r.t. the function_bs.
            function_extra_bs_sampler (str): type of sampling method for extra
                training batch, types are [``uniform``, ``normal``].
            function_extra_bs_std (float): std of the normal distribution for
                sampling extra training batch when using normal sampler.
            critic_hidden_layers (tuple): sizes of hidden layers of the critic,
                used for ``minmax``.
            critic_iter_num (int): number of critic updates for each generator
                train_step, used for ``minmax``.
            critic_l2_weight (float): weight of L2 regularization in training
                the critic, used for ``minmax``.
            critic_use_bn (bool): whether use batch norm for each layers of the
                critic, used for ``minmax``.
            num_train_classes (int): number of classes in training set. Only
                used when the training dataset has no ``classes`` attribute.
            optimizer (torch.optim.Optimizer): The optimizer for training.
            critic_optimizer (torch.optim.Optimizer): Optimizer for training
                the critic, used for ``minmax``.
            logging_network (bool): whether logging the architectures of
                networks.
            logging_training (bool): whether logging loss and acc during
                training.
            logging_evaluate (bool): whether logging loss and acc of evaluate.
            config (TrainerConfig): configuration for training
            debug_summaries (bool): forwarded to the parent algorithm.
            name (str):
        """
        if data_creator is not None:
            trainset, testset = data_creator()
            if data_creator_outlier is not None:
                outlier_dataloaders = data_creator_outlier()
            else:
                outlier_dataloaders = None
            self.set_data_loader(trainset, testset, outlier_dataloaders)
            # The input spec and output size are inferred from the dataset
            # and override whatever was passed in.
            input_tensor_spec = TensorSpec(shape=trainset.dataset[0][0].shape)
            if hasattr(trainset.dataset, 'classes'):
                output_dim = len(trainset.dataset.classes)
            else:
                output_dim = num_train_classes
        else:
            assert input_tensor_spec is not None and output_dim is not None, (
                "input_tensor_spec and output_dim need to be provided if "
                "data_creator is not provided")
            self._train_loader = None
            self._test_loader = None
            # Keep the outlier loader attributes defined so that methods
            # reading them fail with a clear message rather than an
            # AttributeError.
            self._outlier_train_loader = None
            self._outlier_test_loader = None

        last_layer_size = output_dim

        if param_net is None:
            assert input_tensor_spec is not None
            param_net = ParamNetwork(
                input_tensor_spec=input_tensor_spec,
                conv_layer_params=conv_layer_params,
                fc_layer_params=fc_layer_params,
                use_conv_bias=use_conv_bias,
                use_conv_ln=use_conv_ln,
                use_fc_bias=use_fc_bias,
                use_fc_ln=use_fc_ln,
                n_groups=num_particles,
                activation=activation,
                last_layer_size=last_layer_size,
                last_activation=last_activation,
                last_use_bias=last_use_bias,
                last_use_ln=last_use_ln)

        # Each particle is the flattened parameter vector of one network.
        particle_dim = param_net.param_length

        if logging_network:
            logging.info("Each network")
            logging.info("-" * 68)
            logging.info(param_net)

        super().__init__(
            particle_dim,
            num_particles=num_particles,
            entropy_regularization=entropy_regularization,
            par_vi=par_vi,
            critic_hidden_layers=critic_hidden_layers,
            critic_l2_weight=critic_l2_weight,
            critic_iter_num=critic_iter_num,
            critic_use_bn=critic_use_bn,
            critic_optimizer=critic_optimizer,
            optimizer=optimizer,
            debug_summaries=debug_summaries,
            name=name)

        self._param_net = param_net
        self._param_net.set_parameters(self.particles.data, reinitialize=True)

        self._loss_type = loss_type
        self._logging_training = logging_training
        self._logging_evaluate = logging_evaluate
        self._config = config
        self._function_vi = function_vi
        if function_vi:
            assert function_bs is not None, (
                "need to specify batch_size of function outputs.")
            self._function_extra_bs = math.ceil(
                function_bs * function_extra_bs_ratio)
            self._function_extra_bs_sampler = function_extra_bs_sampler
            self._function_extra_bs_std = function_extra_bs_std

        assert (voting in ['soft', 'hard'
                           ]), ('voting only supports "soft" and "hard"')
        self._voting = voting
        if loss_type == 'classification':
            self._loss_func = classification_loss
            self._vote = self._classification_vote
        elif loss_type == 'regression':
            self._loss_func = regression_loss
            self._vote = self._regression_vote
        else:
            raise ValueError("Unsupported loss_type: %s" % loss_type)

    def set_data_loader(self,
                        train_loader,
                        test_loader=None,
                        outlier_data_loaders=None,
                        entropy_regularization=None):
        """Set data loader for training and testing.

        Args:
            train_loader (torch.utils.data.DataLoader): training data loader
            test_loader (torch.utils.data.DataLoader): testing data loader
            outlier_data_loaders (tuple[torch.utils.data.DataLoader]):
                (trainloader, testloader) for outlier datasets
            entropy_regularization (float): weight of particle VI repulsive
                term. If None, the current weight is left unchanged.
        """
        self._train_loader = train_loader
        self._test_loader = test_loader
        if entropy_regularization is not None:
            self._entropy_regularization = entropy_regularization
        if outlier_data_loaders is not None:
            assert isinstance(outlier_data_loaders, tuple), "outlier dataset "\
                "must be provided in the format (outlier_train, outlier_test)"
            self._outlier_train_loader = outlier_data_loaders[0]
            self._outlier_test_loader = outlier_data_loaders[1]
        else:
            self._outlier_train_loader = self._outlier_test_loader = None

    def predict_step(self, inputs, params=None, state=None):
        """Predict ensemble outputs for inputs using the parametric network.

        Args:
            inputs (Tensor): inputs to the ensemble of networks.
            params (Tensor): parameters of the ensemble of networks,
                if None, use self.particles.
            state (None): not used.

        Returns:
            AlgStep:
            - output (Tensor): the output of ``self._param_net`` evaluated on
              ``inputs`` under ``params``.
            - state (None): not used
        """
        if params is None:
            params = self.particles
        self._param_net.set_parameters(params)
        outputs, _ = self._param_net(inputs)
        return AlgStep(output=outputs, state=(), info=())

    def train_iter(self, state=None):
        """Perform one epoch (iteration) of training.

        Args:
            state (None): not used

        Returns:
            int: the number of mini-batches processed. 0 if the training
            loader yielded no batch, in which case nothing is summarized.
        """
        assert self._train_loader is not None, "Must set data_loader first."
        alf.summary.increment_global_counter()
        is_classification = self._loss_type == 'classification'
        num_batches = 0
        with record_time("time/train"):
            loss = 0.
            if is_classification:
                avg_acc = []
            for batch_idx, (data, target) in enumerate(self._train_loader):
                data = data.to(alf.get_default_device())
                target = target.to(alf.get_default_device())
                alg_step = self.train_step((data, target), state=state)
                loss_info, params = self.update_with_gradient(alg_step.info)
                loss += loss_info.extra.loss
                if is_classification:
                    avg_acc.append(alg_step.info.extra.extra)
                num_batches = batch_idx + 1

        if num_batches == 0:
            # Without any batch there is no loss_info/params to summarize.
            logging.warning("train_iter: the training loader is empty.")
            return 0

        acc = None
        if is_classification:
            acc = torch.as_tensor(avg_acc).mean() * 100
        if self._logging_training:
            if is_classification:
                logging.info("Avg acc: {}".format(acc))
            logging.info("Cum loss: {}".format(loss))
        self.summarize_train(loss_info, params, cum_loss=loss, avg_acc=acc)

        return num_batches

    def train_step(self,
                   inputs,
                   entropy_regularization=None,
                   loss_mask=None,
                   state=None):
        """Perform one batch of training computation.

        Args:
            inputs (nested Tensor): input training data, a ``(data, target)``
                pair.
            entropy_regularization (float): weight of the repulsive term in
                par_vi. If None, use self._entropy_regularization.
            loss_mask (Tensor): mask indicating which samples are valid for
                loss propagation. Only forwarded when ``function_vi`` is True.
            state (None): not used

        Returns:
            AlgStep: the result of the parent ``train_step()``.
        """
        if entropy_regularization is None:
            entropy_regularization = self._entropy_regularization

        if self._function_vi:
            data, target = inputs
            return super().train_step(
                loss_func=functools.partial(self._function_neglogprob, target),
                transform_func=functools.partial(self._function_transform,
                                                 data),
                entropy_regularization=entropy_regularization,
                loss_mask=loss_mask,
                state=())
        else:
            # NOTE(review): ``loss_mask`` is not forwarded on this path;
            # confirm whether that is intentional.
            return super().train_step(
                loss_func=functools.partial(self._neglogprob, inputs),
                entropy_regularization=entropy_regularization,
                state=())

    def _function_transform(self, data, params):
        """Transform the particles to their corresponding function values
        evaluated on the training batch. Used when function_vi is True.

        Args:
            data (Tensor): training batch input.
            params (Tensor|tuple): parameter tensor for param_net, or a tuple
                of ``(params, extra_samples)``. In the latter case the given
                ``extra_samples`` are used instead of sampling new ones.

        Returns:
            tuple:
            - outputs (Tensor): outputs of param_net under params evaluated
              on data, flattened to ``[P, B * D]``.
            - density_outputs (Tensor): outputs of param_net under params
              evaluated on the extra samples, flattened to ``[P, b * D]``.
        """
        if isinstance(params, tuple):
            params, extra_samples = params
        else:
            # Sample extra data by perturbing the tail of the batch.
            sample = data[-self._function_extra_bs:]
            noise = torch.zeros_like(sample)
            if self._function_extra_bs_sampler == 'uniform':
                noise.uniform_(0., 1.)
            else:
                noise.normal_(mean=0., std=self._function_extra_bs_std)
            extra_samples = sample + noise

        num_particles = params.shape[0]
        self._param_net.set_parameters(params)
        aug_data = torch.cat([data, extra_samples], dim=0)
        aug_outputs, _ = self._param_net(aug_data)  # [B+b, P, D]

        # ``reshape`` rather than ``view``: the transposed tensor is not
        # guaranteed to have a layout that ``view`` accepts.
        outputs = aug_outputs[:data.shape[0]]  # [B, P, D]
        outputs = outputs.transpose(0, 1)  # [P, B, D]
        outputs = outputs.reshape(num_particles, -1)  # [P, B * D]

        density_outputs = aug_outputs[-extra_samples.shape[0]:]  # [b, P, D]
        density_outputs = density_outputs.transpose(0, 1)  # [P, b, D]
        density_outputs = density_outputs.reshape(num_particles,
                                                  -1)  # [P, b * D]

        return outputs, density_outputs

    def _function_neglogprob(self, targets, outputs):
        """Function computing negative log_prob loss for function outputs.
        Used when function_vi is True.

        Args:
            targets (Tensor): target values of the training batch.
            outputs (Tensor): function outputs to evaluate the loss.

        Returns:
            Negative log_prob for outputs evaluated on current training batch.
        """
        num_particles = outputs.shape[0]
        if self._loss_type == 'regression':
            # [B, D] -> [B, N, D]
            targets = _expand_to_replica(targets, num_particles,
                                         self._param_net.output_spec)
            # [B, N, D] -> [N, B, D]
            targets = targets.permute(1, 0, 2)
            # [N, B, D] -> [N, -1]. ``reshape`` also handles targets that
            # were already expanded, for which ``view`` would raise after
            # the permute.
            targets = targets.reshape(num_particles, -1)
        else:
            # [B] -> [B, 1]
            targets = targets.unsqueeze(1)
            # [B, 1] -> [N, B, 1]
            targets = targets.unsqueeze(0).expand(num_particles,
                                                  *targets.shape)

        return self._loss_func(outputs, targets)

    def _neglogprob(self, inputs, params):
        """Function computing negative log_prob loss for generator outputs.
        Used when function_vi is False.

        Args:
            inputs (Tensor): (data, target) of training batch.
            params (Tensor): generator outputs to evaluate the loss.

        Returns:
            Negative log_prob for params evaluated on current training batch.
        """
        self._param_net.set_parameters(params)
        num_particles = params.shape[0]
        data, target = inputs
        output, _ = self._param_net(data)  # [B, N, D]
        if self._loss_type == 'regression':
            # [B, d] -> [B, N, d]
            target = _expand_to_replica(target, num_particles,
                                        self._param_net.output_spec)
        else:
            # [B] -> [B, N]
            target = target.unsqueeze(1).expand(*target.shape[:1],
                                                num_particles)

        return self._loss_func(output, target)

    def evaluate(self):
        """Evaluation of the ParVI ensemble on a test dataset."""

        assert self._test_loader is not None, "Must set test_loader first."
        logging.info("==> Begin evaluating")
        self._param_net.set_parameters(self.particles)
        is_classification = self._loss_type == 'classification'
        # Only scalar metrics are extracted, so no autograd graph is needed.
        with record_time("time/test"), torch.no_grad():
            if is_classification:
                test_acc = 0.
            test_loss = 0.
            for data, target in self._test_loader:
                data = data.to(alf.get_default_device())
                target = target.to(alf.get_default_device())
                output, _ = self._param_net(data)  # [B, N, D]
                loss, extra = self._vote(output, target)
                if is_classification:
                    test_acc += extra.item()
                test_loss += loss.loss.item()

        if is_classification:
            # ``extra`` is a count of correct votes; normalize to a ratio.
            test_acc /= len(self._test_loader.dataset)
            alf.summary.scalar(name='eval/test_acc', data=test_acc * 100)
        if self._logging_evaluate:
            if is_classification:
                logging.info("Test acc: {}".format(test_acc * 100))
            logging.info("Test loss: {}".format(test_loss))
        alf.summary.scalar(name='eval/test_loss', data=test_loss)

    def _classification_vote(self, output, target):
        """Ensemble the outputs from sampled classifiers.

        Args:
            output (Tensor): logits of all particles, ``[B, N, D]``.
            target (Tensor): labels of the batch.

        Returns:
            tuple:
            - loss: ``classification_loss`` of every particle's output
              against the (expanded) target.
            - correct (Tensor): number of samples whose voted label equals
              the target.
        """
        num_particles = output.shape[1]
        probs = F.softmax(output, dim=-1)  # [B, N, D]
        if self._voting == 'soft':
            # Average the class probabilities over particles.
            pred = probs.mean(1).cpu()  # [B, D]
            vote = pred.argmax(-1)
        elif self._voting == 'hard':
            # Majority vote over each particle's predicted label.
            pred = probs.argmax(-1).cpu()  # [B, N]
            vote = []
            for particle_preds in pred:
                values, counts = torch.unique(
                    particle_preds, sorted=False, return_counts=True)
                # Keep only the most frequent labels and break ties
                # uniformly at random among them.
                modes = values[counts == counts.max()]
                pick = torch.randint(len(modes), (1, )).item()
                vote.append(modes[pick].item())
            vote = torch.as_tensor(vote, device='cpu')
        correct = vote.eq(target.cpu().view_as(vote)).float().cpu().sum()
        target = target.unsqueeze(1).expand(*target.shape[:1], num_particles,
                                            *target.shape[1:])
        loss = classification_loss(output, target)
        return loss, correct

    def _regression_vote(self, output, target):
        """Ensemble the outputs for sampled regressors.

        Args:
            output (Tensor): outputs of all particles, ``[B, N, D]``.
            target (Tensor): regression targets of the batch.

        Returns:
            tuple:
            - loss: ``regression_loss`` of the particle-averaged prediction.
            - total_loss: ``regression_loss`` of every particle's output
              against the (expanded) target.
        """
        num_particles = output.shape[1]
        pred = output.mean(1)  # [B, D]
        loss = regression_loss(pred, target)
        target = target.unsqueeze(1).expand(*target.shape[:1], num_particles,
                                            *target.shape[1:])
        total_loss = regression_loss(output, target)
        return loss, total_loss

    def eval_uncertainty(self):
        """Function to evaluate the epistemic uncertainty of the ensemble.

        This method computes the following metrics:

        * AUROC (AUC) evaluates the separability of model predictions with
          respect to the test data and a prespecified outlier dataset.
          AUC is computed with respect to the entropy in the averaged
          softmax probabilities, as well as the sum of the variance of the
          softmax probabilities over the ensemble.

        Returns:
            tuple: ``(auroc_entropy, auroc_variance)``
        """
        assert self._test_loader is not None, "Must set test_loader first."
        assert self._outlier_test_loader is not None, (
            "Must set outlier data loaders first.")
        # Make sure the network evaluates the current particles, as
        # ``evaluate()`` does.
        self._param_net.set_parameters(self.particles)
        with torch.no_grad():
            outputs = predict_dataset(self._param_net, self._test_loader)
            outputs_outlier = predict_dataset(self._param_net,
                                              self._outlier_test_loader)
        probs = F.softmax(outputs, -1)
        probs_outlier = F.softmax(outputs_outlier, -1)

        mean_probs = probs.mean(0)
        mean_probs_outlier = probs_outlier.mean(0)

        entropy = torch.distributions.Categorical(mean_probs).entropy()
        entropy_outlier = torch.distributions.Categorical(
            mean_probs_outlier).entropy()

        variance = probs.var(0).sum(-1)
        variance_outlier = probs_outlier.var(0).sum(-1)

        auroc_entropy = auc_score(entropy, entropy_outlier)
        auroc_variance = auc_score(variance, variance_outlier)
        logging.info("AUROC score (entropy): {}".format(auroc_entropy))
        logging.info("AUROC score (variance): {}".format(auroc_variance))
        alf.summary.scalar(name='eval/auroc_entropy', data=auroc_entropy)
        alf.summary.scalar(name='eval/auroc_variance', data=auroc_variance)
        return auroc_entropy, auroc_variance

    def summarize_train(self, loss_info, params, cum_loss=None, avg_acc=None):
        """Generate summaries for training & loss info after each gradient
        update.

        Variables, gradients and the loss are only summarized when a config
        was given and enables them. ``cum_loss`` and ``avg_acc`` are
        summarized whenever they are provided.

        Args:
            loss_info (LossInfo): loss
            params (list[Parameter]): list of parameters with gradients
            cum_loss (Tensor|float): cumulative negative log_prob of the
                epoch. Not summarized if None.
            avg_acc (Tensor|float): average training accuracy of the epoch.
                Not summarized if None.
        """
        if self._config is not None:
            if self._config.summarize_grads_and_vars:
                summary_utils.summarize_variables(params)
                summary_utils.summarize_gradients(params)
            if self._config.debug_summaries:
                summary_utils.summarize_loss(loss_info)
        if cum_loss is not None:
            alf.summary.scalar(name='train_epoch/neglogprob', data=cum_loss)
        if avg_acc is not None:
            alf.summary.scalar(name='train_epoch/avg_acc', data=avg_acc)