Source code for alf.environments.metadrive.extra_rewards

# Copyright (c) 2022 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
from typing import Tuple
import numpy as np
import torch

import alf

from metadrive.component.vehicle.base_vehicle import BaseVehicle
from metadrive.engine.base_engine import BaseEngine
from alf.tensor_specs import TensorSpec


@alf.configurable
class ExtraReward(abc.ABC):
    """Abstract interface shared by all MetaDrive extra rewards.

    Subclasses must implement ``evaluate()`` and ``env_info_spec()``.
    ``reset()`` is an optional hook that does nothing unless overridden.
    """

    def __init__(self):
        """The base class keeps no state of its own."""

    @abc.abstractmethod
    def evaluate(self, engine: BaseEngine) -> Tuple[dict, dict]:
        """Compute the reward from the current state of the simulator.

        This is the main entry point: it produces the reward terms and the
        accompanying information from the current states of the MetaDrive
        engine.

        Args:
            engine: The MetaDrive simulator's engine object. It gives access
                to the various states of the simulator, including the ones
                of the ego vehicle.

        Returns:
            A pair of dictionaries. The first one maps individual reward
            names to their values. The second one holds the extra env info
            entries that are added to the environment's env info when this
            reward is turned on.
        """
        return dict(), dict()

    def reset(self):
        """Clear any internal state kept by the reward.

        Some rewards keep internal states to compute their values (e.g. a
        history buffer used to estimate derivatives). Such rewards override
        this method so that the states are reset at the end of each episode.
        The default implementation has nothing to reset.
        """
        return None

    @abc.abstractmethod
    def env_info_spec(self):
        """Return the spec of the env info entries produced by this reward.

        This is useful for extending the default env info spec of MetaDrive.
        """
        return dict()
def _regularize_angle(x):
    """Wrap an angle in radians into the half-open interval [-pi, pi)."""
    return (x + np.pi) % (2 * np.pi) - np.pi


def _estimate_derivative_1(seq: np.ndarray, h: float = 0.1):
    r"""Estimate the first order derivative, with a delay of 1 step.

    A central difference over the last three samples is used, so the
    estimate refers to ``seq[-2]`` rather than to the newest sample:

    .. math::

        f'(x) = \frac{f(x + h) - f(x - h)}{2h} + O(h^2)

    Args:
        seq: the samples, with the newest one last (``seq[-1]``). At least
            3 entries are read.
        h: the spacing between two consecutive samples.

    Returns:
        The estimated first order derivative.
    """
    return (seq[-1] - seq[-3]) / (2.0 * h)


def _estimate_derivative_2(seq: np.ndarray,
                           h: float = 0.1,
                           is_angle: bool = False):
    r"""Estimate the second order derivative, with a delay of 1 step.

    A central difference over the last three samples is used, so the
    estimate refers to ``seq[-2]`` rather than to the newest sample:

    .. math::

        f''(x) = \frac{f(x + h) - 2f(x) + f(x - h)}{h^2} + O(h^2)

    Args:
        seq: the samples, with the newest one last (``seq[-1]``). At least
            3 entries are read.
        h: the spacing between two consecutive samples.
        is_angle: if True, the finite difference (the numerator) is wrapped
            into [-pi, pi) before it is divided by ``h^2``.

    Returns:
        The estimated second order derivative.
    """
    d = seq[-1] + seq[-3] - 2.0 * seq[-2]
    if is_angle:
        d = _regularize_angle(d)
    return d / (h * h)


def _estimate_derivative_3(seq: np.ndarray,
                           h: float = 0.1,
                           is_angle: bool = False):
    r"""Estimate the third order derivative, with a delay of 2 steps.

    A central difference over the last five samples is used, so the
    estimate refers to ``seq[-3]`` rather than to the newest sample:

    .. math::

        f'''(x) = \frac{f(x + 2h) - f(x - 2h) - 2[f(x + h) - f(x - h)]}{2h^3} + O(h^2)

    Args:
        seq: the samples, with the newest one last (``seq[-1]``). At least
            5 entries are read.
        h: the spacing between two consecutive samples.
        is_angle: if True, the finite difference (the numerator) is wrapped
            into [-pi, pi) before it is divided by ``2h^3``.

    Returns:
        The estimated third order derivative.
    """
    d = seq[-1] - seq[-5] - 2.0 * (seq[-2] - seq[-4])
    if is_angle:
        d = _regularize_angle(d)
    return d / (2.0 * h * h * h)
def squared_jerk_cost(jerk: float,
                      speed: float,
                      jerk_deadband: float = 4.0,
                      speed_deadband: float = 1.5,
                      scale: float = 1e-3,
                      cap: float = 0.8):
    """Turn a jerk value into a capped quadratic cost.

    The cost is the scaled square of how far the jerk magnitude exceeds its
    deadband. It is only active when the jerk magnitude has reached its
    deadband and the speed has reached its own deadband as well. With the
    default arguments the cost is always POSITIVE or zero.

    Args:
        jerk: the value of the jerk
        speed: the value of the speed
        jerk_deadband: the cost is 0.0 while the absolute value of the jerk
            is below this threshold
        speed_deadband: the cost is 0.0 while the speed is below this
            threshold
        scale: the factor applied to the squared violation
        cap: the upper bound applied to the scaled squared violation

    Returns:
        The cost, i.e. ``min(scale * violation^2, cap)`` when active and 0.0
        otherwise.
    """
    magnitude = abs(jerk)
    # Inside either deadband nothing is penalized.
    if speed < speed_deadband or magnitude < jerk_deadband:
        return 0.0
    violation = magnitude - jerk_deadband
    scaled = violation * violation * scale
    return min(scaled, cap)
def squared_brake_cost(lon_acc: float,
                       speed: float,
                       harsh_brake_limit: float = -1.2,
                       speed_deadband: float = 2.0,
                       scale: float = 2.0,
                       cap: float = 1.0):
    """Turn a (harsh) brake into a capped quadratic cost.

    The cost is the scaled square of how far the longitudinal acceleration
    falls below the harsh brake limit. It is only active when the
    longitudinal acceleration is not above that limit and the speed has
    reached its deadband. With the default arguments the cost is always
    POSITIVE or zero.

    Args:
        lon_acc: the longitudinal acceleration
        speed: the value of the speed
        harsh_brake_limit: the threshold used to decide whether ``lon_acc``
            counts as a harsh brake that should be penalized
        speed_deadband: the cost is 0.0 while the speed is below this
            threshold
        scale: the factor applied to the squared violation
        cap: the upper bound applied to the scaled squared violation

    Returns:
        The cost, i.e. ``min(scale * violation^2, cap)`` when active and 0.0
        otherwise.
    """
    # Slow driving or braking gentler than the limit is never penalized.
    if speed < speed_deadband or lon_acc > harsh_brake_limit:
        return 0.0
    violation = harsh_brake_limit - lon_acc
    scaled = violation * violation * scale
    return min(scaled, cap)
def is_harsh_brake(lon_acc: float, speed: float):
    """Decide whether a brake is harsh, using simple empirical thresholds.

    Args:
        lon_acc: the longitudinal acceleration
        speed: the value of the speed

    Returns:
        False whenever the speed is below 2.0. Otherwise whether ``lon_acc``
        is below the threshold of the current speed range.
    """
    if speed < 2.0:
        return False
    # Within 15 m/s, typical local driving where braking at -3.0 is
    # noticeable and uncomfortable. Beyond that is the high way and express
    # way range, where -1.2 is already noticeable and uncomfortable.
    threshold = -3.0 if speed < 15.0 else -1.2
    return lon_acc < threshold
@alf.configurable
class EgoKinematicReward(ExtraReward):
    """The comfort rewards that are based on the kinematics of the ego vehicle.

    The reward keeps short histories of the ego speed and heading and
    differentiates them numerically on every ``evaluate()`` call.

    Args:
        harsh_brake_cost_func: the function that converts lon acc to a cost.
            It is called with the keyword arguments ``lon_acc`` and ``speed``.
        lon_jerk_cost_func: the function that converts lon jerk to a cost.
            It is called with the keyword arguments ``jerk`` and ``speed``.
        lat_jerk_cost_func: the function that converts lat jerk to a cost.
            It is called with the keyword arguments ``jerk`` and ``speed``.
    """

    def __init__(self,
                 harsh_brake_cost_func=squared_brake_cost,
                 lon_jerk_cost_func=squared_jerk_cost,
                 lat_jerk_cost_func=squared_jerk_cost):
        super().__init__()
        self._harsh_brake_cost_func = harsh_brake_cost_func
        self._lon_jerk_cost_func = lon_jerk_cost_func
        self._lat_jerk_cost_func = lat_jerk_cost_func

        # Stores the history of the linear speed and heading for computing the
        # derivatives. The newest sample is kept in the last slot.
        self._history_speed = np.zeros(3)
        self._history_heading = np.zeros(5)

    def evaluate(self, engine: BaseEngine):
        """Update the histories and compute the comfort rewards.

        Args:
            engine: the MetaDrive engine; the ego vehicle is looked up as the
                ``"default_agent"`` of its agent manager.

        Returns:
            A pair ``(rewards, info)``. ``rewards`` holds the negated costs
            under ``"lon_acc"``, ``"lon_jerk"`` and ``"lat_jerk"``. ``info``
            holds the entries described by ``env_info_spec()``.
        """
        ego: BaseVehicle = engine.managers["agent_manager"].active_agents[
            "default_agent"]

        # Shift the buffers by one slot and append the newest sample.
        self._history_heading = np.roll(self._history_heading, -1)
        self._history_heading[-1] = ego.heading_theta
        self._history_speed = np.roll(self._history_speed, -1)
        self._history_speed[-1] = ego.speed / 3.6  # km/h -> m/s

        # The helpers are called with their default sample spacing.
        # NOTE(review): presumably that default matches the time between two
        # evaluate() calls - confirm against the environment configuration.
        # NOTE(review): right after reset() the buffers are zero-filled, so
        # the first few estimates are computed against zeros rather than
        # against real samples.
        lon_acc = _estimate_derivative_1(self._history_speed)
        lon_jerk = _estimate_derivative_2(self._history_speed)
        # "lat_acc" and "lat_jerk" are the 2nd and 3rd order derivatives of
        # the heading angle.
        lat_acc = _estimate_derivative_2(self._history_heading, is_angle=True)
        lat_jerk = _estimate_derivative_3(self._history_heading, is_angle=True)

        speed = self._history_speed[-1]

        lon_acc_cost = self._harsh_brake_cost_func(
            lon_acc=lon_acc, speed=speed)
        lon_jerk_cost = self._lon_jerk_cost_func(jerk=lon_jerk, speed=speed)
        lat_jerk_cost = self._lat_jerk_cost_func(jerk=lat_jerk, speed=speed)

        rewards = {
            "lon_acc": -lon_acc_cost,
            "lon_jerk": -lon_jerk_cost,
            "lat_jerk": -lat_jerk_cost,
        }

        info = {
            # Reported as a float (1.0 / 0.0) to agree with the float32 spec
            # declared in env_info_spec().
            "MetaDrive/harsh_brake":
                float(is_harsh_brake(lon_acc=lon_acc, speed=speed)),
            "MetaDrive/lon_acc": lon_acc,
            "MetaDrive/lon_jerk": lon_jerk,
            "MetaDrive/lat_acc": lat_acc,
            "MetaDrive/lat_jerk": lat_jerk,
            "MetaDrive/costs/lon_acc": lon_acc_cost,
            "MetaDrive/costs/lon_jerk": lon_jerk_cost,
            "MetaDrive/costs/lat_jerk": lat_jerk_cost,
        }

        return rewards, info

    def reset(self):
        """Zero the speed and heading histories."""
        self._history_speed = np.zeros(3)
        self._history_heading = np.zeros(5)

    def env_info_spec(self):
        """Return the spec of the info entries produced by ``evaluate()``.

        Every entry is a float32 scalar.
        """
        scalar_keys = (
            "MetaDrive/harsh_brake",
            "MetaDrive/lon_acc",
            "MetaDrive/lon_jerk",
            "MetaDrive/lat_acc",
            "MetaDrive/lat_jerk",
            "MetaDrive/costs/lon_acc",
            "MetaDrive/costs/lon_jerk",
            "MetaDrive/costs/lat_jerk",
        )
        return {
            key: TensorSpec(shape=(), dtype=torch.float32)
            for key in scalar_keys
        }
@alf.configurable
class LaneKeepingReward(ExtraReward):
    """The reward that penalizes riding on a broken line.

    A broken line sits between two lanes and permits lane change. This
    reward particularly encourages the car to stay in its lane unless it
    wants to perform a lane change or an overtaking.

    Args:
        broken_line_cost: the penalty for one step if the ego car is riding
            on a broken line.
    """

    def __init__(self, broken_line_cost: float = 0.05):
        super().__init__()
        self._broken_line_cost = broken_line_cost

    def evaluate(self, engine: BaseEngine):
        """Penalize the current step if the ego car rides on a broken line.

        Args:
            engine: the MetaDrive engine; the ego vehicle is looked up as the
                ``"default_agent"`` of its agent manager.

        Returns:
            A pair ``(rewards, info)``. ``rewards["on_broken_line"]`` is the
            negated cost when the ego car is on a broken line and 0.0
            otherwise. ``info["MetaDrive/on_broken_line"]`` is the flag
            converted to a float.
        """
        active_agents = engine.managers["agent_manager"].active_agents
        ego: BaseVehicle = active_agents["default_agent"]

        if ego.on_broken_line:
            penalty = -self._broken_line_cost
        else:
            penalty = 0.0

        rewards = {"on_broken_line": penalty}
        info = {"MetaDrive/on_broken_line": float(ego.on_broken_line)}
        return rewards, info

    def env_info_spec(self):
        """Return the spec of the single float32 scalar info entry."""
        flag_spec = TensorSpec(shape=(), dtype=torch.float32)
        return {"MetaDrive/on_broken_line": flag_spec}
@alf.configurable
class CrashVehicleReward(ExtraReward):
    """The EXTRA reward that penalizes the ego car for crashing into another
    vehicle.

    MetaDrive already has a flat penalty towards all kinds of crashing (road
    boundary, vehicle, objects, etc). This extra reward is added to further
    penalize crashing into vehicles over crashing into the road boundary.

    Note that the episode will END when a crash happens on the ego car. This
    means that such reward is imposed at most ONCE per episode.

    Args:
        cost: the extra cost imposed when the crash is with another vehicle.
    """

    def __init__(self, cost: float = 20.0):
        super().__init__()
        self._cost = cost

    def evaluate(self, engine: BaseEngine):
        """Penalize the current step if the ego car crashed into a vehicle.

        Args:
            engine: the MetaDrive engine; the ego vehicle is looked up as the
                ``"default_agent"`` of its agent manager.

        Returns:
            A pair ``(rewards, info)``. ``rewards["crash_vehicle"]`` is the
            negated cost when the ego car reports a vehicle crash and 0.0
            otherwise. ``info["MetaDrive/crash_vehicle"]`` is 1.0 or 0.0
            accordingly.
        """
        active_agents = engine.managers["agent_manager"].active_agents
        ego: BaseVehicle = active_agents["default_agent"]

        if ego.crash_vehicle:
            penalty = -self._cost
        else:
            penalty = 0.0

        if ego.crash_vehicle:
            crashed = 1.0
        else:
            crashed = 0.0

        rewards = {"crash_vehicle": penalty}
        info = {"MetaDrive/crash_vehicle": crashed}
        return rewards, info

    def env_info_spec(self):
        """Return the spec of the single float32 scalar info entry."""
        flag_spec = TensorSpec(shape=(), dtype=torch.float32)
        return {"MetaDrive/crash_vehicle": flag_spec}