Source code for sumocr.interface.ego_vehicle

import warnings
from typing import Dict, List, Union
import numpy as np
import copy

from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblem, GoalRegion
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.obstacle import DynamicObstacle, ObstacleType
from commonroad.scenario.trajectory import State, Trajectory

__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "2022.1"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad@lists.lrz.de"
__status__ = "Released"


[docs]class EgoVehicle: """ Interface object for ego vehicle. How to use: After each simulation step, get current state with EgoVehicle.current_state() and set planned trajectory with EgoVehicle.set_planned_trajectory(planned_trajectory). """ def __init__(self, id, initial_state: State, delta_steps: int, width: float = 2.0, length: float = 5.0, planning_problem: PlanningProblem = None): self.id = id self._width = width self._length = length self._initial_state = initial_state self._state_dict: Dict[State] = dict() # collects driven states self.shape = Rectangle(self.length, self.width, center=np.array([0, 0]), orientation=0.0) self._driven_trajectory = None # returns trajectory object of driven self._planned_trajectories: Dict[int, List[State]] = {} # collects trajectories from planner for every time step self._current_time_step = initial_state.time_step self.delta_steps = delta_steps self.planning_problem = planning_problem @property def pp_id(self) -> int: """ :returns: associated planning problem id """ return self.planning_problem.planning_problem_id if self.planning_problem is not None else None
[docs] def set_planned_trajectory(self, planned_state_list: List[State]) -> None: """ Sets planned trajectory beginning with current time step. :param planned_state_list: the planned trajectory """ assert len(planned_state_list) >= self.delta_steps, \ 'planned_trajectory must contain at least {} states, but contains {}. (See delta_steps in sumo_config file)' \ .format(self.delta_steps, len(planned_state_list)) assert 1 == planned_state_list[0].time_step, \ 'planned_trajectory must always start at time_step ({}) but starts at time_step {}' \ .format(1, planned_state_list[0].time_step) self._planned_trajectories[self.current_time_step] = planned_state_list self.add_state(planned_state_list[0])
@property def get_planned_trajectory(self) -> List[State]: """Gets planned trajectory according to the current time step""" return self._planned_trajectories[self.current_time_step]
[docs] def get_dynamic_obstacle(self, time_step: Union[int, None] = None) -> DynamicObstacle: """ If time step is None, adds complete driven trajectory and returns the dynamic obstacles. If time step is int: starts from given step and adds planned trajectory and returns the dynamic obstacles. :param time_step: initial time step of vehicle :return: DynamicObstacle object of the ego vehicle. """ if time_step is None: return DynamicObstacle(self.id, obstacle_type=ObstacleType.CAR, obstacle_shape=Rectangle(self.length, self.width, center=np.array([0, 0]), orientation=0.0), initial_state=self.initial_state, prediction=self.driven_trajectory) elif isinstance(time_step, int): if time_step in self._state_dict: if time_step in self._planned_trajectories: prediction = TrajectoryPrediction(Trajectory(self._planned_trajectories[time_step][0].time_step, self._planned_trajectories[time_step]), self.shape) else: prediction = None return DynamicObstacle(self.id, obstacle_type=ObstacleType.CAR, obstacle_shape=Rectangle(self.length, self.width, center=np.array([0, 0]), orientation=0.0), initial_state=self.get_state_at_timestep(time_step), prediction=prediction) else: raise ValueError('time needs to be type None or int')
[docs] def get_planned_state(self, delta_step: int = 0): """ Returns the planned state. :param delta_step: get planned state after delta steps """ assert self.current_time_step in self._planned_trajectories,\ f"No planned trajectory found at time step {self.current_time_step} for ego vehicle {self.id}! " \ f"Use ego_vehicle.set_planned_trajectory() to set the trajectory." planned_state: State = copy.deepcopy(self._planned_trajectories[self.current_time_step][0]) if self.delta_steps > 1: # linear interpolation for state in planned_state.attributes: curr_state = getattr(self.current_state, state) next_state = getattr(planned_state, state) setattr(planned_state, state, curr_state + (delta_step + 1) / self.delta_steps * (next_state - curr_state)) return planned_state
@property def current_state(self) -> State: """ Returns the current state. """ if self.current_time_step == self.initial_state.time_step: return self.initial_state else: return self._state_dict[self.current_time_step]
[docs] def get_state_at_timestep(self, time_step: int) -> State: """ Returns the state according to the given time step. :param time_step: the state is returned according to this time step. """ if time_step == self.initial_state.time_step: return self.initial_state else: state = self._state_dict[time_step] state.time_step = 0 return state
@current_state.setter def current_state(self, current_state): raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()') @property def current_time_step(self) -> int: """ Returns current time step. """ return self._current_time_step @current_time_step.setter def current_time_step(self, current_time_step): raise PermissionError('current_state cannot be set manually, use set_planned_trajectory()') @property def goal(self) -> GoalRegion: """ Returns the goal of the planning problem. """ return self.planning_problem.goal
[docs] def add_state(self, state: State) -> None: """ Adds a state to the current state dictionary. :param state: the state to be added """ self._state_dict[self._current_time_step + 1] = state
@property def driven_trajectory(self) -> TrajectoryPrediction: """ Returns trajectory prediction object for driven trajectory (mainly for plotting) """ state_dict_tmp = {} for t, state in self._state_dict.items(): state_dict_tmp[t] = state state_dict_tmp[t].time_step = t sorted_list = sorted(state_dict_tmp.keys()) state_list = [state_dict_tmp[key] for key in sorted_list] return TrajectoryPrediction(Trajectory(self.initial_state.time_step + 1, state_list), self.shape) @driven_trajectory.setter def driven_trajectory(self, _): if hasattr(self, '_driven_trajectory'): warnings.warn('driven_trajectory of vehicle cannot be changed') return @property def width(self) -> float: """ Returns the width of the ego vehicle. """ return self._width @width.setter def width(self, width): if hasattr(self, '_width'): warnings.warn('width of vehicle cannot be changed') return @property def length(self) -> float: """ Returns the length of the ego vehicle. """ return self._length @length.setter def length(self, length): if hasattr(self, '_length'): warnings.warn('length of vehicle cannot be changed') return @property def initial_state(self) -> State: """ Returns the initial state of the ego vehicle. """ return self._initial_state @initial_state.setter def initial_state(self, _): if hasattr(self, '_initial_state'): warnings.warn('initial_state of vehicle cannot be changed') return