90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
import pytest
|
|
from target_simulator.core.simulation_engine import SimulationEngine
|
|
from target_simulator.core.models import Scenario, Target, Waypoint, ManeuverType
|
|
from unittest.mock import Mock
|
|
from queue import Queue
|
|
|
|
|
|
@pytest.fixture
|
|
def fake_communicator():
|
|
return Mock()
|
|
|
|
|
|
@pytest.fixture
|
|
def fake_update_queue():
|
|
return Queue()
|
|
|
|
|
|
def test_simulation_engine_init(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
assert hasattr(engine, "scenario")
|
|
assert hasattr(engine, "communicator")
|
|
assert hasattr(engine, "update_queue")
|
|
|
|
|
|
def test_simulation_engine_load_scenario(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
scenario = Scenario(name="TestScenario")
|
|
t = Target(
|
|
target_id=1, trajectory=[Waypoint(maneuver_type=ManeuverType.FLY_TO_POINT)]
|
|
)
|
|
scenario.add_target(t)
|
|
engine.load_scenario(scenario)
|
|
assert engine.scenario.name == "TestScenario"
|
|
assert engine.scenario.get_target(1) is not None
|
|
|
|
|
|
def test_simulation_engine_start_stop(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
engine._is_running_event.clear()
|
|
engine._stop_event.clear()
|
|
engine.start()
|
|
assert engine.is_alive()
|
|
engine._stop_event.set()
|
|
engine.join(timeout=1)
|
|
|
|
|
|
def test_set_time_multiplier(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
engine.set_time_multiplier(2.5)
|
|
assert engine.time_multiplier == 2.5
|
|
|
|
|
|
def test_set_update_interval(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
engine.set_update_interval(0.5)
|
|
assert engine.update_interval_s == 0.5
|
|
|
|
|
|
def test_pause_resume(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
engine.pause()
|
|
assert engine._is_paused is True
|
|
engine.resume()
|
|
assert engine._is_paused is False
|
|
|
|
|
|
def test_stop(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
engine.start()
|
|
engine.stop()
|
|
assert not engine.is_alive() or not engine.is_running()
|
|
|
|
|
|
def test_set_simulation_time(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
scenario = Scenario(name="JumpScenario")
|
|
t = Target(target_id=2, trajectory=[Waypoint(maneuver_type=ManeuverType.FLY_TO_POINT)])
|
|
scenario.add_target(t)
|
|
engine.load_scenario(scenario)
|
|
engine.set_simulation_time(42.0)
|
|
assert getattr(t, "_sim_time_s", None) == 42.0
|
|
|
|
|
|
def test_is_running_flag(fake_communicator, fake_update_queue):
|
|
engine = SimulationEngine(fake_communicator, fake_update_queue)
|
|
assert not engine.is_running()
|
|
engine.start()
|
|
assert engine.is_running()
|
|
engine.stop()
|