# File: S1005403_RisCC/tests/gui/test_sfp_history.py
#
# (Repository listing metadata: 79 lines, 2.3 KiB, Python.)

import os
import glob
import ctypes
import time
import tempfile
import pytest
from target_simulator.gui.sfp_debug_window import DebugPayloadRouter
from target_simulator.core.sfp_structures import SFPHeader
def make_sfp_packet(flow_char: bytes = b'M', tid: int = 1, extra=b'') -> bytes:
    """Build the raw bytes of a minimal SFP packet for use in tests.

    A fresh ``SFPHeader`` is created and only two fields are assigned:
    ``SFP_FLOW`` (the ordinal of *flow_char*) and ``SFP_TID`` (the low
    8 bits of *tid*). Every other field keeps its default value.

    Returns the header's raw memory image followed by *extra*.
    """
    header = SFPHeader()
    header.SFP_FLOW = ord(flow_char)
    header.SFP_TID = int(tid) & 0xFF  # keep only the low byte of tid
    # Snapshot the structure's in-memory representation as bytes.
    raw_header = ctypes.string_at(ctypes.addressof(header), ctypes.sizeof(header))
    return raw_header + extra
def test_history_append_and_parse_flow_tid():
    """One raw packet yields one history entry with parsed flow and tid."""
    debug_router = DebugPayloadRouter()
    debug_router.clear_history()

    packet = make_sfp_packet(b'M', tid=45, extra=b'payload')
    debug_router.update_raw_packet(packet, ('127.0.0.1', 5555))

    history = debug_router.get_history()
    assert len(history) == 1

    record = history[0]
    # The numeric flow value must equal the ordinal of the flow character.
    assert record.get('flow') == ord('M')
    assert record.get('tid') == 45
    # Any truthy flow name is accepted; 'MFD' and 'M' are merely the
    # values named explicitly by this check.
    flow_name = record.get('flow_name')
    assert flow_name in ('MFD', 'M') or flow_name
def test_history_size_limit():
    """History is capped at the configured size and keeps the newest packets.

    Six packets with tids 0..5 are pushed through a router whose history
    size is set to 3; only three entries may remain and the last one must
    be the most recently received packet (tid 5).
    """
    router = DebugPayloadRouter()
    router.clear_history()
    router.set_history_size(3)
    for i in range(6):
        pkt = make_sfp_packet(b'M', tid=i, extra=b'x')
        router.update_raw_packet(pkt, ('127.0.0.1', 5555))
    hist = router.get_history()
    # only last 3 entries kept
    assert len(hist) == 3
    assert hist[-1]['raw'] is not None
    # last entry should have tid 5 -- previously stated in a comment but
    # never actually asserted, so eviction of the wrong entries went unnoticed
    assert hist[-1].get('tid') == 5
def test_persist_writes_file(tmp_path):
    """With persistence enabled, a received packet is written to disk.

    The router's persist directory is redirected to pytest's ``tmp_path``;
    after one packet is routed, at least one ``sfp_raw_*.bin`` file must
    exist there and its content must begin with the header of the packet
    that was sent.
    """
    router = DebugPayloadRouter()
    router.clear_history()
    # point persist dir to temporary path
    router._persist_dir = str(tmp_path)
    router.set_persist(True)
    # ensure empty folder (materialise the listing before unlinking)
    for stale in list(tmp_path.glob('sfp_raw_*.bin')):
        stale.unlink()
    pkt = make_sfp_packet(b'S', tid=99, extra=b'data')
    router.update_raw_packet(pkt, ('127.0.0.1', 12345))
    # small sleep to allow write
    time.sleep(0.05)
    files = list(tmp_path.glob('sfp_raw_*.bin'))
    assert len(files) >= 1
    # Content should start with header bytes. The previous check compared
    # against a zero-initialised header and OR-ed the result with
    # ``len(b) > 0``, so it could never fail.
    # NOTE(review): assumes the router persists the raw packet verbatim
    # (header first) -- confirm against DebugPayloadRouter.
    header_bytes = pkt[:ctypes.sizeof(SFPHeader)]
    assert any(path.read_bytes().startswith(header_bytes) for path in files)