S1005403_RisCC/tools/test_ris_remove.py

57 lines
1.9 KiB
Python

# Test removal of targets with flags == 0
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
from target_simulator.gui.payload_router import DebugPayloadRouter
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
import target_simulator.gui.payload_router as pr_mod
class FakeRisTarget:
def __init__(self, flags, x, y, z, heading):
self.flags = flags
self.x = x
self.y = y
self.z = z
self.heading = heading
class FakeScenario:
def __init__(self, timetag=123456789):
self.timetag = timetag
class FakeParsed:
def __init__(self, tgt_list):
self.tgt = type('T', (), {'tgt': tgt_list})()
self.scenario = FakeScenario()
@staticmethod
def from_buffer_copy(payload):
raise RuntimeError('Should be monkeypatched')
# first, create initial payload with 2 active targets
rads = [0.0, 1.5707964897155762]
fake_targets = []
for i, h in enumerate(rads):
fake_targets.append(FakeRisTarget(flags=1, x=1000+i, y=2000+i, z=100.0, heading=h))
original_parser = pr_mod.SfpRisStatusPayload
def make_parser(targets):
return type('P', (), {'from_buffer_copy': staticmethod(lambda payload: FakeParsed(targets))})
# inject initial
pr_mod.SfpRisStatusPayload = make_parser(fake_targets)
hub = SimulationStateHub()
router = DebugPayloadRouter(simulation_hub=hub, update_queue=None)
router._handle_ris_status(b'INJ')
print('After first injection, hub ids:', hub.get_all_target_ids())
# now create a payload where target 1 is inactive (flags=0)
fake_targets2 = [FakeRisTarget(flags=1, x=1000, y=2000, z=100.0, heading=0.0), FakeRisTarget(flags=0, x=0, y=0, z=0, heading=0.0)]
pr_mod.SfpRisStatusPayload = make_parser(fake_targets2)
router._handle_ris_status(b'INJ2')
print('After second injection (one inactive), hub ids:', hub.get_all_target_ids())
# restore
pr_mod.SfpRisStatusPayload = original_parser
print('Done')