# S1005403_RisCC/tests/simulation/test_simulation_controller.py
#
# 186 lines
# 5.1 KiB
# Python

import types
import time
import pytest
from target_simulator.simulation.simulation_controller import SimulationController
class DummyVar:
    """Minimal value holder exposing ``get``/``set`` accessors.

    NOTE(review): presumably stands in for a tkinter-style variable on the
    main view -- confirm against the real view class.
    """

    def __init__(self, value=None):
        self._v = value

    def get(self):
        """Return the currently stored value."""
        return self._v

    def set(self, v):
        """Replace the stored value with *v*."""
        self._v = v
class DummyComm:
    """Fake communicator that records ``send_commands`` calls and scripts results.

    Args:
        use_json: Stored as ``_use_json_protocol``.
        send_results: Controls what ``send_commands`` returns. A callable is
            invoked with the command batch and its result is returned. A
            non-empty list is replayed one entry per call, repeating the
            last entry once exhausted. ``None`` or an empty list makes every
            call return ``True``.
    """

    def __init__(self, use_json=False, send_results=None):
        self.is_open = True
        self._use_json_protocol = use_json
        # Every command batch passed to send_commands, in call order.
        self._send_calls = []
        # send_results can be a list or a callable (or None).
        self._send_results = send_results

    def send_commands(self, cmds):
        """Record *cmds* and return the scripted result for this call."""
        self._send_calls.append(cmds)
        scripted = self._send_results
        if callable(scripted):
            return scripted(cmds)
        # An empty list has nothing to replay; fall through to the default
        # instead of raising IndexError on scripted[-1].
        if isinstance(scripted, list) and scripted:
            call_index = len(self._send_calls) - 1
            # Past the end of the script, keep returning the final entry.
            return scripted[min(call_index, len(scripted) - 1)]
        return True

    def send_scenario(self, scenario):
        """Accept any scenario and report success."""
        return True

    def router(self):
        """Return ``None``; this fake provides no router object."""
        return None
class DummyCommManager:
    """Fake communicator manager exposing only ``target_communicator``.

    Args:
        target_comm: Object stored as ``target_communicator``; ``None`` is
            used by the tests to model the "not connected" case.
    """

    def __init__(self, target_comm=None):
        self.target_communicator = target_comm
class DummyHub:
    """Fake simulation hub with a scripted ``has_active_real_targets`` answer.

    Args:
        active_sequence: Iterable of booleans returned, one per call, by
            ``has_active_real_targets``. Once consumed (or when omitted),
            the method returns ``False``.
    """

    def __init__(self, active_sequence=None):
        # Copy into a list so the caller's iterable is never mutated.
        self._seq = [] if active_sequence is None else list(active_sequence)

    def has_active_real_targets(self):
        """Return the next scripted answer, or ``False`` when none remain."""
        if not self._seq:
            return False
        return self._seq.pop(0)

    def reset(self):
        """No-op; present so callers can invoke ``reset()`` on the hub."""
        pass
class DummyScenario:
    """Fake scenario holding a single target with a fixed total duration."""

    def __init__(self):
        class T:
            # Only attribute the fake target carries.
            _total_duration_s = 5.0

        self.targets = [T()]

    def get_all_targets(self):
        """Return the list of fake targets."""
        return self.targets

    def reset_simulation(self):
        """No-op; present so callers can invoke ``reset_simulation()``."""
        pass
class DummyMainView:
    """Fake main view carrying the attributes and hooks the tests set up.

    NOTE(review): the attribute set presumably mirrors what
    ``SimulationController`` reads from the real main view -- confirm
    against the controller when adding or removing fields.
    """

    def __init__(self):
        self.is_simulation_running = DummyVar(False)
        self.update_time = DummyVar(1.0)
        self.time_multiplier = 1.0
        self.scenario = DummyScenario()
        # PPI widget stub: both clearing hooks are no-ops.
        self.ppi_widget = types.SimpleNamespace(
            clear_trails=lambda: None, clear_previews=lambda: None
        )
        self.sim_slider_var = DummyVar(0.0)
        self.simulation_engine = None
        self.current_archive = None

    def _update_simulation_progress_display(self):
        """No-op UI hook."""
        pass

    def _update_button_states(self):
        """No-op UI hook."""
        pass

    def _refresh_analysis_list(self):
        """No-op UI hook."""
        pass
def test_reset_returns_false_when_not_connected():
    """reset_radar_state returns False when no target communicator is set."""
    hub = DummyHub()
    cm = DummyCommManager(None)
    controller = SimulationController(cm, hub, config_manager=None, logger=None)
    mv = DummyMainView()
    assert controller.reset_radar_state(mv) is False
def test_legacy_reset_success(monkeypatch):
    """reset_radar_state returns True with a legacy communicator whose send succeeds."""
    # Legacy communicator: _use_json_protocol False
    comm = DummyComm(use_json=False, send_results=[True])
    cm = DummyCommManager(comm)
    # Hub reports no active real targets on its first query.
    hub = DummyHub(active_sequence=[False])
    controller = SimulationController(cm, hub, config_manager=None, logger=None)
    mv = DummyMainView()
    assert controller.reset_radar_state(mv) is True
def test_json_reset_fallback_to_per_target(monkeypatch):
    """reset_radar_state returns True when the JSON reset fails but the per-target send succeeds."""
    # JSON communicator that fails simple reset but succeeds per-target
    results = [
        False,
        True,
    ]  # first send (json reset) -> False, second (per-target) -> True

    def send_results(cmds):
        # Hand out the scripted results in order; a third send would raise
        # IndexError, so the script must match the number of sends.
        return results.pop(0)

    comm = DummyComm(use_json=True, send_results=send_results)
    cm = DummyCommManager(comm)
    # hub returns True first, then False (cleared) after per-target
    hub = DummyHub(active_sequence=[True, False])
    # Patch command_builder.build_json_reset_ids to return a single payload,
    # matching the single per-target result scripted above.
    import target_simulator.simulation.simulation_controller as scmod

    monkeypatch.setattr(
        scmod.command_builder, "build_json_reset_ids", lambda: ['{"id":1}']
    )
    controller = SimulationController(cm, hub, config_manager=None, logger=None)
    mv = DummyMainView()
    assert controller.reset_radar_state(mv) is True
def test_start_simulation_creates_engine_and_sets_running(monkeypatch):
    """start_simulation sets an engine on the view and marks it as running."""

    # Patch SimulationEngine used in controller to a dummy engine
    class DummyEngine:
        """Stand-in engine that only tracks whether it has been started."""

        def __init__(self, communicator=None, simulation_hub=None):
            self.started = False

        def set_time_multiplier(self, v):
            pass

        def set_update_interval(self, v):
            pass

        def load_scenario(self, s):
            pass

        def start(self):
            self.started = True

        def stop(self):
            self.started = False

        def is_running(self):
            return self.started

    monkeypatch.setattr(
        "target_simulator.simulation.simulation_controller.SimulationEngine",
        DummyEngine,
    )
    comm = DummyComm(use_json=False, send_results=[True])
    cm = DummyCommManager(comm)
    hub = DummyHub()
    controller = SimulationController(cm, hub, config_manager=None, logger=None)
    mv = DummyMainView()
    # start simulation
    controller.start_simulation(mv)
    # engine should be created and main_view.simulation_engine set
    assert mv.simulation_engine is not None
    assert mv.is_simulation_running.get() is True