79 lines
2.3 KiB
Python
79 lines
2.3 KiB
Python
import os
|
|
import glob
|
|
import ctypes
|
|
import time
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
from target_simulator.gui.sfp_debug_window import DebugPayloadRouter
|
|
from target_simulator.core.sfp_structures import SFPHeader
|
|
|
|
|
|
def make_sfp_packet(flow_char: bytes = b'M', tid: int = 1, extra=b'') -> bytes:
    """Build the raw bytes of a minimal SFP packet for use in tests.

    A fresh ``SFPHeader`` is created and only two of its fields are
    assigned: ``SFP_FLOW`` receives ``ord(flow_char)`` and ``SFP_TID``
    receives the low 8 bits of ``tid``. All remaining fields keep whatever
    value a newly constructed header has. The header's memory image is then
    returned with ``extra`` appended as the payload.
    """
    header = SFPHeader()
    header.SFP_FLOW = ord(flow_char)
    header.SFP_TID = int(tid) & 0xFF

    # Snapshot the structure's in-memory representation as bytes.
    raw_header = ctypes.string_at(
        ctypes.addressof(header),
        ctypes.sizeof(header),
    )
    return raw_header + extra
|
|
|
|
|
|
def test_history_append_and_parse_flow_tid():
    """Routing one packet records one history entry exposing flow, tid and flow_name."""
    router = DebugPayloadRouter()
    router.clear_history()

    sender = ('127.0.0.1', 5555)
    router.update_raw_packet(
        make_sfp_packet(b'M', tid=45, extra=b'payload'),
        sender,
    )

    history = router.get_history()
    assert len(history) == 1

    record = history[0]
    # The recorded flow is the numeric code of the flow character.
    assert record.get('flow') == ord('M')
    assert record.get('tid') == 45

    # Accept either known spelling, or any other truthy name.
    flow_name = record.get('flow_name')
    assert flow_name in ('MFD', 'M') or flow_name
|
|
|
|
|
|
def test_history_size_limit():
    """History is capped at the configured size and keeps the newest packets.

    Six packets with TIDs 0..5 are routed through a router whose history
    size is set to 3. Afterwards exactly three entries must remain, and the
    final entry must be the packet that was sent last (TID 5).
    """
    router = DebugPayloadRouter()
    router.clear_history()
    router.set_history_size(3)
    for i in range(6):
        pkt = make_sfp_packet(b'M', tid=i, extra=b'x')
        router.update_raw_packet(pkt, ('127.0.0.1', 5555))

    hist = router.get_history()
    # only last 3 entries kept
    assert len(hist) == 3

    newest = hist[-1]
    assert newest['raw'] is not None
    # Checking only that 'raw' is set would also pass if the oldest entries
    # had been retained; pin the TID so the "newest kept" intent is enforced.
    assert newest.get('tid') == 5
|
|
|
|
|
|
def test_persist_writes_file(tmp_path):
    """With persistence enabled, routing a packet writes a non-empty dump file.

    The router's persist directory is redirected to pytest's ``tmp_path``,
    one packet is routed, and the test then expects at least one
    ``sfp_raw_*.bin`` file with non-empty content in that directory.
    """
    router = DebugPayloadRouter()
    router.clear_history()
    # point persist dir to temporary path
    router._persist_dir = str(tmp_path)
    router.set_persist(True)

    # ensure empty folder (materialise the listing before deleting from it)
    for stale in list(tmp_path.glob('sfp_raw_*.bin')):
        stale.unlink()

    pkt = make_sfp_packet(b'S', tid=99, extra=b'data')
    router.update_raw_packet(pkt, ('127.0.0.1', 12345))

    # small sleep to allow write
    time.sleep(0.05)

    files = list(tmp_path.glob('sfp_raw_*.bin'))
    assert len(files) >= 1

    content = files[0].read_bytes()
    # The previous check, `startswith(header) is False or len(b) > 0`, could
    # never fail: non-empty content satisfied the right-hand side and empty
    # content satisfied the left. Require real content instead.
    # NOTE(review): the intent was "content should start with header bytes";
    # once the on-disk format is confirmed, tighten this to compare against
    # the leading header bytes of `pkt`.
    assert len(content) > 0
|