"""Tests for the raw-packet history and persistence of DebugPayloadRouter."""

import ctypes
import glob
import os
import tempfile
import time

import pytest

from target_simulator.gui.sfp_debug_window import DebugPayloadRouter
from target_simulator.core.sfp_structures import SFPHeader


def make_sfp_packet(flow_char: bytes = b'M', tid: int = 1, extra: bytes = b'') -> bytes:
    """Build a minimal SFP packet: an SFPHeader followed by ``extra``.

    Only ``SFP_FLOW`` and ``SFP_TID`` are set on the header; every other
    field keeps the value a freshly constructed ``SFPHeader()`` has.

    Args:
        flow_char: Single-byte flow identifier; stored as ``ord(flow_char)``.
        tid: Transaction id; masked to its low 8 bits before being stored.
        extra: Bytes appended verbatim after the header.

    Returns:
        The raw header bytes concatenated with ``extra``.
    """
    hdr = SFPHeader()
    hdr.SFP_FLOW = ord(flow_char)
    hdr.SFP_TID = int(tid) & 0xFF
    # Copy the in-memory representation of the ctypes structure.
    hdr_bytes = ctypes.string_at(ctypes.addressof(hdr), ctypes.sizeof(hdr))
    return hdr_bytes + extra


def _wait_for_persisted_files(directory, timeout: float = 1.0, interval: float = 0.01):
    """Poll ``directory`` until a non-empty ``sfp_raw_*.bin`` file shows up.

    Returns the list of non-empty matching paths, or an empty list if none
    appeared before ``timeout`` seconds elapsed. Polling (rather than one
    fixed sleep) returns immediately when the write is synchronous and
    tolerates a slow write without making the test flaky.
    """
    deadline = time.monotonic() + timeout
    while True:
        found = [p for p in directory.glob('sfp_raw_*.bin') if p.stat().st_size > 0]
        if found or time.monotonic() >= deadline:
            return found
        time.sleep(interval)


def test_history_append_and_parse_flow_tid():
    """A single packet yields one history entry with parsed flow and tid."""
    router = DebugPayloadRouter()
    router.clear_history()

    pkt = make_sfp_packet(b'M', tid=45, extra=b'payload')
    router.update_raw_packet(pkt, ('127.0.0.1', 5555))

    hist = router.get_history()
    assert len(hist) == 1

    entry = hist[0]
    # The numeric flow value should match ord('M').
    assert entry.get('flow') == ord('M')
    assert entry.get('tid') == 45
    # Any non-empty flow name is accepted (e.g. 'MFD' or 'M'); this is the
    # same condition the previous "in (...) or value" expression reduced to.
    assert entry.get('flow_name')


def test_history_size_limit():
    """History keeps only the most recent ``set_history_size`` entries."""
    router = DebugPayloadRouter()
    router.clear_history()
    router.set_history_size(3)

    for i in range(6):
        pkt = make_sfp_packet(b'M', tid=i, extra=b'x')
        router.update_raw_packet(pkt, ('127.0.0.1', 5555))

    hist = router.get_history()
    # Only the last 3 entries are kept.
    assert len(hist) == 3
    assert hist[-1]['raw'] is not None
    # The newest entry is the last packet sent (tid 5); previously this was
    # stated in a comment but never actually asserted.
    assert hist[-1].get('tid') == 5


def test_persist_writes_file(tmp_path):
    """With persistence enabled, a received packet is written to disk."""
    router = DebugPayloadRouter()
    router.clear_history()
    # Point the persist directory at the temporary path.
    router._persist_dir = str(tmp_path)
    router.set_persist(True)

    # Make sure no stale dump files are present before sending.
    for stale in tmp_path.glob('sfp_raw_*.bin'):
        stale.unlink()

    pkt = make_sfp_packet(b'S', tid=99, extra=b'data')
    router.update_raw_packet(pkt, ('127.0.0.1', 12345))

    files = _wait_for_persisted_files(tmp_path)
    assert len(files) >= 1

    data = files[0].read_bytes()
    # The old check ("startswith(...) is False or len(b) > 0") was true for
    # any content, including an empty file. Require real content instead.
    # NOTE(review): the on-disk layout is not visible from this test; if the
    # dump is the raw packet, tighten this to ``data.startswith(pkt)``.
    assert data, 'persisted file is empty'