# Copyright (c) 2022 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from typing import Tuple
import numpy as np
import torch
import alf
from metadrive.component.vehicle.base_vehicle import BaseVehicle
from metadrive.engine.base_engine import BaseEngine
from alf.tensor_specs import TensorSpec
def _regularize_angle(x):
return (x + np.pi) % (2 * np.pi) - np.pi
def _estimate_derivative_1(seq: np.ndarray, h: float = 0.1):
"""Estimate the first order derivative, with 1 delay
.. math::
f'(x) = \frac{f{x + h} - f{x - h)}{2h} + O(h^2)
"""
return (seq[-1] - seq[-3]) / (2.0 * h)
def _estimate_derivative_2(seq: np.ndarray,
h: float = 0.1,
is_angle: bool = False):
"""Estimate the second order derivative, with 1 delay
.. math::
f''(x) = \frac{f{x + h} - 2f(x) + f{x - h)}{h^2} + O(h^2)
"""
d = seq[-1] + seq[-3] - 2.0 * seq[-2]
if is_angle:
d = _regularize_angle(d)
return d / (h * h)
def _estimate_derivative_3(seq: np.ndarray,
h: float = 0.1,
is_angle: bool = False):
"""Estimate the second order derivative, with 2 delay
.. math::
f'''(x) = \frac{f(x + 2h) - f(x - 2h) -2[f(x + h) - f(x - h)]}{2h^3} + O(h^2)
"""
d = seq[-1] - seq[-5] - 2.0 * (seq[-2] - seq[-4])
if is_angle:
d = _regularize_angle(d)
return d / (2.0 * h * h * h)
def squared_jerk_cost(jerk: float,
                      speed: float,
                      jerk_deadband: float = 4.0,
                      speed_deadband: float = 1.5,
                      scale: float = 1e-3,
                      cap: float = 0.8):
    """Convert a jerk value into a non-negative cost.

    The cost is the scaled square of how far the magnitude of the jerk exceeds
    its deadband. It is only active when the jerk magnitude has reached
    ``jerk_deadband`` AND the speed has reached ``speed_deadband``; otherwise
    the cost is 0.0. The result never exceeds ``cap``.

    Args:
        jerk: the value of the jerk (its sign is ignored)
        speed: the value of the speed
        jerk_deadband: no cost if the abs value of the jerk is below this
        speed_deadband: no cost if the speed is below this
        scale: multiplier applied to the squared violation
        cap: upper bound of the returned cost

    Returns:
        The cost, which is always POSITIVE or zero.
    """
    jerk_magnitude = abs(jerk)
    # Inside the jerk deadband: comfortable enough, nothing to penalize.
    if jerk_magnitude < jerk_deadband:
        return 0.0
    # Too slow for the jerk to matter.
    if speed < speed_deadband:
        return 0.0
    violation = jerk_magnitude - jerk_deadband
    penalty = violation * violation * scale
    return min(penalty, cap)
def squared_brake_cost(lon_acc: float,
                       speed: float,
                       harsh_brake_limit: float = -1.2,
                       speed_deadband: float = 2.0,
                       scale: float = 2.0,
                       cap: float = 1.0):
    """Convert a longitudinal acceleration into a non-negative brake cost.

    The cost is the scaled square of how far the longitudinal acceleration
    falls below ``harsh_brake_limit``. It is only active when the acceleration
    is at or beyond that limit AND the speed has reached ``speed_deadband``;
    otherwise the cost is 0.0. The result never exceeds ``cap``.

    Args:
        lon_acc: the longitudinal acceleration
        speed: the value of the speed
        harsh_brake_limit: accelerations above this threshold are not
            considered a harsh brake and are not penalized
        speed_deadband: no cost if the speed is below this
        scale: multiplier applied to the squared violation
        cap: upper bound of the returned cost

    Returns:
        The cost, which is always POSITIVE or zero.
    """
    # Not decelerating hard enough to count as a harsh brake.
    if lon_acc > harsh_brake_limit:
        return 0.0
    # Too slow for the brake to matter.
    if speed < speed_deadband:
        return 0.0
    violation = harsh_brake_limit - lon_acc
    penalty = violation * violation * scale
    return min(penalty, cap)
def is_harsh_brake(lon_acc: float, speed: float):
    """Tell whether a longitudinal acceleration counts as a harsh brake.

    The decision uses simple empirical thresholds that depend on the speed.

    Args:
        lon_acc: the longitudinal acceleration
        speed: the value of the speed

    Returns:
        True if ``lon_acc`` is below the threshold for the given speed. Always
        False when the speed is below 2.0.
    """
    if speed < 2.0:
        return False
    if speed < 15.0:
        # Typical local driving (within 15 m/s): a brake at -3.0 is
        # noticeable and uncomfortable.
        threshold = -3.0
    else:
        # High way and express way range: -1.2 is already noticeable and
        # uncomfortable.
        threshold = -1.2
    return lon_acc < threshold
@alf.configurable
class EgoKinematicReward(ExtraReward):
    """The comfort rewards that are based on the kinematics of the ego vehicle.

    On every ``evaluate()`` the current speed and heading of the ego vehicle
    are pushed into short rolling histories. Finite differences over those
    histories give the longitudinal acceleration and jerk (from the speed) and
    the values reported as ``lat_acc`` and ``lat_jerk`` (second and third
    differences of the heading). The cost functions turn them into rewards.

    The histories are zero-filled at construction and on ``reset()``, so the
    estimates of the first few steps afterwards are differences against those
    zeros.

    Args:
        harsh_brake_cost_func: the function that converts lon acc to a cost.
            It is called with keyword arguments ``lon_acc`` and ``speed``.
        lon_jerk_cost_func: the function that converts lon jerk to a cost.
            It is called with keyword arguments ``jerk`` and ``speed``.
        lat_jerk_cost_func: the function that converts lat jerk to a cost.
            It is called with keyword arguments ``jerk`` and ``speed``.
    """

    def __init__(self,
                 harsh_brake_cost_func=squared_brake_cost,
                 lon_jerk_cost_func=squared_jerk_cost,
                 lat_jerk_cost_func=squared_jerk_cost):
        super().__init__()
        self._harsh_brake_cost_func = harsh_brake_cost_func
        self._lon_jerk_cost_func = lon_jerk_cost_func
        self._lat_jerk_cost_func = lat_jerk_cost_func

        # Stores the history of the linear speed and heading for computing the
        # derivatives. The speed only needs 3 samples (up to the second order
        # difference) while the heading needs 5 (third order difference).
        self._history_speed = np.zeros(3)
        self._history_heading = np.zeros(5)

    def evaluate(self, engine: BaseEngine):
        """Compute the comfort rewards and the diagnostic info for this step.

        Args:
            engine: the engine, from which the ego vehicle
                (``"default_agent"``) is looked up.

        Returns:
            A tuple ``(rewards, info)``. ``rewards`` maps ``"lon_acc"``,
            ``"lon_jerk"`` and ``"lat_jerk"`` to the NEGATED costs. ``info``
            holds the raw estimates, the costs and the harsh brake indicator,
            keyed as in ``env_info_spec()``.
        """
        ego: BaseVehicle = engine.managers["agent_manager"].active_agents[
            "default_agent"]

        # Drop the oldest sample and append the newest one at the end.
        self._history_heading = np.roll(self._history_heading, -1)
        self._history_heading[-1] = ego.heading_theta
        self._history_speed = np.roll(self._history_speed, -1)
        self._history_speed[-1] = ego.speed / 3.6  # km/h -> m/s

        lon_acc = _estimate_derivative_1(self._history_speed)
        lon_jerk = _estimate_derivative_2(self._history_speed)
        # The "lateral" quantities are estimated from the heading history.
        lat_acc = _estimate_derivative_2(self._history_heading, is_angle=True)
        lat_jerk = _estimate_derivative_3(self._history_heading, is_angle=True)

        speed = self._history_speed[-1]
        lon_acc_cost = self._harsh_brake_cost_func(
            lon_acc=lon_acc, speed=speed)
        lon_jerk_cost = self._lon_jerk_cost_func(jerk=lon_jerk, speed=speed)
        lat_jerk_cost = self._lat_jerk_cost_func(jerk=lat_jerk, speed=speed)

        rewards = {
            "lon_acc": -lon_acc_cost,
            "lon_jerk": -lon_jerk_cost,
            "lat_jerk": -lat_jerk_cost,
        }

        info = {
            # Cast to float so that the value agrees with the float32 spec
            # declared in env_info_spec().
            "MetaDrive/harsh_brake":
                float(is_harsh_brake(lon_acc=lon_acc, speed=speed)),
            "MetaDrive/lon_acc":
                lon_acc,
            "MetaDrive/lon_jerk":
                lon_jerk,
            "MetaDrive/lat_acc":
                lat_acc,
            "MetaDrive/lat_jerk":
                lat_jerk,
            "MetaDrive/costs/lon_acc":
                lon_acc_cost,
            "MetaDrive/costs/lon_jerk":
                lon_jerk_cost,
            "MetaDrive/costs/lat_jerk":
                lat_jerk_cost,
        }

        return rewards, info

    def reset(self):
        """Clear the speed and heading histories back to zeros."""
        self._history_speed = np.zeros(3)
        self._history_heading = np.zeros(5)

    def env_info_spec(self):
        """Return the spec of the ``info`` produced by ``evaluate()``.

        Every entry is a scalar float32.
        """
        return {
            "MetaDrive/harsh_brake":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/lon_acc":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/lon_jerk":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/lat_acc":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/lat_jerk":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/costs/lon_acc":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/costs/lon_jerk":
                TensorSpec(shape=(), dtype=torch.float32),
            "MetaDrive/costs/lat_jerk":
                TensorSpec(shape=(), dtype=torch.float32),
        }
@alf.configurable
class LaneKeepingReward(ExtraReward):
    """Penalty for driving on top of a broken line.

    Broken lines separate two lanes between which a lane change is permitted.
    Charging a small cost while the ego car rides on one encourages it to stay
    inside a lane unless it is changing lanes or overtaking.

    Args:
        broken_line_cost: the cost charged for each step in which the ego car
            is riding on a broken line.
    """

    def __init__(self, broken_line_cost: float = 0.05):
        super().__init__()
        self._broken_line_cost = broken_line_cost

    def evaluate(self, engine: BaseEngine):
        """Return ``(rewards, info)`` for the current step.

        The reward ``"on_broken_line"`` is the negated cost when the ego car
        is on a broken line and 0.0 otherwise. The info entry is the same
        condition converted to a float.
        """
        agents = engine.managers["agent_manager"].active_agents
        ego: BaseVehicle = agents["default_agent"]

        if ego.on_broken_line:
            reward = -self._broken_line_cost
        else:
            reward = 0.0
        indicator = float(ego.on_broken_line)

        return ({
            "on_broken_line": reward
        }, {
            "MetaDrive/on_broken_line": indicator
        })

    def env_info_spec(self):
        """Return the spec of the info entry: a scalar float32."""
        scalar_spec = TensorSpec(shape=(), dtype=torch.float32)
        return {"MetaDrive/on_broken_line": scalar_spec}
@alf.configurable
class CrashVehicleReward(ExtraReward):
    """The EXTRA penalty for the ego car crashing into another vehicle.

    MetaDrive already has a flat penalty towards all kinds of crashing (road
    boundary, vehicle, objects, etc). This extra reward is added on top of it
    so that crashing into a vehicle is penalized more than crashing into the
    road boundary.

    Note that the episode will END when a crash happens on the ego car, which
    means that this reward is imposed at most ONCE per episode.

    Args:
        cost: the extra cost imposed when the crash is with another vehicle.
    """

    def __init__(self, cost: float = 20.0):
        super().__init__()
        self._cost = cost

    def evaluate(self, engine: BaseEngine):
        """Return ``(rewards, info)`` for the current step.

        The reward ``"crash_vehicle"`` is the negated cost when the ego car
        has crashed into a vehicle and 0.0 otherwise. The info entry is 1.0
        or 0.0 for the same condition.
        """
        agents = engine.managers["agent_manager"].active_agents
        ego: BaseVehicle = agents["default_agent"]

        if ego.crash_vehicle:
            reward = -self._cost
        else:
            reward = 0.0
        if ego.crash_vehicle:
            indicator = 1.0
        else:
            indicator = 0.0

        return ({
            "crash_vehicle": reward
        }, {
            "MetaDrive/crash_vehicle": indicator
        })

    def env_info_spec(self):
        """Return the spec of the info entry: a scalar float32."""
        scalar_spec = TensorSpec(shape=(), dtype=torch.float32)
        return {"MetaDrive/crash_vehicle": scalar_spec}