"""Check that DebugPayloadRouter turns a raw RIS status payload into text."""

import ctypes
import json
import time

from target_simulator.core.sfp_structures import (
    DSP_RIS_MAX_TGT,
    RisScenario,
    RisTarget,
    RisTargetsBlock,
    SfpRisStatusPayload,
)
from target_simulator.gui.sfp_debug_window import DebugPayloadRouter


def build_test_payload():
    """Return the raw bytes of a minimal RIS status payload.

    The payload carries a populated scenario section and a populated first
    target slot; every other field keeps whatever value
    ``SfpRisStatusPayload()`` gives it by default.
    """
    status = SfpRisStatusPayload()

    # Scenario section: values are applied in the listed order.
    scenario_values = {
        "timetag": 123456,
        "platform_azimuth": 1.23,
        "vx": 10.0,
        "vy": -2.5,
        "vz": 0.0,
        "baro_altitude": 100.0,
        "true_heading": 45.0,
        "latitude": 12.345678,
        "longitude": 98.765432,
    }
    for field_name, field_value in scenario_values.items():
        setattr(status.scenario, field_name, field_value)

    # First target slot. flags = 1 presumably marks the target as enabled
    # (per the original author's comment) - confirm against RisTarget.
    target_values = {
        "flags": 1,
        "heading": 90.0,
        "x": 1.0,
        "y": 2.0,
        "z": 3.0,
    }
    for field_name, field_value in target_values.items():
        # The slot is looked up again on every assignment, exactly as the
        # chained attribute access would do.
        setattr(status.tgt.tgt[0], field_name, field_value)

    return bytes(status)


def test_ris_router_parsing():
    """Feed a synthetic RIS payload to the router and inspect its output.

    The test passes when the router publishes at least one of the RIS
    payload keys and the decoded text of the preferred key mentions
    ``timetag``.
    """
    router = DebugPayloadRouter()
    raw = build_test_payload()

    # Call the processing routine directly (the internal API was changed to
    # use a processing pipeline).
    router._process_ris_status_payload(bytearray(raw), time.monotonic())

    # Retrieve the latest payloads stored by the router.
    latest = router.get_and_clear_latest_payloads()

    # Expect one of the RIS payload variants produced by the router.
    ris_keys = ("RIS_STATUS", "RIS_STATUS_JSON", "RIS_STATUS_TEXT")
    assert any(key in latest for key in ris_keys)

    # Pick the text to inspect: JSON first, then the plain-text variant,
    # and finally the generic RIS_STATUS entry.
    for preferred_key in ("RIS_STATUS_JSON", "RIS_STATUS_TEXT"):
        if preferred_key in latest:
            txt = latest[preferred_key].decode("utf-8")
            break
    else:
        txt = latest.get("RIS_STATUS", b"").decode("utf-8")

    # Note (original author): _handle_ris_status also stores RIS_STATUS as
    # text; the JSON variant is stored by SfpDebugWindow logic. If JSON is
    # not present in the router (it stores only the last payload per key),
    # we at least ensure that a textual parse exists.
    #
    # Only the timetag is checked here; the heading is not asserted.
    # A case-insensitive match also covers the exact lowercase spelling.
    assert "timetag" in txt.lower()