"""Tests for DebugPayloadRouter history, persistence and get-and-clear accessors."""

import ctypes
import datetime
import os
import tempfile

from target_simulator.core.sfp_structures import SFPHeader
from target_simulator.gui.sfp_debug_window import DebugPayloadRouter


def make_fake_packet(flow=ord("M"), tid=1) -> bytes:
    """Return the raw bytes of an ``SFPHeader`` with only flow and TID set.

    Args:
        flow: Value stored in the header's ``SFP_FLOW`` field
            (defaults to the code point of ``"M"``).
        tid: Value stored in the header's ``SFP_TID`` field.

    Returns:
        A ``bytes`` copy of the header's memory, ``ctypes.sizeof(header)``
        bytes long. All other fields keep whatever ``SFPHeader()``
        initializes them to.
    """
    # Construct a minimal header: only the two fields the tests vary are set.
    header = SFPHeader()
    header.SFP_FLOW = flow
    header.SFP_TID = tid
    # Copy the structure's memory out so the result outlives `header`.
    return ctypes.string_at(ctypes.addressof(header), ctypes.sizeof(header))


def test_history_and_persist_toggle(tmp_path):
    """One raw packet shows up in history, is persisted, and can be cleared.

    Args:
        tmp_path: Temporary directory (pytest's ``tmp_path`` fixture) used as
            the router's persistence directory.
    """
    router = DebugPayloadRouter()
    # Set the persist dir to the temp directory before enabling persistence,
    # so any files written by the router land where the test can see them.
    router._persist_dir = str(tmp_path)
    router.set_persist(True)

    # Add one packet.
    pkt = make_fake_packet()
    router.update_raw_packet(pkt, ("::1", 1234))

    hist = router.get_history()
    assert len(hist) == 1

    # A persisted file must have been created in the temp directory.
    files = list(tmp_path.iterdir())
    assert files, "No persisted files created"

    # Clear and check that the history is empty again.
    router.clear_history()
    assert router.get_history() == []


def test_get_and_clear_latest_payloads_and_raw_packet():
    """Latest payloads and the last raw packet are returned, then cleared."""
    router = DebugPayloadRouter()
    router._update_last_payload("MFD", bytearray(b"DATA1"))

    payloads = router.get_and_clear_latest_payloads()
    assert payloads.get("MFD") == bytearray(b"DATA1")

    # The raw packet is stored together with its source address...
    pkt = make_fake_packet(flow=ord("J"), tid=9)
    router.update_raw_packet(pkt, ("1.1.1.1", 5000))
    last = router.get_and_clear_raw_packet()
    assert last is not None
    assert last[1] == ("1.1.1.1", 5000)

    # ...and the next call returns None because the first call cleared it.
    assert router.get_and_clear_raw_packet() is None