# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from typing import Callable
import alf
from alf.algorithms.off_policy_algorithm import OffPolicyAlgorithm
from alf.data_structures import AlgStep, namedtuple, TimeStep, StepType
from alf.nest import nest
from alf.optimizers.traj_optimizers import RandomOptimizer, CEMOptimizer
from alf.tensor_specs import TensorSpec
PlannerState = namedtuple("PlannerState", ["prev_plan"], default_value=())
PlannerInfo = namedtuple("PlannerInfo", ["planner"])
[docs]@alf.configurable
class PlanAlgorithm(OffPolicyAlgorithm):
"""Planning Module
This module plans for actions based on initial observation
and specified reward and dynamics functions
"""
def __init__(self,
feature_spec,
action_spec,
reward_spec=TensorSpec(()),
planning_horizon=25,
upper_bound=None,
lower_bound=None,
name="PlanningAlgorithm"):
"""Create a PlanningAlgorithm.
Args:
reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec representing
the reward(s).
planning_horizon (int): planning horizon in terms of time steps
upper_bound (int): upper bound for elements in solution;
action_spec.maximum will be used if not specified
lower_bound (int): lower bound for elements in solution;
action_spec.minimum will be used if not specified
particles_per_replica (int): number of particles used for each replica
"""
super().__init__(
feature_spec,
action_spec,
reward_spec=reward_spec,
train_state_spec=PlannerState(
prev_plan=TensorSpec((planning_horizon,
action_spec.shape[-1]))),
name=name)
flat_action_spec = nest.flatten(action_spec)
assert len(flat_action_spec) == 1, "doesn't support nested action_spec"
action_spec = flat_action_spec[0]
assert action_spec.is_continuous, "only support \
continious control"
self._num_actions = action_spec.shape[-1]
self._action_spec = action_spec
self._feature_spec = feature_spec
self._planning_horizon = planning_horizon
self._upper_bound = torch.Tensor(action_spec.maximum) \
if upper_bound is None else upper_bound
self._lower_bound = torch.Tensor(action_spec.minimum) \
if lower_bound is None else lower_bound
self._action_seq_cost_func = None
[docs] def train_step(self,
time_step: TimeStep,
state: PlannerState,
rollout_info=None):
"""
Args:
time_step (TimeStep): input data for dynamics learning
state (PlannerState): input planner state
Returns:
AlgStep:
output: empty tuple ()
state (PlannerState): updated planner state
info (PlannerInfo):
"""
pass
[docs] def set_action_sequence_cost_func(self, action_seq_cost_func):
"""Set a function for evaluating the action sequences for planning
Args:
action_seq_cost_func (Callable): cost function to be used for planning.
action_seq_cost_func takes initial observation and action sequences
of the shape [B, population, unroll_steps, action_dim] as input
and returns the accumulated cost along the unrolled trajectory, with
the shape of [B, population]
"""
self._action_seq_cost_func = action_seq_cost_func
[docs] def predict_plan(self, time_step: TimeStep, state: PlannerState,
epsilon_greedy):
"""Compute the plan based on the provided observation and action
Args:
time_step (TimeStep): input data for next step prediction
state (PlannerState): input planner state
Returns:
action: planned action for the given inputs
"""
pass
[docs]@alf.configurable
class RandomShootingAlgorithm(PlanAlgorithm):
"""Random Shooting-based planning method.
This method uses a Random Shooting approach to optimize an action
trajectory by minimizing a given cost function. The optimized action
trajectory is termed as a 'plan' which can be used by other components
such as a MPC-based controller. It has been used in `Neural Network Dynamics
for Model-Based Deep Reinforcement Learning with Model-Free Fine-Tuning
<https://arxiv.org/abs/1708.02596>`_
"""
def __init__(self,
feature_spec,
action_spec,
population_size,
reward_spec=TensorSpec(()),
planning_horizon=25,
upper_bound=None,
lower_bound=None,
name="RandomShootingAlgorithm"):
"""Create a RandomShootingAlgorithm.
Args:
population_size (int): the size of polulation for random shooting
reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec representing
the reward(s).
planning_horizon (int): planning horizon in terms of time steps
upper_bound (int): upper bound for elements in solution;
action_spec.maximum will be used if not specified
lower_bound (int): lower bound for elements in solution;
action_spec.minimum will be used if not specified
"""
super().__init__(
feature_spec=feature_spec,
action_spec=action_spec,
reward_spec=reward_spec,
planning_horizon=planning_horizon,
upper_bound=upper_bound,
lower_bound=lower_bound,
name=name)
flat_action_spec = nest.flatten(action_spec)
assert len(flat_action_spec) == 1, ("RandomShootingAlgorithm doesn't "
"support nested action_spec")
self._population_size = population_size
solution_size = self._planning_horizon * self._num_actions
self._solution_size = solution_size
# expand action bound to solution bound
solution_upper_bound = self._upper_bound.unsqueeze(0).expand(
planning_horizon, *self._upper_bound.shape).reshape(-1)
solution_lower_bound = self._lower_bound.unsqueeze(0).expand(
planning_horizon, *self._lower_bound.shape).reshape(-1)
self._plan_optimizer = RandomOptimizer(
solution_size,
self._population_size,
upper_bound=solution_upper_bound,
lower_bound=solution_lower_bound,
cost_func=self._calc_cost_for_action_sequence)
[docs] def train_step(self, time_step: TimeStep, state, rollout_info=None):
"""
Args:
time_step (TimeStep): input data for planning
state: state for planning (previous observation)
Returns:
AlgStep:
output: empty tuple ()
state (DynamicsState): state for training
info (DynamicsInfo):
"""
return AlgStep(output=(), state=state, info=())
[docs] def predict_plan(self, time_step: TimeStep, state: PlannerState,
epsilon_greedy):
assert self._action_seq_cost_func is not None, (
"specify "
"action sequence cost function before planning")
opt_action = self._plan_optimizer.obtain_solution(
time_step.observation)
# [B, horizon * action_dim] -> [B, horizon, action_dim]
opt_action = torch.reshape(
opt_action,
[opt_action.shape[0], self._planning_horizon, self._num_actions])
action = opt_action[:, 0]
return action, state
def _calc_cost_for_action_sequence(self, obs, ac_seqs):
"""
Args:
obs (Tensor): initial observation to start the rollout for the
evaluation of ac_seqs.
ac_seqs(Tensor): action_sequence of shape [batch_size,
population_size, solution_dim]), where
solution_dim = planning_horizon * num_actions
Returns:
cost (Tensor) with shape [batch_size, population_size]
"""
ac_seqs = ac_seqs.reshape(*ac_seqs.shape[0:2], self._planning_horizon,
-1)
cost = self._action_seq_cost_func(obs, ac_seqs)
return cost
[docs] def after_update(self, root_inputs, info):
pass
[docs]@alf.configurable
class CEMPlanAlgorithm(RandomShootingAlgorithm):
"""CEM-based planning method.
This method uses a Cross-Entropy Method (CEM) to optimize an action
trajectory by minimizing a given cost function. The optimized action
trajectory is termed as a 'plan' which can be used by other components
such as a MPC-based controller. This has been used by some MBRL works
such as `Deep Reinforcement Learning in a Handful of Trials using
Probabilistic Dynamics Models <https://arxiv.org/abs/1805.12114>`_
To speedup, when possible, we have used the plan obtained at the previous
time step to initialize the the mean of the plan distribution at the current
time step, after proper shifting and padding.
"""
def __init__(self,
feature_spec,
action_spec,
population_size,
planning_horizon,
reward_spec=TensorSpec(()),
elite_size=50,
max_iter_num=5,
epsilon=0.01,
tau=0.9,
scalar_var=None,
upper_bound=None,
lower_bound=None,
name="CEMPlanAlgorithm"):
"""Create a CEMPlanAlgorithm.
Args:
population_size (int): the size of polulation for optimization
planning_horizon (int): planning horizon in terms of time steps
reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec representing
the reward(s.)
elite_size (int): the number of elites selected in each round
max_iter_num (int|Tensor): the maximum number of CEM iterations
epsilon (float): a minimum variance threshold. If the variance of
the population falls below it, the CEM iteration will stop.
tau (float): a value in (0, 1) for softly updating the population
mean and variance:
.. code-block:: python
mean = (1 - tau) * mean + tau * new_mean
var = (1 - tau) * var + tau * new_var
scalar_var (None|float): the value that will be used to construct
the initial diagonal covariance matrix of the multi-dimensional
Gaussian used by the CEM optimizer. If value is None,
0.5 * (upper_bound - lower_bound) is used.
upper_bound (int): upper bound for elements in solution;
action_spec.maximum will be used if not specified
lower_bound (int): lower bound for elements in solution;
action_spec.minimum will be used if not specified
"""
super().__init__(
feature_spec=feature_spec,
action_spec=action_spec,
population_size=population_size,
reward_spec=reward_spec,
planning_horizon=planning_horizon,
upper_bound=upper_bound,
lower_bound=lower_bound,
name=name)
solution_size = planning_horizon * self._num_actions
self._plan_optimizer = CEMOptimizer(
solution_size,
self._population_size,
upper_bound=self._upper_bound,
lower_bound=self._lower_bound,
cost_func=self._calc_cost_for_action_sequence,
elite_size=elite_size,
max_iter_num=max_iter_num,
epsilon=epsilon,
tau=tau)
if scalar_var is None:
self._scalar_var = (self._upper_bound - self._lower_bound) / 2.
else:
self._scalar_var = scalar_var
[docs] def predict_plan(self, time_step: TimeStep, state: PlannerState,
epislon_greedy):
prev_plan = state.prev_plan
# [B, horizon, action_dim] -> [B, horizon*action_dim]
prev_solution = prev_plan.reshape(prev_plan.shape[0], -1)
prev_solution = prev_solution.clone()
prev_solution[time_step.step_type == StepType.FIRST] = torch.full(
(self._solution_size, ),
(self._upper_bound + self._lower_bound) / 2.)
init_mean = prev_solution.unsqueeze(1)
opt_action = self._plan_optimizer.obtain_solution(
time_step.observation,
init_mean=init_mean,
init_var=torch.ones_like(init_mean) * self._scalar_var)
# [B, horizon * action_dim] -> [B, horizon, action_dim]
opt_action = torch.reshape(
opt_action,
[opt_action.shape[0], self._planning_horizon, self._num_actions])
# [B, horizon, action_dim]
temporally_shifted_plan = torch.cat(
(opt_action[:, 1:], opt_action.mean(
dim=(0, 1), keepdim=True).expand(opt_action.shape[0], 1, -1)),
1)
action = opt_action[:, 0]
new_state = state._replace(prev_plan=temporally_shifted_plan)
return action, new_state