Source code for sumocr.sumo_docker.interface.docker_interface

from sumocr import DOCKER_REGISTRY
from sumocr.sumo_docker.rpc.sumo_client import SumoRPCClient
from sumocr.sumo_docker.utils.docker_runner import DockerRunner

__author__ = "Peter Kocsis"
__copyright__ = "TUM Cyber-Physical Systems Group"
__credits__ = ["ZIM Projekt ZF4086007BZ8"]
__version__ = "0.7.5"
__maintainer__ = "Peter Kocsis"
__email__ = "commonroad@lists.lrz.de"
__status__ = "Testing"


[docs]class SumoInterface: """Class for setting up a simulation with a dockerized Sumo installation.""" def __init__(self, use_docker: bool = True): """Init empty object""" self.use_docker = use_docker self.sumo_client: SumoRPCClient or None = None if self.use_docker: self._sumo_container = DockerRunner() else: self._sumo_container = None
[docs] def start_simulator(self) -> SumoRPCClient: """ Starting SUMO in docker container and return a client for connecting with it. :return: SumoRPCClient which can communicate with SUMO """ if self.use_docker: self._sumo_container.prepare_docker(DOCKER_REGISTRY, "ver{}".format(SumoRPCClient.__version__), server_log_level='info', docker_registry_address='gitlab.lrz.de:5005') self._sumo_container.start() container_address_dict = self._sumo_container.get_host_address() container_address_value = container_address_dict['50051/tcp'][0] container_address = f"localhost:{container_address_value['HostPort']}" else: container_address = "localhost:50051" self.sumo_client = SumoRPCClient(container_address) self.sumo_client.ping() return self.sumo_client
[docs] def stop_simulator(self) -> None: """ Stop the sumo simulation and stop the docker container. :return: None """ self.sumo_client.stop_server() if self.sumo_client is not None: self.sumo_client = None if self._sumo_container is not None: self._sumo_container.stop()
[docs] def check_docker_health(self) -> bool: """ Checks the health of the docker container :return: True if the container is up and running """ status = self._sumo_container.get_container_status() return status == DockerRunner.DockerStatus.RUNNING