Source code for alf.environments.metadrive.agent_perception

# Copyright (c) 2022 Horizon Robotics and ALF Contributors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Tuple, Optional, NamedTuple
import functools

import torch
import numpy as np

from alf.tensor_specs import TensorSpec
from .geometry import FieldOfView, Polyline

try:
    import metadrive
    from metadrive.component.vehicle.base_vehicle import BaseVehicle
    from metadrive.engine.base_engine import BaseEngine
except ImportError:
    from unittest.mock import Mock
    # create 'metadrive' as a mock to not break python argument type hints
    metadrive = Mock()


[docs]class AgentPerception(object): """A perception module that once initialized can produced the vectorized feature of the dynamic road users (agents) that are visible to the ego car in the driving scenario. The essential method of AgentPerception is observe(), which is called upon every observation to generate the observation of the dynamic agents. Useful Notations: B - the batch size, a.k.a. the agent limit (see below) H - history_window_size, a.k.a. the number of historical steps F - the size of the feature for each of the agent at each step The final feature is a 3D tensor of shape [B, H, F]. The feature for each agent at each step is a vector of 7 elements: * The distance between the centers of ego and the agent (1) * The unit vector point from the center of ego to that of the agent (2) * The width and length of the agent (2) * The heading of agent (w.r.t. ego's heading) as an unit vector (2) """ def __init__(self, fov: FieldOfView, history_window_size: int, history_frame_skip: int = 4, agent_limit: int = 16): """Construct an AgentPerception instance. Args: fov: Describe the field of view (FOV) of the ego car. When generating the agent features, only those agent who is within the FOV result in the feature. history_window_size: The feature only tracks this number of historical frames (current frame included). history_frame_skip: Pick every this number of frames to form the historical trajectories for the agents. agent_limit: The maximum number of agents shown in the feature. If the number of visible agents exceeds this limit, the farthest ones are filtered out until this limit is satisfied. """ self._history_window_size = history_window_size self._fov = fov self._agent_limit = agent_limit self._unit_feature_size = 7 self._history_frame_skip = history_frame_skip self._engine = None self._ego = None # Static Information self._num_agents = 0 self._dimension = None self._agent_to_index = None # Dynamic Buffers. Please refer to documentation in reset() to # understand them if needed. self._visible = None # 1 = visible, 0 = invisible self._history_position = None self._history_heading = None # Handle Frame Skip H = self._history_window_size self._sampled_index = np.arange((H - 1) % self._history_frame_skip, H, self._history_frame_skip) self._spec = TensorSpec( shape=(self._agent_limit, self._sampled_index.shape[0], self._unit_feature_size), dtype=torch.float32) @property def observation_spec(self): return self._spec
[docs] def reset(self, engine: BaseEngine, ego: BaseVehicle): """Initialize by creating the buffers for holding the dynamic agents information. This is internally used by MetaDrive related Observation objects, called once when a new MetaDrive environment (which is required to produce agent related observations) is constructed. """ # NOTE that the buffers track ALL the agents in the MetaDrive # environment. Later when the buffers are actually queried for # observation, only those who is within the field of view of the ego car # are retrieved. self._engine = engine self._ego = ego # We are going to use A to denote the number of agents below. Also H # will be used to denote the hitory window size. self._num_agents = len(self._engine.traffic_manager.vehicles) - 1 self._agent_to_index = {} # Shape is [A, 2]. The variable self._dimension stores the width and # length of each agents, which holds constant throughout the MetaDrive # environment's lifetime. self._dimension = np.zeros((self._num_agents, 2), dtype=np.float32) i = 0 for agent in self._engine.traffic_manager.vehicles: # Ego car is excluded. if agent is self._ego: continue self._dimension[i] = (agent.LENGTH, agent.WIDTH) self._agent_to_index[agent] = i i += 1 # Shape is [A, H]. Stores whether the agent is visible (Ture for # visible) for each agent, at each historical step (including the # current step). self._visible = np.zeros((self._num_agents, self._history_window_size), dtype=bool) # Shape is [A, H, 2]. Stores the WORLD FRAME positon of each agent, at # each historical step (including the current step). self._history_position = Polyline( point=np.zeros((self._num_agents, self._history_window_size, 2), dtype=np.float32)) # Shape is [A, H]. Stores the WORLD FRAME heading orientation of each # agent, at each historical step (including the current step). self._history_heading = np.zeros( (self._num_agents, self._history_window_size), dtype=np.float32)
[docs] def observe(self) -> Tuple[np.ndarray, int]: """Called upon every observation to produce the feature vectors describing the dynamic agents that are visible to the ego car. The vectors are transformed so that they are in ego car's body frame. Returns: A tuple of 2: 1. A 3D feature tensor of shape [B, H, F]. See class docstring for the meaning of B, H and F. 2. An integer indicating how many agents are actually filled. """ # Shift the buffer so that slot -1 is available for insertion. self._history_position.point[:, :-1] = self._history_position.point[:, 1:] self._history_heading[:, :-1] = self._history_heading[:, 1:] self._visible[:, :-1] = self._visible[:, 1:] alive = np.zeros_like(self._visible[:, -1]) # Insert the new positions and headings for agent in self._engine.traffic_manager.vehicles: if agent is self._ego: continue i = self._agent_to_index[agent] alive[i] = True self._history_heading[i, -1] = agent.heading_theta self._history_position.point[i, -1] = agent.position i += 1 # Transform the position so that we can test whether it is in the field # of view of the ego car. The test result is stored in self._visible. transformed_position = self._history_position.transformed( self._ego.position, self._ego.heading_theta) transformed_heading = self._history_heading - self._ego.heading_theta self._visible[:, -1] = self._fov.within( transformed_position.point[:, -1]) self._visible[~alive, -1] = False sampled_visible = self._visible[:, self._sampled_index] sampled_position = transformed_position.point[:, self._sampled_index] sampled_heading = transformed_heading[:, self._sampled_index] # Shape is [B,]. Denote whether a car is picked to show in the final # feature tensor or not. The criterion is that the car has been visible # in at least 1 step within the latest H steps. picked = np.any(sampled_visible, axis=-1) picked_position = sampled_position[picked] picked_heading = sampled_heading[picked] picked_dimension = self._dimension[picked] picked_visible = sampled_visible[picked] # Filter out the farthest agents in case the total number of visible # agents exceeds the limit. if np.count_nonzero(picked) > self._agent_limit: distances = np.linalg.norm(picked_position[:, -1], axis=-1) closest = np.argpartition(distances, self._agent_limit)[:self._agent_limit] picked_position = picked_position[closest] picked_heading = picked_heading[closest] picked_dimension = picked_dimension[closest] picked_visible = picked_visible[closest] size = picked_dimension.shape[0] # [B, H, 2] center = picked_position # [B, H, 1] r = np.linalg.norm(center, axis=-1, keepdims=True) + 1e-5 # [B, H] cos = np.cos(picked_heading) sin = np.sin(picked_heading) feature = np.zeros(self._spec.shape, dtype=np.float32) feature_view = feature[:size] # [B, H, 7] feature_view[:, :, 0] = r.squeeze(axis=-1) feature_view[:, :, 1:3] = center / r feature_view[:, :, 3:5] = np.expand_dims(picked_dimension, axis=1) feature_view[:, :, 5] = cos feature_view[:, :, 6] = sin feature_view[~picked_visible] = 0.0 return feature, size