Source code for sumocr.interface.sumo_simulation

import copy
import logging
import math
import random
import sys
import time
from collections import defaultdict
from functools import lru_cache
from typing import Dict, Union
from xml.etree import cElementTree as ET
import numpy as np

from commonroad.geometry.shape import Rectangle
from commonroad.planning.planning_problem import PlanningProblemSet
from commonroad.common.util import Interval
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.lanelet import LaneletNetwork
from commonroad.scenario.obstacle import DynamicObstacle
from commonroad.scenario.scenario import Scenario
from commonroad.scenario.trajectory import State, Trajectory

import sumocr
from sumocr.interface.ego_vehicle import EgoVehicle
from sumocr.interface.util import *
from sumocr.maps.scenario_wrapper import AbstractScenarioWrapper
from sumocr.maps.sumo_scenario import ScenarioWrapper
from sumocr.sumo_config import DefaultConfig, EGO_ID_START, ID_DICT
from sumocr.sumo_config.pathConfig import SUMO_BINARY, SUMO_GUI_BINARY

import libsumo as traci

__author__ = "Moritz Klischat, Maximilian Frühauf"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "2022.1"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad@lists.lrz.de"
__status__ = "Released"


[docs]class SumoSimulation: """ Class for interfacing between the SUMO simulation and CommonRoad. """ def __init__(self): """Init empty object""" self.dt = None self.dt_sumo = None self.delta_steps = None self.planning_problem_set: PlanningProblemSet = None # {time_step: {obstacle_id: state}} self.obstacle_states: Dict[int, Dict[int, State]] = defaultdict(lambda: dict()) self.simulationdomain = traci.simulation self.vehicledomain = traci.vehicle self.persondomain = traci.person self.routedomain = traci.route self._current_time_step = 0 self.ids_sumo2cr, self.ids_cr2sumo = initialize_id_dicts(ID_DICT) self._max_lanelet_network_id = 0 # keep track of all IDs in CR lanelet_network # veh_id -> List[SignalState] self.signal_states: Dict[int, List[SignalState]] = defaultdict(list) self.obstacle_shapes: Dict[int, Rectangle] = dict() self.obstacle_types: Dict[int, ObstacleType] = dict() self.cached_position = {} # caches position for orientation computation self._scenarios: ScenarioWrapper = None self.ego_vehicles: Dict[int, EgoVehicle] = dict() self.conf = DefaultConfig() self._silent = False # enables dummy synchronization of ego vehicles without planner for testing self.dummy_ego_simulation = False # ego sync parameters self._lc_duration_max = 10 self._lc_counter = 0 # Count how many steps are counter for the lane change self._lc_inaction = 0 # Flag indicates that SUMO is performing a lane change for the ego self.lateral_position_buffer = dict() # stores lateral position [ego_vehicle_id,float] self.logger: logging.Logger = None self._traci_label = None self.initialized = False @property def scenarios(self) -> ScenarioWrapper: return self._scenarios @scenarios.setter def scenarios(self, scenarios: ScenarioWrapper): def max_lanelet_network_id(lanelet_network: LaneletNetwork) -> int: max_lanelet = np.max([l.lanelet_id for l in lanelet_network.lanelets]) \ if lanelet_network.lanelets else 0 max_intersection = np.max([i.intersection_id for i in lanelet_network.intersections]) \ if lanelet_network.intersections else 0 max_traffic_light = np.max([t.traffic_light_id for t in lanelet_network.traffic_lights]) \ if lanelet_network.traffic_lights else 0 max_traffic_sign = np.max([t.traffic_sign_id for t in lanelet_network.traffic_signs]) \ if lanelet_network.traffic_signs else 0 val = np.max([max_lanelet, max_intersection, max_traffic_light, max_traffic_sign]) if isinstance(val, np.generic): return val.item() else: return val if self.planning_problem_set is not None: max_pp= max(list(self.planning_problem_set.planning_problem_dict.keys())) else: max_pp = 0 self._max_lanelet_network_id = max(max_pp, max_lanelet_network_id(scenarios.lanelet_network)) self._scenarios = scenarios
[docs] def initialize(self, conf: DefaultConfig, scenario_wrapper: ScenarioWrapper, planning_problem_set: PlanningProblemSet = None) -> None: """ Reads scenario files, starts traci simulation, initializes vehicles, conducts pre-simulation. :param conf: configuration object. If None, use default configuration. :param scenario_wrapper: handles all files required for simulation. If None it is initialized with files folder conf.scenarios_path + conf.scenario_name :param planning_problem_set: initialize initial state of ego vehicles (if None, use planning_problem_set from self.scenario) """ if conf is not None: self.conf = conf self.logger = self._init_logging() assert isinstance(scenario_wrapper, AbstractScenarioWrapper), \ f'scenario_wrapper expected type ScenarioWrapper or None, but got type {type(scenario_wrapper)}' self.scenarios = scenario_wrapper self.dt = self.conf.dt self.dt_sumo = self.conf.dt / self.conf.delta_steps self.delta_steps = self.conf.delta_steps self.planning_problem_set = planning_problem_set \ if planning_problem_set is not None else self.scenarios.planning_problem_set assert sumocr.sumo_installed, "SUMO not installed or environment variable SUMO_HOME not set." if self.conf.with_sumo_gui: cmd = [ SUMO_GUI_BINARY, "--start", "-c", self.scenarios.sumo_cfg_file, "--step-length", str(self.dt_sumo), "--lateral-resolution", str(self.conf.lateral_resolution) ] else: cmd = [ SUMO_BINARY, "-c", self.scenarios.sumo_cfg_file, "--step-length", str(self.dt_sumo), "--lateral-resolution", str(self.conf.lateral_resolution) ] if self.conf.lateral_resolution > 0.0: cmd.extend(['--lanechange.duration', '0']) if self.conf.random_seed: np.random.seed(self.conf.random_seed) random.seed(self.conf.random_seed) cmd.extend(['--seed', str(self.conf.random_seed)]) traci.start(cmd) # simulate until ego_time_start if self.planning_problem_set is not None and len(self._find_ego_vehicles_in_rou_file()) == 0: self.__presimulation_silent(max(0, self.conf.presimulation_steps-1), fetch_obstacles=False) if len(self.ego_vehicles) > 0: self.logger.warning('<SumoSimulation/init_ego_vehicles> Ego vehicles are already defined through .rou' 'file and planning problem!') self.initialized = True self.init_ego_vehicles_from_planning_problem(self.planning_problem_set) self.__presimulation_silent(1, fetch_obstacles=True) else: self.__presimulation_silent(self.conf.presimulation_steps, fetch_obstacles=True) self.initialized = True
def _init_logging(self): # Create a custom logger logger = logging.getLogger(self.__class__.__name__) logger.setLevel(level=getattr(logging, self.conf.logging_level)) if not logger.hasHandlers(): # Create handlers c_handler = logging.StreamHandler() # Create formatters and add it to handlers c_format = logging.Formatter('<%(name)s.%(funcName)s> %(message)s') c_handler.setFormatter(c_format) # Add handlers to the logger logger.addHandler(c_handler) return logger def _find_ego_vehicles_in_rou_file(self, n_max=1) -> List[str]: rou_file = self.scenarios.get_rou_file() if rou_file is None: return [] with open(rou_file, 'r') as f: tree = ET.parse(f) ego_vehicle_ids = [] vehicles = tree.findall("vehicle") for v in vehicles: if v.get("id").startswith(EGO_ID_START): ego_vehicle_ids.append(v.get("id")) if len(ego_vehicle_ids) >= n_max: break return ego_vehicle_ids
[docs] def init_ego_vehicles_from_planning_problem( self, planning_problem_set: PlanningProblemSet) -> None: """ Initializes the ego vehicles according to planning problem set. :param planning_problem_set: The planning problem set which defines the ego vehicles. """ assert self.initialized, 'Initialize the SumoSimulation first!' self.dummy_ego_simulation = False # retrieve arbitrary route id for initialization (will not be used by interface) generic_route_id = traci.route.getIDList()[0] width = self.conf.ego_veh_width length = self.conf.ego_veh_length # create ego vehicles if planning problem is given if planning_problem_set is not None: list_ids_ego_used = [] for planning_problem in planning_problem_set.planning_problem_dict.values(): new_id = self._create_sumo_id(list_ids_ego_used) list_ids_ego_used.append(new_id) sumo_id = EGO_ID_START + str(new_id) cr_id = self._create_cr_id(type='egoVehicle', sumo_id=sumo_id, sumo_prefix=EGO_ID_START) self.vehicledomain.add(sumo_id, generic_route_id, typeID="DEFAULT_VEHTYPE", depart="now", departLane='first', departPos="base", departSpeed=planning_problem.initial_state.velocity, arrivalLane="current", arrivalPos="max", arrivalSpeed="current", fromTaz="", toTaz="", line="", personCapacity=0, personNumber=0) sumo_angle = 90.0 - math.degrees(planning_problem.initial_state.orientation) position_tmp = planning_problem.initial_state.position\ + 0.5 * length * np.array([math.cos(planning_problem.initial_state.orientation), math.sin(planning_problem.initial_state.orientation)]) self.vehicledomain.setLength(sumo_id, length=length) self.vehicledomain.setWidth(sumo_id, width=width) self.vehicledomain.moveToXY(sumo_id, edgeID='dummy', laneIndex=-1, x=position_tmp[0], y=position_tmp[1], angle=sumo_angle, keepRoute=2) self.ids_sumo2cr[EGO_ID_START][sumo_id] = cr_id self.ids_sumo2cr['vehicle'][sumo_id] = cr_id self.ids_cr2sumo[EGO_ID_START][cr_id] = sumo_id self.ids_cr2sumo['vehicle'][cr_id] = sumo_id self._add_ego_vehicle(EgoVehicle(cr_id, planning_problem.initial_state, self.conf.delta_steps, width, length, planning_problem))
def _add_ego_vehicle(self, ego_vehicle: EgoVehicle): """ Adds a ego vehicle to the current ego vehicle set. :param ego_vehicle: the ego vehicle to be added. """ if ego_vehicle.id in self._ego_vehicles: self.logger.debug( f'Ego vehicle with id {ego_vehicle.id} already exists!', exc_info=True) else: self._ego_vehicles[ego_vehicle.id] = ego_vehicle @property def ego_vehicles(self) -> Dict[int, EgoVehicle]: """ Returns the ego vehicles of the current simulation. """ return self._ego_vehicles @ego_vehicles.setter def ego_vehicles(self, ego_vehicles: Dict[int, EgoVehicle]): """ Sets the ego vehicles of the current simulation. :param ego_vehicles: ego vehicles used to set up the current simulation. """ self._ego_vehicles = ego_vehicles @property def current_time_step(self) -> int: """ :return: current time step of interface """ return self._current_time_step @current_time_step.setter def current_time_step(self, current_time_step): """ Time step should not be set manually. """ raise (ValueError('Time step should not be set manually'))
[docs] def commonroad_scenario_at_time_step(self, time_step: int, add_ego=False, start_0=True) -> Scenario: """ Creates and returns a commonroad scenario at the given time_step. Initial time_step=0 for all obstacles. :param time_step: the scenario will be created according this time step. :param add_ego: whether to add ego vehicles to the scenario. :param start_0: if set to true, initial time step of vehicles is 0, otherwise, the current time step """ self.cr_scenario = self.scenarios.create_minimal_scenario() # remove old obstacles from lanes # this is only necessary if obstacles are added to lanelets if self.conf.add_lanelets_to_dyn_obstacles: for lanelet in self.cr_scenario.lanelet_network.lanelets: lanelet.dynamic_obstacles_on_lanelet = {} self.cr_scenario.add_objects(self._get_cr_obstacles_at_time(time_step, add_ego=add_ego, start_0=start_0)) return self.cr_scenario
[docs] def commonroad_scenarios_all_time_steps(self) -> Scenario: """ Creates and returns a commonroad scenario with all the dynamic obstacles. :param lanelet_network: :return: list of cr scenarios, list of cr scenarios with ego, list of planning problem sets) """ self.cr_scenario = self.scenarios.create_full_meta_scenario() # remove old obstacles from lanes # this is only necessary if obstacles are added to lanelets if self.conf.add_lanelets_to_dyn_obstacles: for lanelet in self.cr_scenario.lanelet_network.lanelets: lanelet.dynamic_obstacles_on_lanelet = {} self.cr_scenario.add_objects(self._get_cr_obstacles_all()) return self.cr_scenario
def _init_vehicle_ids(self) -> Tuple[list, list]: """ Initialize vehicle IDs and set individual vehicle parameters. :return lists of new vehicle and pedestrian IDs that entered the simulation """ new_ids = [] vehicle_ids = self.vehicledomain.getIDList() for veh_id in vehicle_ids: if veh_id not in self.ids_sumo2cr[SUMO_VEHICLE_PREFIX]: if veh_id.startswith(EGO_ID_START): obs_type = EGO_ID_START else: obs_type = 'obstacleVehicle' self._create_cr_id(obs_type, veh_id, SUMO_VEHICLE_PREFIX) vehicle_class = VEHICLE_TYPE_SUMO2CR[self.vehicledomain.getVehicleClass(str(veh_id))] self._set_veh_params(self.vehicledomain, veh_id, vehicle_class) new_ids.append(veh_id) person_ids = self.persondomain.getIDList() obstacle_type = "obstaclePedestrian" new_ped_ids = [] for ped_id in person_ids: if ped_id not in self.ids_sumo2cr[SUMO_PEDESTRIAN_PREFIX]: # initialize new pedestrian self._create_cr_id(obstacle_type, ped_id, SUMO_PEDESTRIAN_PREFIX) vehicle_class = VEHICLE_TYPE_SUMO2CR[self.persondomain.getTypeID(ped_id).split("@")[0]] self._set_veh_params(self.persondomain, ped_id, vehicle_class) new_ped_ids.append(ped_id) return new_ids, new_ped_ids
[docs] def simulate_step(self) -> None: """ Executes next simulation step (consisting of delta_steps sub-steps with dt_sumo=dt/delta_steps) in SUMO """ # simulate sumo scenario for delta_steps time steps for i in range(self.delta_steps): # send ego vehicles to SUMO if not self.dummy_ego_simulation and len(self.ego_vehicles) > 0: self._send_ego_vehicles(self.ego_vehicles, i) # execute SUMO simulation step traci.simulationStep() for ego_veh in list(self.ego_vehicles.values()): ego_veh._current_time_step += 1 # get updated obstacles from sumo self._current_time_step += 1 new_vehicle_ids, new_ped_ids = self._init_vehicle_ids() self._fetch_sumo_vehicles(self.current_time_step, new_vehicle_ids) self._fetch_sumo_pedestrians(self.current_time_step, new_ped_ids)
def __presimulation_silent(self, pre_simulation_steps: int, fetch_obstacles=True): """ Simulate SUMO without synchronization of interface. Used before starting interface simulation. :param pre_simulation_steps: the steps of simulation which are executed before checking the existence of ego vehicles and configured simulation step. :param fetch_obstacles: fetch obstacles after simulating """ assert self.current_time_step == 0 assert pre_simulation_steps >= 0, f'ego_time_start={self.conf.presimulation_steps} must be >0' if pre_simulation_steps == 0: return self._silent = True for i in range(pre_simulation_steps * self.delta_steps): traci.simulationStep() self._init_vehicle_ids() self._silent = False if fetch_obstacles: self._fetch_sumo_vehicles(self.current_time_step, self.vehicledomain.getIDList()) self._fetch_sumo_pedestrians(self.current_time_step, self.persondomain.getIDList()) def _subscribe_new(self, new_ids: List[str], domain=traci.vehicle): """ Subscribe to values of new vehicle/pedestrian :param new_ids: list of new SUMO IDs :param domain: domain from which should be subscribed :return: """ vehicle_ids = self.sort_ego_first(new_ids) for veh_id in vehicle_ids: domain.subscribe(veh_id, traci_subscription_values) def _get_subscription_results(self, domain=traci.vehicle) -> Dict[str, Dict[int, any]]: """ Return subscription results from current time step :param domain: :return: dict with results with structure {sumo_id: {value_id: value}} """ return domain.getAllSubscriptionResults() def _fetch_sumo_vehicles(self, time_step: int, new_ids: List[str]): """ Gets and stores all vehicle states from SUMO. Initializes ego vehicles when they enter simulation. """ self._subscribe_new(new_ids) vehicle_states = self._get_subscription_results() if len(vehicle_states) == 0: return if not self.dummy_ego_simulation: self.check_ego_collisions(raise_error=True) for veh_id, vehicle_state in vehicle_states.items(): state = self._extract_current_state(self.vehicledomain, veh_id, vehicle_state, SUMO_VEHICLE_PREFIX) if state is None: continue # initializes new vehicle if veh_id in new_ids: if veh_id.startswith(EGO_ID_START) and not self._silent: # new ego vehicle cr_id = self.ids_sumo2cr[EGO_ID_START][veh_id] if self.dummy_ego_simulation: state.time_step = time_step - 1 else: state.time_step = time_step if self.planning_problem_set is not None: planning_problem = list(self.planning_problem_set.planning_problem_dict.values())[0] elif self.scenarios.planning_problem_set is not None: planning_problem = list(self.scenarios.planning_problem_set.planning_problem_dict.values())[0] else: planning_problem = None self._add_ego_vehicle(EgoVehicle(cr_id, state, self.conf.delta_steps, self.conf.ego_veh_width, self.conf.ego_veh_length, planning_problem=planning_problem)) elif not self._silent: # new obstacle vehicle cr_id = self.ids_sumo2cr['obstacleVehicle'][veh_id] vehicle_class = VEHICLE_TYPE_SUMO2CR[self.vehicledomain.getVehicleClass(str(veh_id))] self.obstacle_types[cr_id] = vehicle_class self.obstacle_shapes[cr_id] = self._get_veh_shape(self.vehicledomain, veh_id) self.obstacle_states[time_step][self.ids_sumo2cr['obstacleVehicle'][veh_id]] = state elif veh_id in self.ids_sumo2cr['obstacleVehicle']: # get obstacle vehicle state self.obstacle_states[time_step][self.ids_sumo2cr['obstacleVehicle'][veh_id]] = state elif not self._silent and veh_id not in self.ids_sumo2cr['egoVehicle']: raise NotImplemented() # read signal state if not self._silent: signal_states = get_signal_state(vehicle_state[traci.constants.VAR_SIGNALS], self.current_time_step) key = self.ids_sumo2cr['obstacleVehicle'][veh_id] \ if veh_id in self.ids_sumo2cr['obstacleVehicle'] else self.ids_sumo2cr[EGO_ID_START][veh_id] self.signal_states[key].append(signal_states) """For testing with dummy_ego_simulation""" if not self._silent and self.dummy_ego_simulation and veh_id in self.ids_sumo2cr['egoVehicle']: ego_veh = self.ego_vehicles[self.ids_sumo2cr['egoVehicle'][veh_id]] ori = state.orientation state_list = [] for t in range(0, self.conf.delta_steps): state_tmp = copy.deepcopy(state) state_tmp.position = state.position + (t + 1) * state.velocity \ * self.dt * np.array([math.cos(ori), math.sin(ori)]) state_tmp.time_step = t + 1 state_list.append(state_tmp) ego_veh.set_planned_trajectory(state_list) def _fetch_sumo_pedestrians(self, time_step: int, new_ids: List[str]): """ Gets and stores all vehicle states from SUMO. Initializes ego vehicles when they enter simulation. """ self._subscribe_new(new_ids, domain=traci.person) person_states = self._get_subscription_results(domain=traci.person) if not person_states: return obstacle_type = "obstaclePedestrian" for ped_id, ped_state in person_states.items(): state = self._extract_current_state(self.persondomain, ped_id, ped_state, SUMO_PEDESTRIAN_PREFIX) if state is None: continue if ped_id in new_ids: # initialize new pedestrian cr_id = self._create_cr_id(obstacle_type, ped_id, SUMO_PEDESTRIAN_PREFIX) vehicle_class = VEHICLE_TYPE_SUMO2CR[self.persondomain.getTypeID(ped_id).split("@")[0]] shape = self._set_veh_params(self.persondomain, ped_id, vehicle_class) self.obstacle_types[cr_id] = vehicle_class self.obstacle_shapes[cr_id] = shape self.obstacle_states[time_step][self.ids_sumo2cr[obstacle_type][ped_id]] = state elif ped_id in self.ids_sumo2cr[obstacle_type]: # get obstacle vehicle state self.obstacle_states[time_step][self.ids_sumo2cr[obstacle_type][ped_id]] = state def _get_veh_shape(self, vehicledomain, sumo_id): return Rectangle(vehicledomain.getLength(sumo_id), vehicledomain.getWidth(sumo_id))
[docs] def check_ego_collisions(self, raise_error=True): """ Checks if the ego vehicle is colliding in the current time step of the simulation. :param raise_error: raise EgoCollisionError, other return False :return: False if collision occurs, True otherwise """ colliding_vehicle_ids = self.simulationdomain.getCollidingVehiclesIDList() for id_ in colliding_vehicle_ids: if EGO_ID_START in id_: if raise_error: raise EgoCollisionError(self.current_time_step) else: return False return True
def _set_veh_params(self, domain, veh_id: int, vehicle_class: ObstacleType) -> Rectangle: """ :param domain: traci domain for which the parameter should be set :type domain: Union[PersonDomain, VehicleDomain] :param veh_id: vehicle ID :param vehicle_class: vehicle class :return: """ def sample(val: Union[Interval, float]) -> float: if isinstance(val, Interval): assert 0 <= val.start <= val.end, f"All values in the interval need to be positive: {val}" return float(np.random.uniform(val.start, val.end)) else: return val try: width = sample(self.conf.veh_params["width"][vehicle_class]) length = sample(self.conf.veh_params["length"][vehicle_class]) min_gap = sample(self.conf.veh_params["minGap"][vehicle_class]) domain.setWidth(veh_id, width) domain.setLength(veh_id, length) domain.setMinGap(veh_id, min_gap) except KeyError as e: self.logger.warning(f"vehicle_class: {vehicle_class} is invalid") raise e try: accel = sample(self.conf.veh_params["accel"][vehicle_class]) decel = sample(self.conf.veh_params["decel"][vehicle_class]) max_speed = sample(self.conf.veh_params["maxSpeed"][vehicle_class]) domain.setAccel(veh_id, accel) domain.setDecel(veh_id, decel) domain.setMaxSpeed(veh_id, max_speed) # in case of a pedestrian domain finally: return Rectangle(length, width) def _extract_current_state(self, domain, veh_id: str, vehicle_state: Dict[int, any], sumo_prefix: str) -> Union[None, State]: """ Gets the current state from sumo. :param domain :type: Union[PersonDomain, VehicleDomain] :param veh_id: the id of the vehicle, whose state will be returned from SUMO. :return: the state of the given vehicle or None, if not in field of view """ unique_id = sumo_prefix + veh_id position = np.array(vehicle_state[traci.constants.VAR_POSITION]) velocity = math.sqrt(vehicle_state[traci.constants.VAR_SPEED]**2 + vehicle_state[traci.constants.VAR_SPEED_LAT]**2) cr_id = sumo2cr(veh_id, self.ids_sumo2cr) if cr_id and cr_id in self.obstacle_shapes: length = self.obstacle_shapes[cr_id].length else: length = domain.getLength(veh_id) if self.conf.compute_orientation \ and self.current_time_step > 1 \ and velocity > 0.5 \ and unique_id in self.cached_position: delta_pos = position - self.cached_position[unique_id] orientation = math.atan2(delta_pos[1], delta_pos[0]) else: orientation = math.radians(-vehicle_state[traci.constants.VAR_ANGLE] + 90.0) self.cached_position[unique_id] = position position -= 0.5 * length * np.array([math.cos(orientation), math.sin(orientation)]) in_fov = self.is_in_field_of_view(position, veh_id) if in_fov is False: return None acceleration = vehicle_state[traci.constants.VAR_ACCELERATION] return State(position=position, orientation=orientation, velocity=velocity, acceleration=acceleration, time_step=self.current_time_step) def _get_cr_obstacles_at_time(self, time_step: int, add_ego: bool = False, start_0: bool = False) -> List[DynamicObstacle]: """ Gets current state of all vehicles in commonroad format from recorded simulation. :param time_step: time step of scenario :param add_ego: if True, add ego vehicles as well :param start_0: if True, initial time step of vehicles is 0, otherwise, the current time step """ vehicle_dict: Dict[int, State] = self.obstacle_states[time_step] obstacles: List[DynamicObstacle] = [] for veh_id, state in vehicle_dict.items(): if start_0: state.time_step = 0 else: state.time_step = time_step center_lanelets = None shape_lanelets = None if self.conf.add_lanelets_to_dyn_obstacles: center_lanelets = {lanelet_id for lanelet_ids in self.scenarios.lanelet_network.find_lanelet_by_position([state.position]) for lanelet_id in lanelet_ids} shape_lanelets = {lanelet_id for lanelet_ids in self.scenarios.lanelet_network.find_lanelet_by_position( [state.position + v for v in self.obstacle_shapes[veh_id].vertices]) for lanelet_id in lanelet_ids} obstacle_type = self.obstacle_types[veh_id] signal_state = next((s for s in self.signal_states[veh_id] if s.time_step == time_step), None) if start_0: signal_state.time_step = 0 else: signal_state.time_step = time_step dynamic_obstacle = DynamicObstacle( obstacle_id=veh_id, obstacle_type=obstacle_type, initial_state=state, obstacle_shape=self.obstacle_shapes[veh_id], initial_center_lanelet_ids=center_lanelets if center_lanelets else None, initial_shape_lanelet_ids=shape_lanelets if shape_lanelets else None, initial_signal_state=signal_state) obstacles.append(dynamic_obstacle) if add_ego: obstacles.extend(self.get_ego_obstacles(time_step)) return obstacles def _get_cr_obstacles_all(self) -> List[DynamicObstacle]: """ For all recorded time steps, get states of all obstacles and convert them into commonroad dynamic obstacles. :return: list of dynamic obstacles """ # transform self.obstacle_states:Dict[time_step:[veh_id:State]] to veh_state:Dict[veh_id:[time_step:State]] veh_state = {} for time_step, veh_dicts in self.obstacle_states.items(): for veh_id, state in veh_dicts.items(): veh_state[veh_id] = {} for time_step, veh_dicts in self.obstacle_states.items(): for veh_id, state in veh_dicts.items(): state.time_step = time_step veh_state[veh_id][time_step] = state # get all vehicles' ids for id conflict check between lanelet_id and veh_id self.veh_ids: List = [*veh_state] # create cr obstacles obstacles = [] for veh_id, time_dict in veh_state.items(): state_list = list(time_dict.values()) # coordinate transformation for all positions from sumo format to commonroad format obstacle_shape = self.obstacle_shapes[veh_id] obstacle_type = self.obstacle_types[veh_id] initial_state = state_list[0] assert self.conf.video_start != self.conf.video_end, \ "Simulation start time and end time are the same. Please set simulation interval." if len(state_list) > 4: obstacle_trajectory = Trajectory(state_list[0].time_step, state_list[0:]) obstacle_prediction = TrajectoryPrediction(obstacle_trajectory, obstacle_shape) center_lanelets = None shape_lanelets = None if self.conf.add_lanelets_to_dyn_obstacles: center_lanelets = {lanelet_id for lanelet_ids in self.cr_scenario.lanelet_network.find_lanelet_by_position( [initial_state.position]) for lanelet_id in lanelet_ids} shape_lanelets = {lanelet_id for lanelet_ids in self.cr_scenario.lanelet_network.find_lanelet_by_position( [initial_state.position + v for v in self.obstacle_shapes[veh_id].vertices]) for lanelet_id in lanelet_ids} signal_states = self.signal_states[veh_id] dynamic_obstacle = DynamicObstacle( obstacle_id=veh_id, obstacle_type=obstacle_type, initial_state=initial_state, obstacle_shape=obstacle_shape, prediction=obstacle_prediction, initial_center_lanelet_ids=center_lanelets if center_lanelets else None, initial_shape_lanelet_ids=shape_lanelets if shape_lanelets else None, initial_signal_state=signal_states[0] if signal_states else None, signal_series=signal_states[1:] if signal_states else None) # add a trajectory element obstacles.append(dynamic_obstacle) else: self.logger.debug( f'Vehicle {veh_id} has been simulated less than 5 time steps. Not converted to cr obstacle.' ) return obstacles @lru_cache() def _get_ids_of_map(self) -> Set[int]: """ Get a list of ids of all the lanelets from the cr map which is converted from a osm map. :return: list of lanelets' ids """ ids = set() for lanelet in self.scenarios.lanelet_network.lanelets: ids.add(lanelet.lanelet_id) for ts in self.scenarios.lanelet_network.traffic_signs: ids.add(ts.traffic_sign_id) for ts in self.scenarios.lanelet_network.traffic_lights: ids.add(ts.traffic_light_id) for ts in self.scenarios.lanelet_network.intersections: ids.add(ts.intersection_id) for inc in ts.incomings: ids.add(inc.incoming_id) return ids
[docs] def get_ego_obstacles(self, time_step: Union[int, None] = None) -> List[DynamicObstacle]: """ Get list of ego vehicles converted to Dynamic obstacles :param time_step: initial time step, if None, get complete driven trajectory :return: """ obstacles = [] for veh_id, ego_veh in self.ego_vehicles.items(): obs = ego_veh.get_dynamic_obstacle(time_step) if obs is not None: obstacles.append(obs) return obstacles
def _send_ego_vehicles(self, ego_vehicles: Dict[int, EgoVehicle], delta_step: int = 0) -> None: """ Sends the information of ego vehicles to SUMO. :param ego_vehicles: list of dictionaries. For each ego_vehicle, write tuple (cr_ego_id, cr_position, cr_lanelet_id, cr_orientation, cr_lanelet_id) cr_lanelet_id can be omitted but this is not recommended, if the lanelet is known for sure. :param delta_step: which time step of the planned trajectory should be sent """ for id_cr, ego_vehicle in ego_vehicles.items(): assert ego_vehicle.current_time_step == self.current_time_step, \ f'Trajectory of ego vehicle has not been updated. Still at time_step {ego_vehicle.current_time_step},' \ f'while simulation step {self.current_time_step + 1} should be simulated.' planned_state = ego_vehicle.get_planned_state(delta_step) self._forward_info2sumo(planned_state, "SYNC_MOVE_XY", 0, id_cr) def _get_ego_ids(self) -> Dict[int, str]: """ Returns a dictionary with all current ego vehicle ids and corresponding sumo ids """ return self.ids_cr2sumo[EGO_ID_START] def _create_sumo_id(self, list_ids_ego_used: List[int]) -> int: """ Generates a new unused id for SUMO :return: """ id_list = traci.vehicle.getIDList() new_id = int(len(id_list)) i = 0 while i < 1000: if str(new_id) not in id_list and new_id not in list_ids_ego_used: return new_id else: new_id += 1 i += 1 def _create_cr_id(self, type: str, sumo_id: str, sumo_prefix: str, cr_id: int = None) -> int: """ Generates a new cr ID and adds it to ID dictionaries :param type: one of the keys in params.id_convention; the type defines the first digit of the cr_id :param sumo_id: id in sumo simulation :param sumo_prefix: str giving what set of sumo ids to use :return: cr_id as int """ cr_id = generate_cr_id(type, sumo_id, sumo_prefix, self.ids_sumo2cr, self._max_lanelet_network_id) self.ids_sumo2cr[type][sumo_id] = cr_id self.ids_sumo2cr[sumo_prefix][sumo_id] = cr_id self.ids_cr2sumo[type][cr_id] = sumo_id self.ids_cr2sumo[sumo_prefix][cr_id] = sumo_id return cr_id @property def _silent(self): """Ego vehicle is not synced in this mode.""" return self.__silent @_silent.setter def _silent(self, silent): assert self.current_time_step == 0 self.__silent = silent
[docs] def stop(self): """ Exits SUMO Simulation""" traci.close() sys.stdout.flush()
# Ego sync functions
[docs] def check_lanelets_future_change( self, current_state: State, planned_traj: List[State]) -> Tuple[str, int]: """ Checks the lanelet changes of the ego vehicle in the future time_window. :param lanelet_network: object of the lanelet network :param time_window: the time of the window to check the lanelet change :param traj_index: index of the planner output corresponding to the current time step :return: lc_status, lc_duration: lc_status is the status of the lanelet change in the next time_window; lc_duration is the unit of time steps (using sumo dt) """ lc_duration_max = min(self.conf.lanelet_check_time_window, len(planned_traj)) lanelet_network = self.scenarios.lanelet_network lc_status = 'NO_LC' lc_duration = 0 # find current lanelets current_position = current_state.position current_lanelets_ids = lanelet_network.find_lanelet_by_position([current_position])[0] current_lanelets = [ lanelet_network.find_lanelet_by_id(id) for id in current_lanelets_ids ] # check for lane change for current_lanelet in current_lanelets: for t in range(lc_duration_max): future_lanelet_ids = lanelet_network.find_lanelet_by_position( [planned_traj[t].position])[0] if current_lanelet.adj_right in future_lanelet_ids: lc_status = 'RIGHT_LC' lc_duration = 2 * t * self.conf.delta_steps break elif current_lanelet.adj_left in future_lanelet_ids: lc_status = 'LEFT_LC' lc_duration = 2 * t * self.conf.delta_steps break else: pass self.logger.debug('current lanelets: ' + str(current_lanelets)) self.logger.debug('lc_status=' + lc_status) self.logger.debug('lc_duration=' + str(lc_duration)) return lc_status, lc_duration
def _check_lc_start(self, ego_id: str, lc_future_status: str) -> str: """ This function checks if a lane change is started according to the change in the lateral position and the lanelet change prediction. Note that checking the change of lateral position only is sensitive to the tiny changes, also at the boundaries of the lanes the lateral position sign is changed because it will be calculated relative to the new lane. So we check the future lanelet change to avoid these issues. :param ego_id: id of the ego vehicle :param lc_future_status: status of the future lanelet changes :return: lc_status: the status whether the ego vehicle starts a lane change or no """ lateral_position = self.vehicledomain.getLateralLanePosition( cr2sumo(ego_id, self.ids_cr2sumo)) if lc_future_status == 'NO_LC' or not id in self.lateral_position_buffer: lc_status = 'NO_LC' elif lc_future_status == 'RIGHT_LC' \ and self.lateral_position_buffer[id] > self.conf.lane_change_tol + lateral_position: lc_status = 'RIGHT_LC_STARTED' elif lc_future_status == 'LEFT_LC' \ and self.lateral_position_buffer[id] < -self.conf.lane_change_tol + lateral_position: lc_status = 'LEFT_LC_STARTED' else: lc_status = 'NO_LC' self.logger.debug('LC current status: ' + lc_status) self.lateral_position_buffer[id] = lateral_position return lc_status def _consistency_protection(self, ego_id: str, current_state: State) -> str: """ Checks the L2 distance between SUMO position and the planner position and returns CONSISTENCY_ERROR if it is above the configured margin. :param ego_id: id of the ego vehicle (string) :param current_state: the current state read from the commonroad motion planner :return: retval: the status whether there is a consistency error between sumo and planner positions or not """ cons_error = 'CONSISTENCY_NO_ERROR' pos_sumo = self.vehicledomain.getPosition(cr2sumo(ego_id, self.ids_cr2sumo)) pos_cr = current_state.position dist_error = np.linalg.norm(pos_cr - pos_sumo) if dist_error > self.conf.protection_margin: cons_error = 'CONSISTENCY_ERROR' self.logger.debug('SUMO X: ' + str(pos_sumo[0]) + ' **** SUMO Y: ' + str(pos_sumo[1])) self.logger.debug('TRAJ X: ' + str(pos_cr[0]) + ' **** TRAJ Y: ' + str(pos_cr[1])) self.logger.debug('Error Value: ' + str(dist_error)) self.logger.debug('Error Status: ' + cons_error) if self._lc_inaction == 0: cons_error = 'CONSISTENCY_NO_ERROR' return cons_error def _check_sync_mechanism(self, lc_status: str, ego_id: int, current_state: State) -> str: """ Defines the sync mechanism type that should be executed according to the ego vehicle motion. :param lc_status: status of the lanelet change in the next time_window :param ego_id: id of the ego vehicle (string) :param current_state: the current state read from the commonroad motion planner :return: retval: the sync mechanism that should be followed while communicating from the interface to sumo """ if self.conf.lane_change_sync == True: # Check the error between SUMO and CR positions cons_error = self._consistency_protection(ego_id, current_state) if cons_error == 'CONSISTENCY_NO_ERROR': # CONSISTENCY_NO_ERROR means error below the configured margin if self._lc_inaction == 0: if lc_status == 'RIGHT_LC_STARTED': self._lc_inaction = 1 retval = 'SYNC_SUMO_R_LC' elif lc_status == 'LEFT_LC_STARTED': self._lc_inaction = 1 retval = 'SYNC_SUMO_L_LC' else: retval = 'SYNC_MOVE_XY' else: # There is a lane change currently in action so do nothing and just increament the counter self._lc_counter += 1 if self._lc_counter >= self._lc_duration_max: self._lc_counter = 0 self._lc_inaction = 0 retval = 'SYNC_DO_NOTHING' else: # There is a consistency error so force the sync mechanism to moveToXY to return back to zero error retval = 'SYNC_MOVE_XY' self._lc_counter = 0 self._lc_inaction = 0 else: retval = 'SYNC_MOVE_XY' self.logger.debug('Sync Mechanism is: ' + retval) self.logger.debug('Lane change performed since ' + str(self._lc_counter)) return retval def _forward_info2sumo(self, planned_state: State, sync_mechanism: str, lc_duration: int, ego_id: int): """ Forwards the information to sumo (either initiate moveToXY or changeLane) according to the sync mechanism. :param planned_state: the planned state from commonroad motion planner :param sync_mechanism: the sync mechanism that should be followed while communicating from the interface to sumo :param lc_duration: lane change duration, expressed in number of time steps :param ego_id: id of the ego vehicle """ id_sumo = cr2sumo(ego_id, self.ids_cr2sumo) if sync_mechanism == 'SYNC_MOVE_XY': len_half = 0.5 * self.ego_vehicles[ego_id].length position_sumo = [0, 0] position_sumo[0] = planned_state.position[0] + len_half * math.cos(planned_state.orientation) position_sumo[1] = planned_state.position[1] + len_half * math.sin(planned_state.orientation) sumo_angle = 90 - math.degrees(planned_state.orientation) self.vehicledomain.moveToXY(id_sumo, edgeID='dummy', laneIndex=-1, x=position_sumo[0], y=position_sumo[1], angle=sumo_angle, keepRoute=2) self.vehicledomain.setSpeedMode(id_sumo, 0) self.vehicledomain.setSpeed(id_sumo, planned_state.velocity) elif sync_mechanism == 'SYNC_SUMO_R_LC': # A lane change (right lane change) is just started, so we will initiate lane change request by traci # self.vehicledomain.setLaneChangeDuration(cr2sumo(default_ego_id, self.ids_cr2sumo), lc_duration) self.vehicledomain.setLaneChangeMode(id_sumo, 512) targetlane = self.vehicledomain.getLaneIndex(id_sumo) - 1 self.vehicledomain.changeLane(id_sumo, targetlane, 0.1) elif sync_mechanism == 'SYNC_SUMO_L_LC': # A lane change (left lane change) is just started, so we will initiate lane change request by traci # self.vehicledomain.setLaneChangeDuration(cr2sumo(default_ego_id, self.ids_cr2sumo), lc_duration) self.vehicledomain.setLaneChangeMode(id_sumo, 512) targetlane = self.vehicledomain.getLaneIndex(id_sumo) + 1 self.vehicledomain.changeLane(id_sumo, targetlane, 0.1) elif sync_mechanism == 'SYNC_DO_NOTHING': pass else: pass
[docs] def is_in_field_of_view(self, position: np.ndarray, veh_id: str): """ Returns True if the position is in the FOV of any ego vehicle. :param position: position of other vehicle :return: """ if len(self.ego_vehicles) == 0 or self.conf.field_of_view is None \ or veh_id in self.ids_sumo2cr[SUMO_VEHICLE_PREFIX] or veh_id in self.ids_sumo2cr[SUMO_PEDESTRIAN_PREFIX]: return True for ego_vehicle in self.ego_vehicles.values(): try: if np.linalg.norm(ego_vehicle.get_state_at_timestep(self.current_time_step).position - position, ord=np.inf) < self.conf.field_of_view: return True except KeyError: if len(ego_vehicle._state_dict.keys()) == 0: raise ValueError return False
@staticmethod def sort_ego_first(vehicle_ids: List[str]): vehicle_ids = list(vehicle_ids) for i in range(len(vehicle_ids)): if vehicle_ids[i].startswith(EGO_ID_START): vehicle_ids.insert(0, vehicle_ids.pop(i)) return vehicle_ids