# SXXXXXXX_PyMsc/tests/test_monitor_buffer.py
# 2025-12-10 11:47:46 +01:00
#
# 95 lines
# 2.6 KiB
# Python

import sys
import os
import threading
import time
import pytest
# Ensure the project root (the parent of this tests directory) is on
# sys.path so that `pymsc` can be imported when the tests are run directly.
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if ROOT not in sys.path:
    # Insert at the front so this location is searched before other entries.
    sys.path.insert(0, ROOT)
from pymsc.core.monitor import MonitorBuffer, MonDecoded, MonitorInfo
from pymsc.core.app_controller import AppController
def make_entry(sa=11, tr=0, payload=b'\x01\x02'):
    """Build a ``MonDecoded`` record to use as a test fixture.

    Args:
        sa: Value stored in the record's ``sa`` field.
        tr: Value stored in the record's ``tr`` field.
        payload: Bytes stored in the record's ``data`` field. The ``wc``
            field is derived from it as ``len(payload) // 2``.

    Returns:
        A ``MonDecoded`` whose ``cw_raw``, ``sw``, ``error_code`` and ``rt``
        fields hold fixed values and whose ``timetag`` is the current
        ``time.time()`` reading.
    """
    word_count = len(payload) // 2
    fields = {
        'cw_raw': 0x1234,
        'sw': 0x0000,
        'error_code': 0x0000,
        'sa': sa,
        'tr': tr,
        'rt': 20,
        'wc': word_count,
        'data': payload,
        'timetag': time.time(),
    }
    return MonDecoded(**fields)
def test_append_and_get_raw_buffer():
    """Appended entries appear in one snapshot; the next snapshot is empty."""
    monitor = MonitorBuffer(buf_size=10)

    # Queue three entries with distinct sa values (11, 12, 13).
    for offset in range(3):
        entry = make_entry(sa=11 + offset)
        monitor.append(entry)

    first = monitor.get_raw_buffer()
    assert isinstance(first, MonitorInfo)
    assert first.total_message == 3
    assert len(first.buffer) == 3

    # Reading the raw buffer must drain it, so a second read reports nothing.
    second = monitor.get_raw_buffer()
    assert second.total_message == 0
    assert len(second.buffer) == 0
def test_isr_invoked_when_threshold_reached():
    """The registered ISR is called once ``isr_threshold`` entries are queued."""
    fired = threading.Event()
    seen = {}

    def on_threshold(snapshot: MonitorInfo):
        # Store the count before signalling so the waiter can read it safely.
        seen['n'] = snapshot.total_message
        fired.set()

    monitor = MonitorBuffer(buf_size=8, isr_threshold=3)
    monitor.set_isr(on_threshold)
    monitor.enable_isr(True)

    # Two entries stay below the threshold of three: no callback expected yet.
    for sa_value in (12, 13):
        monitor.append(make_entry(sa=sa_value))
    assert not fired.is_set()

    # The third entry reaches the threshold. The ISR is expected to run on a
    # separate (daemon) thread, hence the bounded wait instead of an
    # immediate check.
    monitor.append(make_entry(sa=14))
    assert fired.wait(1.0), "ISR was not invoked within 1s"
    assert seen.get('n', 0) >= 3
def test_appcontroller_integration_monitor_delegation():
    """An ISR registered through ``AppController`` fires for its monitor."""
    ctrl = AppController()

    # The controller must expose a monitor, and it must be a MonitorBuffer.
    assert hasattr(ctrl, 'monitor')
    assert isinstance(ctrl.monitor, MonitorBuffer)

    ev = threading.Event()

    def cb(snapshot):
        # The snapshot content is irrelevant here; only the call matters.
        ev.set()

    ctrl.set_mon_isr(cb)
    ctrl.enableMonitorIsr(True)

    # Append entries directly through the controller's monitor.
    for _ in range(3):
        ctrl.monitor.append(make_entry(sa=15))

    # The controller builds its monitor with a large buffer (per the original
    # note: buf_size 512, giving a default ISR threshold of 256), so three
    # entries are not expected to trigger the ISR. Lower the threshold for
    # this test only.
    # NOTE(review): this writes a private attribute of MonitorBuffer; switch
    # to a public setter if one exists.
    ctrl.monitor._isr_threshold = 2

    # One more append pushes the pending count past the lowered threshold.
    ctrl.monitor.append(make_entry(sa=15))
    assert ev.wait(1.0), "Controller monitor ISR not invoked"