# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prior action policies for KL regularized RL."""
import numpy as np
import torch
import torch.distributions as td
from torch.distributions import Categorical, Independent, Uniform
import alf
from alf.algorithms.algorithm import Algorithm
from alf.data_structures import AlgStep, TimeStep, StepType
from alf.tensor_specs import TensorSpec, BoundedTensorSpec
def normcdf(a, b):
    """Probability mass of the standard Normal between ``a`` and ``b``.

    Computes ``Phi(b) - Phi(a)``, where ``Phi`` is the standard Normal CDF,
    using the identity ``Phi(x) = 0.5 * (1 + erf(x / sqrt(2)))``.

    Args:
        a (Tensor): lower integration limit(s)
        b (Tensor): upper integration limit(s)
    Returns:
        Tensor: ``0.5 * (erf(b / sqrt(2)) - erf(a / sqrt(2)))``
    """
    inv_sqrt2 = 0.7071067811865476  # = math.sqrt(0.5)
    erf_upper = torch.erf(inv_sqrt2 * b)
    erf_lower = torch.erf(inv_sqrt2 * a)
    return 0.5 * (erf_upper - erf_lower)
class TruncatedNormal(td.Distribution):
    # No parameter constraints are declared. Defining this explicitly (rather
    # than inheriting the base-class property, which raises
    # NotImplementedError) keeps ``repr()`` usable and avoids a warning at
    # construction time when argument validation is enabled.
    arg_constraints = {}

    def __init__(self, loc, scale, low, high, validate_args=None):
        """Normal distribution truncated to the range between ``low`` and ``high``.

        Currently, only ``log_prob()`` is implemented.

        Args:
            loc (Tensor): mean of the untruncated Normal
            scale (Tensor): standard deviation of the untruncated Normal
            low (Tensor): lower bound of the truncation range
            high (Tensor): upper bound of the truncation range
            validate_args (bool|None): forwarded to ``td.Distribution``
        """
        # Local import so that this block does not depend on a new
        # file-level import.
        from torch.distributions.utils import broadcast_all

        self._loc = loc
        self._scale = scale
        self._low = low
        self._high = high
        # The batch shape is the broadcast of all parameters, not just of
        # ``loc``; otherwise it is wrong whenever ``scale``/``low``/``high``
        # have more (or larger) dimensions than ``loc``.
        batch_shape = broadcast_all(loc, scale, low, high)[0].shape
        super().__init__(batch_shape=batch_shape, validate_args=validate_args)

    def log_prob(self, value):
        """Log-probability of ``value``.

        Note that ``value`` is not checked against the truncation range: the
        same formula is applied whether or not it lies in ``[low, high]``.

        Args:
            value (Tensor): the samples whose log_prob is to be calculated
        Returns:
            log probability of ``value``
        """
        scale = self._scale
        loc = self._loc
        var = scale**2
        log_scale = scale.log()
        # Truncation bounds expressed in units of the standard Normal.
        low = (self._low - loc) / scale
        high = (self._high - loc) / scale
        # 0.9189385332046727 = math.log(math.sqrt(2 * math.pi))
        log_prob = -(
            (value - loc)**2) / (2 * var) - log_scale - 0.9189385332046727
        # Renormalize by the probability mass inside the truncation range.
        return log_prob - normcdf(low, high).log()

    def sample(self, sample_shape=torch.Size()):
        """Not implemented.

        The signature matches ``td.Distribution.sample`` so that callers
        passing ``sample_shape`` get ``NotImplementedError`` rather than a
        ``TypeError``.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError()

    def rsample(self, sample_shape=torch.Size()):
        """Not implemented.

        The signature matches ``td.Distribution.rsample``.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError()
@alf.configurable
class SameActionPriorActor(Algorithm):
    def __init__(self,
                 observation_spec,
                 action_spec: BoundedTensorSpec,
                 same_action_noise=0.1,
                 same_action_prob=0.9,
                 debug_summaries=False,
                 name="SameActionPriorActor"):
        """
        ``SameActionPriorActor`` can be used as a prior for KLD regularized
        RL-algorithms. It encodes the prior intuition that the next action
        should be the same as the previous action most of the time. More
        specifically, the distribution for each action dimension is a mixture
        of two components:

        1. a flat ``TruncatedNormal`` with ``loc`` equal to the midpoint of the
           action range and ``scale`` equal to the width of the action range.
        2. a sharp ``TruncatedNormal`` with ``loc`` equal to the previous action
           and ``scale`` equal to the width of the action range multiplied by
           ``same_action_noise``.

        The mixture weight depends on step_type:

        1. If the step_type is FIRST, the mixture logits are [0, -100], i.e.
           the weight is effectively [1.0, 0].
        2. Otherwise the mixture weight is
           [1 - same_action_prob, same_action_prob].

        Args:
            observation_spec (nested TensorSpec): representing the observations.
            action_spec (nested BoundedTensorSpec): representing the actions.
                Every leaf spec must be continuous.
            same_action_noise (float): the noise added to the previous action if
                the new action is the same as the previous action.
            same_action_prob (float): the probability that the next action is
                the same as the previous action.
            debug_summaries (bool): True if debug summaries should be created.
            name (str): The name of this algorithm.
        """
        super().__init__(
            train_state_spec=(), debug_summaries=debug_summaries, name=name)

        def _prepare_spec(leaf_spec):
            # Precompute, for one leaf action spec, the constant tensors used
            # by ``_make_dist``.
            assert leaf_spec.is_continuous, "Discrete action is not supported"
            action_shape = leaf_spec.shape

            def _bound(value):
                # Broadcast a bound to the action shape, then add a leading
                # batch dim and a trailing mixture-component dim.
                full = np.broadcast_to(value, action_shape)
                return torch.as_tensor(full).reshape(1, *action_shape, 1)

            lower = _bound(leaf_spec.minimum)
            upper = _bound(leaf_spec.maximum)
            action_range = upper - lower
            weights = torch.tensor([1. - same_action_prob, same_action_prob])
            return {
                'minimum': lower,
                'maximum': upper,
                # loc of the flat component; has no component dim.
                'background_loc': 0.5 * (lower + upper).squeeze(-1),
                # Last dim indexes the components: [flat, sharp].
                'scale': torch.cat(
                    [action_range, action_range * same_action_noise],
                    dim=-1),
                'mix_logits': weights.log().reshape(
                    [1] * (1 + len(action_shape)) + [2]),
                # Used for FIRST steps: almost all weight on the flat
                # component.
                'pure_logits': torch.tensor([0., -100.]),
                'shape': action_shape,
                'continuous': True,
            }

        self._action_spec = action_spec
        self._prepared_specs = [
            _prepare_spec(leaf) for leaf in alf.nest.flatten(action_spec)
        ]

    def _make_dist(self, step_type, prev_action, spec):
        """Build the prior distribution for one (flattened) action.

        Args:
            step_type (Tensor): step types; entries equal to
                ``StepType.FIRST`` get the ``pure_logits`` mixture logits
            prev_action (Tensor): the previous action
            spec (dict): one of the dicts stored in ``self._prepared_specs``
        Returns:
            Independent: a ``MixtureSameFamily`` of two ``TruncatedNormal``
            components wrapped so that all but the first dimension of
            ``prev_action`` are treated as event dimensions.
        """
        is_first = step_type == StepType.FIRST
        mix_logits = spec['mix_logits'].expand(*prev_action.shape, -1).clone()
        mix_logits[is_first] = spec['pure_logits']
        weights_dist = Categorical(logits=mix_logits)

        background = spec['background_loc'].expand_as(prev_action)
        # Last dim indexes the components: [flat background, previous action].
        centers = torch.stack((background, prev_action), dim=-1)
        normals = TruncatedNormal(centers, spec['scale'], spec['minimum'],
                                  spec['maximum'])

        mixture = td.MixtureSameFamily(weights_dist, normals)
        return Independent(
            base_distribution=mixture,
            reinterpreted_batch_ndims=prev_action.ndim - 1)

    def predict_step(self, inputs: TimeStep, state):
        """Calculate the distribution of the next action.

        Args:
            inputs (TimeStep): time step structure
            state: unused; passed only to satisfy the step interface
        Returns:
            AlgStep:
            - output (Distribution): the distribution of the action
            - state: ()
            - info: ()
        """
        prev_actions = alf.nest.flatten(inputs.prev_action)
        dists = []
        for prev_action, spec in zip(prev_actions, self._prepared_specs):
            dists.append(self._make_dist(inputs.step_type, prev_action, spec))
        return AlgStep(
            output=alf.nest.pack_sequence_as(self._action_spec, dists),
            state=(),
            info=())

    def rollout_step(self, inputs: TimeStep, state):
        """Same as ``predict_step()``."""
        return self.predict_step(inputs, state)

    def train_step(self, inputs: TimeStep, state, unroll_info=()):
        """Same as ``predict_step()``; ``unroll_info`` is ignored."""
        return self.predict_step(inputs, state)