# Source code for sumocr.interface.util

import enum
import os
import warnings
import xml.etree.ElementTree as et
from typing import List, Optional, Set, Tuple

import libsumo as traci
from commonroad.scenario.obstacle import ObstacleType, SignalState
from sumocr.sumo_config import plot_params, ID_DICT
from sumocr.sumo_config.default import SUMO_PEDESTRIAN_PREFIX, SUMO_VEHICLE_PREFIX

# Module metadata: authorship, funding credit, release version and contact.
__author__ = "Moritz Klischat"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "2022.1"
__maintainer__ = "Moritz Klischat"
__email__ = "commonroad@lists.lrz.de"
__status__ = "Released"

# CommonRoad obstacle type to sumo type
# Keys are CommonRoad ``ObstacleType`` members, values are the sumo type
# strings used for them (presumably SUMO vehicle-class names -- confirm
# against the SUMO documentation).
# The mapping is many-to-one: several obstacle types share "passenger" and
# "custom2", so it cannot be inverted without losing information.
VEHICLE_TYPE_CR2SUMO = {
    ObstacleType.UNKNOWN: "passenger",
    ObstacleType.CAR: "passenger",
    ObstacleType.TRUCK: "truck",
    ObstacleType.BUS: "bus",
    ObstacleType.BICYCLE: "bicycle",
    ObstacleType.PEDESTRIAN: "pedestrian",
    ObstacleType.PRIORITY_VEHICLE: "vip",
    ObstacleType.PARKED_VEHICLE: "passenger",
    ObstacleType.CONSTRUCTION_ZONE: "passenger",
    ObstacleType.TRAIN: "rail",
    ObstacleType.ROAD_BOUNDARY: "custom2",
    ObstacleType.MOTORCYCLE: "motorcycle",
    ObstacleType.TAXI: "taxi",
    ObstacleType.BUILDING: "custom2",
    ObstacleType.PILLAR: "custom2",
    ObstacleType.MEDIAN_STRIP: "custom1"
}
# sumo type to CommonRoad obstacle type
# Keys are sumo type strings, values are CommonRoad ``ObstacleType`` members.
# This is not an exact inverse of VEHICLE_TYPE_CR2SUMO: "passenger" always
# maps back to CAR and "custom2" always maps back to PILLAR, although several
# obstacle types are converted to those strings in the other direction.
# "DEFAULT_PEDTYPE" is presumably the id of SUMO's built-in pedestrian type --
# confirm against the SUMO documentation.
VEHICLE_TYPE_SUMO2CR = {
    "DEFAULT_PEDTYPE": ObstacleType.PEDESTRIAN,
    "passenger": ObstacleType.CAR,
    "truck": ObstacleType.TRUCK,
    "bus": ObstacleType.BUS,
    "bicycle": ObstacleType.BICYCLE,
    "pedestrian": ObstacleType.PEDESTRIAN,
    "vip": ObstacleType.PRIORITY_VEHICLE,
    "rail": ObstacleType.TRAIN,
    "motorcycle": ObstacleType.MOTORCYCLE,
    "taxi": ObstacleType.TAXI,
    "custom2": ObstacleType.PILLAR,
    "custom1": ObstacleType.MEDIAN_STRIP
}


def get_route_files(config_file) -> List[str]:
    """
    Returns the route-files specified in the config file.

    Every ``route-files`` element matched by the path ``*/route-files``
    (i.e. a grandchild of the document root) is read. Its ``value``
    attribute is split at commas and every entry is joined onto the
    directory that contains the config file.

    :param config_file: SUMO config file (.sumocfg)
    :return: paths of all listed route files, in document order
    :raises FileNotFoundError: if config_file is not an existing file
    :raises RouteError: if the config file does not name any route file
    :raises KeyError: if a route-files element has no ``value`` attribute
    """
    if not os.path.isfile(config_file):
        raise FileNotFoundError(config_file)

    file_directory = os.path.dirname(config_file)
    route_elements = et.parse(config_file).findall('*/route-files')

    route_files = []
    for element in route_elements:
        for entry in element.attrib['value'].split(','):
            # Tolerate "a.rou.xml, b.rou.xml" and trailing commas: surrounding
            # whitespace is not part of the file name and an empty entry would
            # otherwise resolve to the bare directory.
            route = entry.strip()
            if route:
                route_files.append(os.path.join(file_directory, route))

    if not route_files:
        raise RouteError()
    return route_files
def initialize_id_dicts(id_convention: dict) -> Tuple[dict, dict]:
    """
    Creates empty nested dict structure for sumo2cr and cr2sumo dicts from
    id_convention and returns them.

    Both returned dictionaries contain one empty dict per key of
    id_convention plus one for SUMO_PEDESTRIAN_PREFIX and one for
    SUMO_VEHICLE_PREFIX. The two results share no inner dict objects.

    :param id_convention: dict with mapping from object type to id start number
    :return: tuple (sumo2cr, cr2sumo) of the freshly created dictionaries
    """
    keys = [*id_convention, SUMO_PEDESTRIAN_PREFIX, SUMO_VEHICLE_PREFIX]
    # Local names deliberately differ from the module-level functions
    # sumo2cr() / cr2sumo() to avoid shadowing them inside this function.
    ids_sumo2cr = {key: {} for key in keys}
    ids_cr2sumo = {key: {} for key in keys}
    return ids_sumo2cr, ids_cr2sumo
def generate_cr_id(type: str, sumo_id: str, sumo_prefix: str, ids_sumo2cr: dict, max_cr_id: int) -> int:
    """
    Generates a new commonroad ID without adding it to any ID dictionary.

    :param type: one of the keys in params.id_convention; the type defines the first digit of the cr_id
    :param sumo_id: id in sumo simulation
    :param sumo_prefix: key of ids_sumo2cr that is checked for an existing
        entry of sumo_id registered under a different type
    :param ids_sumo2cr: dictionary of ids in sumo2cr
    :param max_cr_id: lower bound for the new id; the candidate id is one
        above the larger of max_cr_id and the largest id already stored for type
    :return: the new commonroad id, or the already stored id if sumo_id is
        known for this type (a warning is issued in that case)
    :raises ValueError: if type is not a key of ID_DICT, or if sumo_id is
        already stored under sumo_prefix but not under type
    """
    # NOTE: the parameter name ``type`` shadows the builtin; it is kept because
    # it is part of the public signature.
    if type not in ID_DICT:
        raise ValueError(
            '{0} is not a valid type of id_convention. Only allowed: {1}'.
            format(type, ID_DICT.keys()))

    ids_of_type = ids_sumo2cr[type]
    if sumo_id in ids_of_type:
        warnings.warn(
            'For this sumo_id there is already a commonroad id. No cr ID is generated'
        )
        return ids_of_type[sumo_id]
    if sumo_id in ids_sumo2cr[sumo_prefix]:
        raise ValueError(
            'Two sumo objects of different types seem to have same sumo ID {0}. ID must be unique'
            .format(sumo_id))

    type_digit = str(ID_DICT[type])
    # Without any id of this type yet, start counting from "<type digit>0".
    max_type = max(ids_of_type.values()) if ids_of_type else int(type_digit + "0")
    cr_id = max(max_cr_id, max_type) + 1

    # The first digit encodes the type: if the candidate starts with another
    # digit, that digit is replaced by "<type digit>0".
    if int(str(cr_id)[0]) != ID_DICT[type]:
        cr_id = int(type_digit + "0" + str(cr_id)[1:])

    # Internal sanity check: the generated id must be new for this type.
    assert cr_id not in ids_of_type.values()
    return cr_id
def cr2sumo(cr_id: int, ids_cr2sumo: dict) -> Optional[str]:
    """
    Takes CommonRoad ID and returns corresponding SUMO ID.

    The vehicle ids are searched first, then the pedestrian ids.

    :param cr_id: CommonRoad id to convert; None is passed through
    :param ids_cr2sumo: dictionary of ids in cr2sumo
    :return: the SUMO id stored for cr_id, or None if cr_id is None or unknown
    """
    if isinstance(cr_id, list):
        # A list is not a valid id: it is unhashable, so the lookups below
        # raise TypeError. Dump both id tables first to help locate the caller.
        print("id: " + str(cr_id) + ": " + str(ids_cr2sumo[SUMO_PEDESTRIAN_PREFIX]) + ' '
              + str(ids_cr2sumo[SUMO_VEHICLE_PREFIX]))
        print("\n")

    if cr_id is None:
        return None

    for prefix in (SUMO_VEHICLE_PREFIX, SUMO_PEDESTRIAN_PREFIX):
        ids = ids_cr2sumo[prefix]
        if cr_id in ids:
            return ids[cr_id]

    # Unknown ids yield None; raising has been disabled:
    # raise ValueError('Commonroad id {0} does not exist.'.format(cr_id))
    return None
def sumo2cr(sumo_id: str, ids_sumo2cr: dict) -> Optional[int]:
    """
    Returns corresponding CommonRoad ID according to sumo id.

    The vehicle ids are searched first, then the pedestrian ids. An empty
    string that is not a registered id triggers a warning.

    :param sumo_id: sumo id; None is passed through
    :param ids_sumo2cr: dictionary of ids in sumo2cr.
    :return: the CommonRoad id stored for sumo_id, or None if sumo_id is None
        or unknown
    """
    if sumo_id is None:
        return None

    for prefix in (SUMO_VEHICLE_PREFIX, SUMO_PEDESTRIAN_PREFIX):
        ids = ids_sumo2cr[prefix]
        if sumo_id in ids:
            return ids[sumo_id]

    if sumo_id == "":
        # Adjacent literals instead of a backslash continuation inside the
        # string, which leaked a backslash/indentation into the message.
        warnings.warn('Tried to convert id <empty string>. '
                      'Check if your net file is complete (e. g. having internal-links,...)')
    return None
# NOTE: disabled error path of sumo2cr() -- unknown sumo ids currently yield None:
# raise ValueError('Sumo id \'%s\' does not exist.' % sumo_id)


# TraCI variable ids grouped for subscriptions (presumably passed to a TraCI
# subscribe call elsewhere -- confirm against the caller).
traci_subscription_values = (traci.constants.VAR_POSITION,
                             traci.constants.VAR_SPEED,
                             traci.constants.VAR_SPEED_LAT,
                             traci.constants.VAR_ACCELERATION,
                             traci.constants.VAR_ANGLE,
                             traci.constants.VAR_SIGNALS)


class SumoSignalIndices(enum.IntEnum):
    """All interpretations with their respective bit indices
    ref.: https://sumo.dlr.de/docs/TraCI/Vehicle_Signalling.html"""
    VEH_SIGNAL_BLINKER_RIGHT = 0
    VEH_SIGNAL_BLINKER_LEFT = 1
    VEH_SIGNAL_BLINKER_EMERGENCY = 2
    VEH_SIGNAL_BRAKELIGHT = 3
    VEH_SIGNAL_FRONTLIGHT = 4
    VEH_SIGNAL_FOGLIGHT = 5
    VEH_SIGNAL_HIGHBEAM = 6
    VEH_SIGNAL_BACKDRIVE = 7
    VEH_SIGNAL_WIPER = 8
    VEH_SIGNAL_DOOR_OPEN_LEFT = 9
    VEH_SIGNAL_DOOR_OPEN_RIGHT = 10
    VEH_SIGNAL_EMERGENCY_BLUE = 11
    VEH_SIGNAL_EMERGENCY_RED = 12
    VEH_SIGNAL_EMERGENCY_YELLOW = 13


# Highest bit index defined in SumoSignalIndices.
max_signal_index: int = max(s.value for s in SumoSignalIndices)

_defined_signals = {
    # only the following signals are computed on every time step
    SumoSignalIndices.VEH_SIGNAL_BLINKER_LEFT: "indicator_left",
    SumoSignalIndices.VEH_SIGNAL_BLINKER_RIGHT: "indicator_right",
    SumoSignalIndices.VEH_SIGNAL_BRAKELIGHT: "braking_lights",
    SumoSignalIndices.VEH_SIGNAL_EMERGENCY_BLUE: "flashing_blue_lights"
}


def get_signal_state(state: int, time_step: int) -> SignalState:
    """
    Computes the CR Signal state from the sumo signals

    :param state: sumo signal bitset; bit i corresponds to the
        SumoSignalIndices member with value i (expected to be non-negative)
    :param time_step: time step stored in the returned signal state
    :return: SignalState with one boolean per entry of _defined_signals
    """
    # Test each relevant bit directly instead of going through the binary
    # string representation; bits beyond the length of ``state`` read as False.
    args = {cr_name: bool((state >> sumo_name.value) & 1)
            for sumo_name, cr_name in _defined_signals.items()}
    return SignalState(**args, time_step=time_step)


class NetError(Exception):
    """
    Exception raised if there is no net-file or multiple net-files.

    :param len: number of net-files found (name kept for compatibility
        although it shadows the builtin)
    """

    def __init__(self, len):
        # Forward the argument so that ``args`` stays ``(len,)``.
        super().__init__(len)
        self.len = len

    def __str__(self):
        if self.len == 0:
            return repr('There is no net-file.')
        return repr('There are more than one net-files.')


class RouteError(Exception):
    """
    Exception raised if there is no route-file.
    """

    def __str__(self):
        return repr('There is no route-file.')


class EgoCollisionError(Exception):
    """
    Exception raised if the ego vehicle collides with another vehicle

    :param time_step: simulation step of the collision, if known
    """

    def __init__(self, time_step=None):
        super().__init__()
        self.time_step = time_step

    def __str__(self):
        if self.time_step is not None:
            return repr(f'Ego vehicle collides at current simulation step = {self.time_step}!')
        return repr('Ego vehicle collides at current simulation step!')