Source code for alf.optimizers.traj_optimizers

# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import torch

import alf
from alf.data_structures import TimeStep
from alf.utils import tensor_utils


class TrajOptimizer:
    """Base interface for trajectory optimizers.

    A trajectory optimizer searches for a solution that minimizes a cost
    function. This base class only fixes the method names; every method here
    is a no-op that returns ``None``.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments."""

    def reset(self):
        """Reset the optimizer. Does nothing in the base class."""

    def obtain_solution(self, *args, **kwargs):
        """Compute a solution. The base class computes nothing."""
class RandomOptimizer(TrajOptimizer):
    def __init__(self, solution_dim, population_size, cost_func, upper_bound,
                 lower_bound):
        """Random-shooting trajectory optimizer.

        For each sample in the batch a population of candidate solutions is
        drawn uniformly at random between the bounds, and the candidate with
        the lowest cost is returned as the solution.

        Args:
            solution_dim (int): dimensionality of a single solution.
            population_size (int): number of random candidates drawn per
                batch entry.
            cost_func (Callable): cost function to minimize. Called as
                ``cost_func(observation, candidates)`` where ``candidates``
                has shape [batch_size, population_size, solution_dim]; must
                return a cost Tensor of shape [batch_size, population_size].
            upper_bound (float|Tensor): upper bounds for solution elements.
            lower_bound (float|Tensor): lower bounds for solution elements.
        """
        super().__init__()
        self._cost_func = cost_func
        self._solution_dim = solution_dim
        self._population_size = population_size
        self._lower_bound = lower_bound
        self._upper_bound = upper_bound

    def obtain_solution(self, observation):
        """Draw one random population and pick the cheapest candidate.

        Args:
            observation (Tensor): initial observation handed to the cost
                function; its first dimension is used as the batch size.

        Returns:
            Tensor: the lowest-cost candidate for each batch entry, with
            shape [batch_size, solution_dim].
        """
        num_rows = observation.shape[0]

        # Uniform noise in [0, 1), rescaled to [lower_bound, upper_bound).
        unit_noise = torch.rand(num_rows, self._population_size,
                                self._solution_dim)
        span = self._upper_bound - self._lower_bound
        candidates = unit_noise * span + self._lower_bound

        candidate_costs = self._cost_func(observation, candidates)
        best = torch.argmin(candidate_costs, dim=-1).long()

        # candidates [B, pop_size, sol_dim] -> [B, sol_dim]
        rows = torch.arange(num_rows)
        return candidates[rows, best]
class CEMOptimizer(TrajOptimizer):
    def __init__(self,
                 solution_dim,
                 population_size,
                 cost_func,
                 upper_bound,
                 lower_bound,
                 elite_size=50,
                 max_iter_num=5,
                 epsilon=0.01,
                 tau=0.9,
                 min_var=1e-5):
        """Creates a CEM Optimizer

        This module optimizes a given cost function via the
        `Cross-Entropy Method <https://en.wikipedia.org/wiki/Cross-entropy_method>`_,
        which iterates between evaluating a population of samples generated
        from a probability distribution and updating the distribution based
        on the evaluation for generating better samples in the next
        iteration. In practice, a multi-dimensional Gaussian distribution
        with a diagonal covariance matrix is used.

        Args:
            solution_dim (int): the dimensionality of the problem space
            population_size (int): the number of candidate solutions to be
                sampled at every iteration
            cost_func (Callable): the cost function to be minimized. It takes
                as input:
                1) init observation
                2) action_sequence with the shape of
                   [batch_size, population_size, solution_dim]
                and must return a cost Tensor of shape
                [batch_size, population_size].
            upper_bound (float|Tensor): upper bounds for elements in solution
            lower_bound (float|Tensor): lower bounds for elements in solution
            elite_size (int): the number of elites selected in each round.
                Elites are the ``elite_size`` lowest-cost members of the
                population. They are used to update the mean and variance of
                the Gaussian distribution the population is sampled from.
                If ``elite_size`` exceeds ``population_size``, the whole
                population is used as the elite set.
            max_iter_num (int|Tensor): the maximum number of CEM iterations
            epsilon (float): a minimum variance threshold. The CEM iteration
                only continues while the maximum variance of the population
                is above it.
            tau (float): a value in (0, 1) for softly updating the population
                mean and variance:
                    mean = (1 - tau) * mean + tau * new_mean
                    var = (1 - tau) * var + tau * new_var
            min_var (float): minimum value of the variance for the Gaussian
                distribution to sample from
        """
        super().__init__()
        self._solution_dim = solution_dim
        self._population_size = population_size
        self._upper_bound = upper_bound
        self._lower_bound = lower_bound
        self._cost_func = cost_func
        self._elite_size = elite_size
        self._max_iter_num = max_iter_num
        self._epsilon = epsilon
        self._tau = tau
        self._min_var = min_var

    def obtain_solution(self, observation, init_mean=None, init_var=None):
        """Minimize the cost function provided by using the CEM method.

        Args:
            observation (Tensor): the initial observation for cost
                calculation; its first dimension is used as the batch size.
            init_mean (None|Tensor): initial mean of the population, with
                shape [batch_size, 1, solution_dim]. If None, the mean is
                initialized to 0.5 * (upper_bound + lower_bound).
            init_var (None|Tensor): initial variance of the population, with
                shape [batch_size, 1, solution_dim]. If None, the variance
                is initialized to 0.5 * (upper_bound - lower_bound).

        Returns:
            Tensor: the final population mean with shape
            [batch_size, solution_dim]. The mean is estimated from the
            unbounded samples, so it is not itself clamped to the bounds.
        """
        batch_size = observation.shape[0]
        # [B, 1, solution_dim]
        stat_shape = (batch_size, 1, self._solution_dim)

        if init_mean is None:
            init_mean = torch.ones(stat_shape) * \
                (self._upper_bound + self._lower_bound) / 2.
        else:
            assert init_mean.shape == stat_shape

        if init_var is None:
            init_var = torch.ones(stat_shape) * \
                (self._upper_bound - self._lower_bound) / 2.
        else:
            assert init_var.shape == stat_shape

        # ``torch.topk`` raises when k is larger than the dimension it
        # selects from, so never ask for more elites than there are samples.
        num_elites = min(self._elite_size, self._population_size)

        # Loop-invariant row index for gathering elites: [B, 1]
        batch_index = torch.arange(batch_size).unsqueeze(-1)

        # [B, population, solution_dim]
        samples_size = (batch_size, self._population_size,
                        self._solution_dim)

        i = 0
        pop_mean = init_mean
        pop_var = init_var
        while i < self._max_iter_num and pop_var.max() > self._epsilon:
            # keep the sampling variance away from zero
            pop_var = pop_var.clamp(min=self._min_var)
            samples = torch.randn(samples_size) * torch.sqrt(
                pop_var) + pop_mean

            # use bounded samples for evaluation
            bounded_samples = samples.clamp(
                min=self._lower_bound, max=self._upper_bound)
            costs = self._cost_func(observation, bounded_samples)

            # select elite set from the population: negating the costs makes
            # topk return the indices of the lowest-cost members
            ind = torch.topk(-costs, num_elites)[1]
            # samples: [batch, population, solution_dim]
            elites = samples[batch_index, ind]

            # update mean and var based on the elite set selected from the
            # unbounded samples
            new_mean = torch.mean(elites, dim=1, keepdim=True)
            # NOTE: with a single elite the default (unbiased) variance is
            # NaN, which makes the loop condition false on the next check.
            new_var = torch.var(elites, dim=1, keepdim=True)

            pop_mean = (1 - self._tau) * pop_mean + self._tau * new_mean
            pop_var = (1 - self._tau) * pop_var + self._tau * new_var
            i = i + 1

        # [B, 1, solution_dim] -> [B, solution_dim]
        return pop_mean.squeeze(1)