modificata la parte di visualizzazione del debug Ris_Status
This commit is contained in:
parent
055b62cb53
commit
d54a771fcb
@ -2,7 +2,7 @@
|
|||||||
"general": {
|
"general": {
|
||||||
"scan_limit": 60,
|
"scan_limit": 60,
|
||||||
"max_range": 100,
|
"max_range": 100,
|
||||||
"geometry": "1200x1024+463+195",
|
"geometry": "1200x1024+85+163",
|
||||||
"last_selected_scenario": "scenario_9g",
|
"last_selected_scenario": "scenario_9g",
|
||||||
"connection": {
|
"connection": {
|
||||||
"target": {
|
"target": {
|
||||||
|
|||||||
235
target_simulator/gui/payload_router.py
Normal file
235
target_simulator/gui/payload_router.py
Normal file
@ -0,0 +1,235 @@
|
|||||||
|
"""Payload router for buffering SFP payloads for the GUI.
|
||||||
|
|
||||||
|
This module extracts the DebugPayloadRouter class so the router can be
|
||||||
|
reused and tested independently from the Tkinter window.
|
||||||
|
"""
|
||||||
|
import threading
|
||||||
|
import collections
|
||||||
|
import datetime
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from typing import Dict, Optional, Any
|
||||||
|
from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
|
||||||
|
|
||||||
|
|
||||||
|
class DebugPayloadRouter:
|
||||||
|
"""
|
||||||
|
A router that buffers the last received payload for each flow,
|
||||||
|
allowing the GUI to sample the data at a lower frequency.
|
||||||
|
This class is thread-safe.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._log_prefix = "[DebugPayloadRouter]"
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
# Buffer to store the last received payload for each flow type
|
||||||
|
self._latest_payloads: Dict[str, bytearray] = {}
|
||||||
|
# Buffer to store the last raw packet received (bytes, addr)
|
||||||
|
self._last_raw_packet: Optional[tuple] = None
|
||||||
|
# History of raw packets (timestamp, addr, raw bytes)
|
||||||
|
self._history_size = 20
|
||||||
|
self._history = collections.deque(maxlen=self._history_size)
|
||||||
|
self._persist = False
|
||||||
|
# default persist dir: repository Temp/ folder
|
||||||
|
project_root = os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__), "..", "..")
|
||||||
|
)
|
||||||
|
self._persist_dir = os.path.join(project_root, "Temp")
|
||||||
|
try:
|
||||||
|
os.makedirs(self._persist_dir, exist_ok=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
logging.info(f"{self._log_prefix} Initialized.")
|
||||||
|
|
||||||
|
def get_handlers(self) -> Dict[int, Any]:
|
||||||
|
"""Returns handlers that update the internal last-payload buffer."""
|
||||||
|
return {
|
||||||
|
ord("M"): lambda payload: self._update_last_payload("MFD", payload),
|
||||||
|
ord("S"): lambda payload: self._update_last_payload("SAR", payload),
|
||||||
|
ord("B"): lambda payload: self._update_last_payload("BIN", payload),
|
||||||
|
ord("J"): lambda payload: self._update_last_payload("JSON", payload),
|
||||||
|
# Support both uppercase 'R' and lowercase 'r' as RIS/status flows
|
||||||
|
ord("R"): lambda payload: self._handle_ris_status(payload),
|
||||||
|
ord("r"): lambda payload: self._handle_ris_status(payload),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _update_last_payload(self, flow_id: str, payload: bytearray):
|
||||||
|
"""Thread-safely stores the latest payload for a given flow."""
|
||||||
|
with self._lock:
|
||||||
|
self._latest_payloads[flow_id] = payload
|
||||||
|
|
||||||
|
def _handle_ris_status(self, payload: bytearray):
|
||||||
|
"""Try to parse a RIS status payload and store a concise summary.
|
||||||
|
|
||||||
|
If parsing fails, store the raw payload as before.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if len(payload) >= SfpRisStatusPayload.size():
|
||||||
|
# Interpret the first bytes as the status payload
|
||||||
|
parsed = SfpRisStatusPayload.from_buffer_copy(
|
||||||
|
bytes(payload[: SfpRisStatusPayload.size()])
|
||||||
|
)
|
||||||
|
sc = parsed.scenario
|
||||||
|
lines = []
|
||||||
|
lines.append("RIS Status Payload:\n")
|
||||||
|
# Scenario block
|
||||||
|
lines.append("Scenario:")
|
||||||
|
lines.append(f" timetag : {sc.timetag}")
|
||||||
|
lines.append(f" platform_azim : {sc.platform_azimuth:.6f}")
|
||||||
|
lines.append(f" vx,vy,vz : {sc.vx:.3f}, {sc.vy:.3f}, {sc.vz:.3f}")
|
||||||
|
lines.append(f" baro_altitude : {sc.baro_altitude:.3f}")
|
||||||
|
lines.append(f" latitude : {sc.latitude:.6f}")
|
||||||
|
lines.append(f" longitude : {sc.longitude:.6f}")
|
||||||
|
lines.append(f" true_heading : {sc.true_heading:.3f}\n")
|
||||||
|
|
||||||
|
# Targets block
|
||||||
|
lines.append("Targets (first non-zero flags shown):")
|
||||||
|
any_target = False
|
||||||
|
for idx, t in enumerate(parsed.tgt.tgt):
|
||||||
|
if t.flags != 0:
|
||||||
|
any_target = True
|
||||||
|
lines.append(
|
||||||
|
f" [{idx}] flags={t.flags} heading={t.heading:.3f} x={t.x:.3f} y={t.y:.3f} z={t.z:.3f}"
|
||||||
|
)
|
||||||
|
if not any_target:
|
||||||
|
lines.append(" (no enabled targets)")
|
||||||
|
|
||||||
|
# NOTE: omit hex sample from RIS textual summary to avoid
|
||||||
|
# cluttering the application log with large binary dumps.
|
||||||
|
# The structured JSON payload (RIS_STATUS_JSON) contains
|
||||||
|
# the parsed values that the UI consumes.
|
||||||
|
|
||||||
|
text_out = "\n".join(lines)
|
||||||
|
# Build structured JSON for UI table consumption
|
||||||
|
try:
|
||||||
|
import json
|
||||||
|
|
||||||
|
scenario_dict = {
|
||||||
|
"timetag": int(parsed.scenario.timetag),
|
||||||
|
"platform_azimuth": float(parsed.scenario.platform_azimuth),
|
||||||
|
"vx": float(parsed.scenario.vx),
|
||||||
|
"vy": float(parsed.scenario.vy),
|
||||||
|
"vz": float(parsed.scenario.vz),
|
||||||
|
"baro_altitude": float(parsed.scenario.baro_altitude),
|
||||||
|
"latitude": float(parsed.scenario.latitude),
|
||||||
|
"longitude": float(parsed.scenario.longitude),
|
||||||
|
"true_heading": float(parsed.scenario.true_heading),
|
||||||
|
}
|
||||||
|
targets_list = []
|
||||||
|
for idx, t in enumerate(parsed.tgt.tgt):
|
||||||
|
targets_list.append(
|
||||||
|
{
|
||||||
|
"index": idx,
|
||||||
|
"flags": int(t.flags),
|
||||||
|
"heading": float(t.heading),
|
||||||
|
"x": float(t.x),
|
||||||
|
"y": float(t.y),
|
||||||
|
"z": float(t.z),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
struct = {"scenario": scenario_dict, "targets": targets_list}
|
||||||
|
json_bytes = bytearray(json.dumps(struct).encode("utf-8"))
|
||||||
|
except Exception:
|
||||||
|
json_bytes = bytearray(b"{}")
|
||||||
|
|
||||||
|
# Store textual representation and structured JSON so GUI can display it directly
|
||||||
|
self._update_last_payload("RIS_STATUS", bytearray(text_out.encode("utf-8")))
|
||||||
|
self._update_last_payload("RIS_STATUS_JSON", json_bytes)
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
# fall through to storing raw payload
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Fallback: store raw payload (as hex dump)
|
||||||
|
try:
|
||||||
|
text_out = "\n".join([f"{b:02X}" for b in payload])
|
||||||
|
self._update_last_payload("RIS_STATUS", bytearray(text_out.encode("utf-8")))
|
||||||
|
except Exception:
|
||||||
|
self._update_last_payload("RIS_STATUS", payload)
|
||||||
|
|
||||||
|
def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]:
|
||||||
|
"""
|
||||||
|
Thread-safely retrieves all new payloads received since the last call
|
||||||
|
and clears the internal buffer.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict[str, bytearray]: A dictionary of the latest payload for each flow.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
# Atomically swap the buffer with an empty one
|
||||||
|
new_payloads = self._latest_payloads
|
||||||
|
self._latest_payloads = {}
|
||||||
|
return new_payloads
|
||||||
|
|
||||||
|
def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
|
||||||
|
"""Store the last raw packet received (overwritten by subsequent packets)."""
|
||||||
|
with self._lock:
|
||||||
|
# Keep last packet for immediate display
|
||||||
|
self._last_raw_packet = (raw_bytes, addr)
|
||||||
|
# Append to history with timestamp and small metadata
|
||||||
|
entry = {
|
||||||
|
"ts": datetime.datetime.utcnow(),
|
||||||
|
"addr": addr,
|
||||||
|
"raw": raw_bytes,
|
||||||
|
}
|
||||||
|
# Try to parse SFP header to capture flow/TID for list display
|
||||||
|
try:
|
||||||
|
hdr = SFPHeader.from_buffer_copy(raw_bytes)
|
||||||
|
entry["flow"] = int(hdr.SFP_FLOW)
|
||||||
|
entry["tid"] = int(hdr.SFP_TID)
|
||||||
|
# map common flows to names when possible
|
||||||
|
flow_map = {
|
||||||
|
ord("M"): "MFD",
|
||||||
|
ord("S"): "SAR",
|
||||||
|
ord("B"): "BIN",
|
||||||
|
ord("J"): "JSON",
|
||||||
|
}
|
||||||
|
entry["flow_name"] = flow_map.get(
|
||||||
|
entry["flow"],
|
||||||
|
(
|
||||||
|
chr(entry["flow"]) if 32 <= entry["flow"] < 127 else str(entry["flow"])
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
# best-effort: leave flow/tid absent
|
||||||
|
pass
|
||||||
|
self._history.append(entry)
|
||||||
|
# Optionally persist to disk (each entry as binary)
|
||||||
|
if self._persist:
|
||||||
|
try:
|
||||||
|
ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
|
||||||
|
fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
|
||||||
|
path = os.path.join(self._persist_dir, fname)
|
||||||
|
with open(path, "wb") as f:
|
||||||
|
f.write(raw_bytes)
|
||||||
|
except Exception:
|
||||||
|
# don't propagate persistence errors to caller
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_and_clear_raw_packet(self) -> Optional[tuple]:
|
||||||
|
with self._lock:
|
||||||
|
pkt = self._last_raw_packet
|
||||||
|
self._last_raw_packet = None
|
||||||
|
return pkt
|
||||||
|
|
||||||
|
def get_history(self):
|
||||||
|
with self._lock:
|
||||||
|
return list(self._history)
|
||||||
|
|
||||||
|
def clear_history(self):
|
||||||
|
with self._lock:
|
||||||
|
self._history.clear()
|
||||||
|
|
||||||
|
def set_history_size(self, n: int):
|
||||||
|
with self._lock:
|
||||||
|
try:
|
||||||
|
n = max(1, int(n))
|
||||||
|
except Exception:
|
||||||
|
return
|
||||||
|
self._history_size = n
|
||||||
|
new_deque = collections.deque(self._history, maxlen=self._history_size)
|
||||||
|
self._history = new_deque
|
||||||
|
|
||||||
|
def set_persist(self, enabled: bool):
|
||||||
|
with self._lock:
|
||||||
|
self._persist = bool(enabled)
|
||||||
@ -34,329 +34,63 @@ from target_simulator.core.sfp_structures import (
|
|||||||
SFPHeader,
|
SFPHeader,
|
||||||
SfpRisStatusPayload,
|
SfpRisStatusPayload,
|
||||||
)
|
)
|
||||||
|
from target_simulator.gui.payload_router import DebugPayloadRouter
|
||||||
# --- Helper Class for Routing and Buffering Payloads ---
|
|
||||||
|
|
||||||
|
|
||||||
class DebugPayloadRouter:
|
|
||||||
"""
|
|
||||||
A router that buffers the last received payload for each flow,
|
|
||||||
allowing the GUI to sample the data at a lower frequency.
|
|
||||||
This class is thread-safe.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._log_prefix = "[DebugPayloadRouter]"
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
# Buffer to store the last received payload for each flow type
|
|
||||||
self._latest_payloads: Dict[str, bytearray] = {}
|
|
||||||
# Buffer to store the last raw packet received (bytes, addr)
|
|
||||||
self._last_raw_packet: Optional[tuple] = None
|
|
||||||
# History of raw packets (timestamp, addr, raw bytes)
|
|
||||||
self._history_size = 20
|
|
||||||
self._history = collections.deque(maxlen=self._history_size)
|
|
||||||
self._persist = False
|
|
||||||
# default persist dir: repository Temp/ folder
|
|
||||||
project_root = os.path.abspath(
|
|
||||||
os.path.join(os.path.dirname(__file__), "..", "..")
|
|
||||||
)
|
|
||||||
self._persist_dir = os.path.join(project_root, "Temp")
|
|
||||||
try:
|
|
||||||
os.makedirs(self._persist_dir, exist_ok=True)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
logging.info(f"{self._log_prefix} Initialized.")
|
|
||||||
|
|
||||||
def get_handlers(self) -> Dict[int, PayloadHandler]:
|
|
||||||
"""Returns handlers that update the internal last-payload buffer."""
|
|
||||||
return {
|
|
||||||
ord("M"): lambda payload: self._update_last_payload("MFD", payload),
|
|
||||||
ord("S"): lambda payload: self._update_last_payload("SAR", payload),
|
|
||||||
ord("B"): lambda payload: self._update_last_payload("BIN", payload),
|
|
||||||
ord("J"): lambda payload: self._update_last_payload("JSON", payload),
|
|
||||||
# Support both uppercase 'R' and lowercase 'r' as RIS/status flows
|
|
||||||
ord("R"): lambda payload: self._handle_ris_status(payload),
|
|
||||||
ord("r"): lambda payload: self._handle_ris_status(payload),
|
|
||||||
}
|
|
||||||
|
|
||||||
def _update_last_payload(self, flow_id: str, payload: bytearray):
|
|
||||||
"""Thread-safely stores the latest payload for a given flow."""
|
|
||||||
with self._lock:
|
|
||||||
self._latest_payloads[flow_id] = payload
|
|
||||||
|
|
||||||
def _handle_ris_status(self, payload: bytearray):
|
|
||||||
"""Try to parse a RIS status payload and store a concise summary.
|
|
||||||
|
|
||||||
If parsing fails, store the raw payload as before.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
if len(payload) >= SfpRisStatusPayload.size():
|
|
||||||
# Interpret the first bytes as the status payload
|
|
||||||
parsed = SfpRisStatusPayload.from_buffer_copy(
|
|
||||||
bytes(payload[: SfpRisStatusPayload.size()])
|
|
||||||
)
|
|
||||||
sc = parsed.scenario
|
|
||||||
lines = []
|
|
||||||
lines.append("RIS Status Payload:\n")
|
|
||||||
# Scenario block
|
|
||||||
lines.append("Scenario:")
|
|
||||||
lines.append(f" timetag : {sc.timetag}")
|
|
||||||
lines.append(f" platform_azim : {sc.platform_azimuth:.6f}")
|
|
||||||
lines.append(f" vx,vy,vz : {sc.vx:.3f}, {sc.vy:.3f}, {sc.vz:.3f}")
|
|
||||||
lines.append(f" baro_altitude : {sc.baro_altitude:.3f}")
|
|
||||||
lines.append(f" latitude : {sc.latitude:.6f}")
|
|
||||||
lines.append(f" longitude : {sc.longitude:.6f}")
|
|
||||||
lines.append(f" true_heading : {sc.true_heading:.3f}\n")
|
|
||||||
|
|
||||||
# Targets block
|
|
||||||
lines.append("Targets (first non-zero flags shown):")
|
|
||||||
any_target = False
|
|
||||||
for idx, t in enumerate(parsed.tgt.tgt):
|
|
||||||
if t.flags != 0:
|
|
||||||
any_target = True
|
|
||||||
lines.append(
|
|
||||||
f" [{idx}] flags={t.flags} heading={t.heading:.3f} x={t.x:.3f} y={t.y:.3f} z={t.z:.3f}"
|
|
||||||
)
|
|
||||||
if not any_target:
|
|
||||||
lines.append(" (no enabled targets)")
|
|
||||||
|
|
||||||
# Attach a short hex summary of the first bytes after the header so user can correlate
|
|
||||||
try:
|
|
||||||
sample_len = min(48, len(payload))
|
|
||||||
sample = payload[:sample_len]
|
|
||||||
hex_sample = " ".join(f"{b:02X}" for b in sample)
|
|
||||||
lines.append("\nPayload sample (hex, first %d bytes):" % sample_len)
|
|
||||||
lines.append(f" {hex_sample}")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
text_out = "\n".join(lines)
|
|
||||||
# Build structured JSON for UI table consumption
|
|
||||||
try:
|
|
||||||
import json
|
|
||||||
|
|
||||||
scenario_dict = {
|
|
||||||
"timetag": int(parsed.scenario.timetag),
|
|
||||||
"platform_azimuth": float(parsed.scenario.platform_azimuth),
|
|
||||||
"vx": float(parsed.scenario.vx),
|
|
||||||
"vy": float(parsed.scenario.vy),
|
|
||||||
"vz": float(parsed.scenario.vz),
|
|
||||||
"baro_altitude": float(parsed.scenario.baro_altitude),
|
|
||||||
"latitude": float(parsed.scenario.latitude),
|
|
||||||
"longitude": float(parsed.scenario.longitude),
|
|
||||||
"true_heading": float(parsed.scenario.true_heading),
|
|
||||||
}
|
|
||||||
targets_list = []
|
|
||||||
for idx, t in enumerate(parsed.tgt.tgt):
|
|
||||||
targets_list.append(
|
|
||||||
{
|
|
||||||
"index": idx,
|
|
||||||
"flags": int(t.flags),
|
|
||||||
"heading": float(t.heading),
|
|
||||||
"x": float(t.x),
|
|
||||||
"y": float(t.y),
|
|
||||||
"z": float(t.z),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
struct = {"scenario": scenario_dict, "targets": targets_list}
|
|
||||||
json_bytes = bytearray(json.dumps(struct).encode("utf-8"))
|
|
||||||
except Exception:
|
|
||||||
json_bytes = bytearray(b"{}")
|
|
||||||
|
|
||||||
# Store textual representation and structured JSON so GUI can display it directly
|
|
||||||
self._update_last_payload("RIS_STATUS", bytearray(text_out.encode("utf-8")))
|
|
||||||
self._update_last_payload("RIS_STATUS_JSON", json_bytes)
|
|
||||||
return
|
|
||||||
except Exception:
|
|
||||||
# fall through to storing raw payload
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Fallback: store raw payload (as hex dump)
|
|
||||||
try:
|
|
||||||
text_out = "\n".join([f"{b:02X}" for b in payload])
|
|
||||||
self._update_last_payload("RIS_STATUS", bytearray(text_out.encode("utf-8")))
|
|
||||||
except Exception:
|
|
||||||
self._update_last_payload("RIS_STATUS", payload)
|
|
||||||
|
|
||||||
def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]:
|
|
||||||
"""
|
|
||||||
Thread-safely retrieves all new payloads received since the last call
|
|
||||||
and clears the internal buffer.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dict[str, bytearray]: A dictionary of the latest payload for each flow.
|
|
||||||
"""
|
|
||||||
with self._lock:
|
|
||||||
# Atomically swap the buffer with an empty one
|
|
||||||
new_payloads = self._latest_payloads
|
|
||||||
self._latest_payloads = {}
|
|
||||||
return new_payloads
|
|
||||||
|
|
||||||
def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
|
|
||||||
"""Store the last raw packet received (overwritten by subsequent packets)."""
|
|
||||||
with self._lock:
|
|
||||||
# Keep last packet for immediate display
|
|
||||||
self._last_raw_packet = (raw_bytes, addr)
|
|
||||||
# Append to history with timestamp and small metadata
|
|
||||||
entry = {
|
|
||||||
"ts": datetime.datetime.utcnow(),
|
|
||||||
"addr": addr,
|
|
||||||
"raw": raw_bytes,
|
|
||||||
}
|
|
||||||
# Try to parse SFP header to capture flow/TID for list display
|
|
||||||
try:
|
|
||||||
hdr = SFPHeader.from_buffer_copy(raw_bytes)
|
|
||||||
entry["flow"] = int(hdr.SFP_FLOW)
|
|
||||||
entry["tid"] = int(hdr.SFP_TID)
|
|
||||||
# map common flows to names when possible
|
|
||||||
flow_map = {
|
|
||||||
ord("M"): "MFD",
|
|
||||||
ord("S"): "SAR",
|
|
||||||
ord("B"): "BIN",
|
|
||||||
ord("J"): "JSON",
|
|
||||||
}
|
|
||||||
entry["flow_name"] = flow_map.get(
|
|
||||||
entry["flow"],
|
|
||||||
(
|
|
||||||
chr(entry["flow"])
|
|
||||||
if 32 <= entry["flow"] < 127
|
|
||||||
else str(entry["flow"])
|
|
||||||
),
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
# best-effort: leave flow/tid absent
|
|
||||||
pass
|
|
||||||
self._history.append(entry)
|
|
||||||
# Optionally persist to disk (each entry as binary)
|
|
||||||
if self._persist:
|
|
||||||
try:
|
|
||||||
ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
|
|
||||||
fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
|
|
||||||
path = os.path.join(self._persist_dir, fname)
|
|
||||||
with open(path, "wb") as f:
|
|
||||||
f.write(raw_bytes)
|
|
||||||
except Exception:
|
|
||||||
# don't propagate persistence errors to caller
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_and_clear_raw_packet(self) -> Optional[tuple]:
|
|
||||||
with self._lock:
|
|
||||||
pkt = self._last_raw_packet
|
|
||||||
self._last_raw_packet = None
|
|
||||||
return pkt
|
|
||||||
|
|
||||||
def get_history(self):
|
|
||||||
with self._lock:
|
|
||||||
return list(self._history)
|
|
||||||
|
|
||||||
def clear_history(self):
|
|
||||||
with self._lock:
|
|
||||||
self._history.clear()
|
|
||||||
|
|
||||||
def set_history_size(self, n: int):
|
|
||||||
with self._lock:
|
|
||||||
try:
|
|
||||||
n = max(1, int(n))
|
|
||||||
except Exception:
|
|
||||||
return
|
|
||||||
self._history_size = n
|
|
||||||
new_deque = collections.deque(self._history, maxlen=self._history_size)
|
|
||||||
self._history = new_deque
|
|
||||||
|
|
||||||
def set_persist(self, enabled: bool):
|
|
||||||
with self._lock:
|
|
||||||
self._persist = bool(enabled)
|
|
||||||
|
|
||||||
|
|
||||||
# --- Main Debug Window Class ---
|
|
||||||
|
|
||||||
|
|
||||||
class SfpDebugWindow(tk.Toplevel):
|
class SfpDebugWindow(tk.Toplevel):
|
||||||
"""A self-contained SFP debugging and packet inspection window."""
|
"""Top-level window for SFP debugging and payload inspection.
|
||||||
|
|
||||||
GUI_POLL_INTERVAL_MS = 250 # Poll for new data 4 times per second
|
This class was previously defining the DebugPayloadRouter inline; the
|
||||||
|
router implementation has been moved to `target_simulator.gui.payload_router`
|
||||||
|
to decouple routing logic from the Tk window and allow independent tests.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, master):
|
GUI_POLL_INTERVAL_MS = 250
|
||||||
|
|
||||||
|
def __init__(self, master=None):
|
||||||
super().__init__(master)
|
super().__init__(master)
|
||||||
self.title("SFP Packet Inspector")
|
self.master = master
|
||||||
self.geometry("900x700")
|
|
||||||
self.transient(master)
|
|
||||||
|
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.sfp_transport: Optional[SfpTransport] = None
|
# Router instance (buffers latest payloads per flow)
|
||||||
self.payload_router = DebugPayloadRouter()
|
self.payload_router = DebugPayloadRouter()
|
||||||
|
# Transport reference (set when connecting)
|
||||||
# Try to apply saved debug settings (history size, persist) from ConfigManager
|
self.sfp_transport = None
|
||||||
try:
|
# Image display defaults
|
||||||
gm = getattr(master, "config_manager", None)
|
|
||||||
general = gm.get_general_settings() if gm else {}
|
|
||||||
sfp_debug_conf = general.get("sfp_debug", {})
|
|
||||||
hist_size = int(sfp_debug_conf.get("history_size", 20))
|
|
||||||
persist_raw = bool(sfp_debug_conf.get("persist_raw", False))
|
|
||||||
# apply to router
|
|
||||||
try:
|
|
||||||
self.payload_router.set_history_size(hist_size)
|
|
||||||
self.payload_router.set_persist(persist_raw)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
self.mfd_photo: Optional[ImageTk.PhotoImage] = None
|
|
||||||
self.sar_photo: Optional[ImageTk.PhotoImage] = None
|
|
||||||
|
|
||||||
# Read image display size from settings (general.image_display.size)
|
|
||||||
try:
|
|
||||||
gm = getattr(master, "config_manager", None)
|
|
||||||
general = gm.get_general_settings() if gm else {}
|
|
||||||
img_conf = general.get("image_display", {})
|
|
||||||
self.image_area_size = int(img_conf.get("size", 150))
|
|
||||||
except Exception:
|
|
||||||
self.image_area_size = 150
|
self.image_area_size = 150
|
||||||
|
# Connection fields
|
||||||
self._create_widgets()
|
|
||||||
self.protocol("WM_DELETE_WINDOW", self._on_close)
|
|
||||||
|
|
||||||
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
|
|
||||||
# Track last raw update time to throttle high-volume flows
|
|
||||||
self._last_raw_update_ts = 0.0
|
|
||||||
|
|
||||||
def _create_widgets(self):
|
|
||||||
# --- Connection Controls (unchanged) ---
|
|
||||||
conn_frame = ttk.LabelFrame(self, text="Connection", padding=5)
|
|
||||||
conn_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)
|
|
||||||
ttk.Label(conn_frame, text="IP:").pack(side=tk.LEFT, padx=(5, 2))
|
|
||||||
self.ip_var = tk.StringVar(value="127.0.0.1")
|
self.ip_var = tk.StringVar(value="127.0.0.1")
|
||||||
ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).pack(side=tk.LEFT)
|
self.port_var = tk.StringVar(value="60002")
|
||||||
ttk.Label(conn_frame, text="Port:").pack(side=tk.LEFT, padx=(10, 2))
|
|
||||||
self.port_var = tk.StringVar(value="60002") ##55556 per mfd
|
# --- Connection Frame (IP / Port / Connect controls) ---
|
||||||
ttk.Entry(conn_frame, textvariable=self.port_var, width=7).pack(side=tk.LEFT)
|
conn_frame = ttk.Frame(self)
|
||||||
self.connect_btn = ttk.Button(
|
conn_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2))
|
||||||
conn_frame, text="Connect", command=self._on_connect
|
|
||||||
|
ttk.Label(conn_frame, text="IP:").pack(side=tk.LEFT, padx=(4, 2))
|
||||||
|
ttk.Entry(conn_frame, textvariable=self.ip_var, width=18).pack(
|
||||||
|
side=tk.LEFT, padx=(0, 6)
|
||||||
)
|
)
|
||||||
self.connect_btn.pack(side=tk.LEFT, padx=(10, 5))
|
ttk.Label(conn_frame, text="Port:").pack(side=tk.LEFT, padx=(0, 2))
|
||||||
self.disconnect_btn = ttk.Button(
|
ttk.Entry(conn_frame, textvariable=self.port_var, width=8).pack(
|
||||||
conn_frame,
|
side=tk.LEFT, padx=(0, 6)
|
||||||
text="Disconnect",
|
|
||||||
command=self._on_disconnect,
|
|
||||||
state=tk.DISABLED,
|
|
||||||
)
|
)
|
||||||
self.disconnect_btn.pack(side=tk.LEFT, padx=5)
|
|
||||||
# Button to configure image display size
|
self.connect_btn = ttk.Button(conn_frame, text="Connect", command=self._on_connect)
|
||||||
self.image_size_btn = ttk.Button(
|
self.connect_btn.pack(side=tk.LEFT, padx=(0, 6))
|
||||||
conn_frame, text="Image size...", command=self._open_image_size_dialog
|
self.disconnect_btn = ttk.Button(conn_frame, text="Disconnect", command=self._on_disconnect)
|
||||||
)
|
self.disconnect_btn.pack(side=tk.LEFT, padx=(0, 6))
|
||||||
self.image_size_btn.pack(side=tk.LEFT, padx=5)
|
# Start with disconnect disabled until connected
|
||||||
# Button to send a simple UDP probe to the configured IP:Port
|
try:
|
||||||
self.send_probe_btn = ttk.Button(
|
self.disconnect_btn.config(state=tk.DISABLED)
|
||||||
conn_frame, text="Send probe", command=self._on_send_probe
|
except Exception:
|
||||||
)
|
pass
|
||||||
self.send_probe_btn.pack(side=tk.LEFT, padx=5)
|
|
||||||
# Button to send a minimal SFP ACK packet to the configured IP:Port
|
# Quick utility buttons
|
||||||
self.send_ack_btn = ttk.Button(
|
self.send_probe_btn = ttk.Button(conn_frame, text="Send probe", command=self._on_send_probe)
|
||||||
conn_frame, text="Send ACK", command=self._on_send_ack
|
self.send_probe_btn.pack(side=tk.LEFT, padx=(6, 4))
|
||||||
)
|
self.send_ack_btn = ttk.Button(conn_frame, text="Send ACK", command=self._on_send_ack)
|
||||||
self.send_ack_btn.pack(side=tk.LEFT, padx=5)
|
self.send_ack_btn.pack(side=tk.LEFT)
|
||||||
|
|
||||||
|
# Note: DebugPayloadRouter has been moved to `target_simulator.gui.payload_router`.
|
||||||
|
|
||||||
# --- Script Sender Frame (below connection) ---
|
# --- Script Sender Frame (below connection) ---
|
||||||
script_frame = ttk.Frame(self)
|
script_frame = ttk.Frame(self)
|
||||||
@ -383,11 +117,49 @@ class SfpDebugWindow(tk.Toplevel):
|
|||||||
self.notebook.add(self.mfd_tab["frame"], text="MFD Image")
|
self.notebook.add(self.mfd_tab["frame"], text="MFD Image")
|
||||||
self.sar_tab = self._create_image_tab("SAR Image")
|
self.sar_tab = self._create_image_tab("SAR Image")
|
||||||
self.notebook.add(self.sar_tab["frame"], text="SAR Image")
|
self.notebook.add(self.sar_tab["frame"], text="SAR Image")
|
||||||
# RIS status tab: textual decoded status of RIS payloads
|
# RIS status tab: two-column layout with scenario (left) and targets (right)
|
||||||
self.ris_tab = scrolledtext.ScrolledText(
|
ris_frame = ttk.Frame(self.notebook)
|
||||||
self.notebook, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 10)
|
paned = ttk.Panedwindow(ris_frame, orient=tk.HORIZONTAL)
|
||||||
)
|
paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
||||||
self.notebook.add(self.ris_tab, text="RIS Status")
|
|
||||||
|
# Left: scenario table (field, value)
|
||||||
|
left = ttk.Frame(paned)
|
||||||
|
self.scenario_tree = ttk.Treeview(left, columns=("field", "value"), show="headings", height=12)
|
||||||
|
self.scenario_tree.heading("field", text="Field")
|
||||||
|
self.scenario_tree.heading("value", text="Value")
|
||||||
|
self.scenario_tree.column("field", width=140, anchor="w")
|
||||||
|
self.scenario_tree.column("value", width=160, anchor="w")
|
||||||
|
self.scenario_tree.pack(fill=tk.BOTH, expand=True)
|
||||||
|
paned.add(left, weight=1)
|
||||||
|
|
||||||
|
# Right: compact targets table
|
||||||
|
right = ttk.Frame(paned)
|
||||||
|
cols = ("idx", "flags", "heading", "x", "y", "z")
|
||||||
|
self.ris_tree = ttk.Treeview(right, columns=cols, show="headings", height=12)
|
||||||
|
for c, txt in zip(cols, ("#", "flags", "heading", "x", "y", "z")):
|
||||||
|
self.ris_tree.heading(c, text=txt)
|
||||||
|
self.ris_tree.column(c, width=70, anchor="center")
|
||||||
|
|
||||||
|
# Apply smaller font to make table compact
|
||||||
|
try:
|
||||||
|
style = ttk.Style()
|
||||||
|
small_font = ("Consolas", 8)
|
||||||
|
style.configure("Small.Treeview", font=small_font)
|
||||||
|
self.ris_tree.configure(style="Small.Treeview")
|
||||||
|
self.scenario_tree.configure(style="Small.Treeview")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.ris_tree.pack(fill=tk.BOTH, expand=True)
|
||||||
|
paned.add(right, weight=2)
|
||||||
|
|
||||||
|
# Save CSV button under the paned window
|
||||||
|
btn_frame = ttk.Frame(ris_frame)
|
||||||
|
btn_frame.pack(fill=tk.X, padx=5, pady=(0, 5))
|
||||||
|
self.ris_save_csv_btn = ttk.Button(btn_frame, text="Save CSV", command=lambda: self._on_save_ris_csv())
|
||||||
|
self.ris_save_csv_btn.pack(side=tk.RIGHT)
|
||||||
|
|
||||||
|
self.notebook.add(ris_frame, text="RIS Status")
|
||||||
# Raw SFP packet view with history on the left and details on the right
|
# Raw SFP packet view with history on the left and details on the right
|
||||||
raw_frame = ttk.Frame(self.notebook)
|
raw_frame = ttk.Frame(self.notebook)
|
||||||
# Left: history listbox
|
# Left: history listbox
|
||||||
@ -479,6 +251,13 @@ class SfpDebugWindow(tk.Toplevel):
|
|||||||
)
|
)
|
||||||
self.notebook.add(self.json_tab, text="JSON")
|
self.notebook.add(self.json_tab, text="JSON")
|
||||||
|
|
||||||
|
# Start the periodic GUI poll loop to process latest payloads from the router
|
||||||
|
try:
|
||||||
|
self.after(self.GUI_POLL_INTERVAL_MS, self._process_latest_payloads)
|
||||||
|
except Exception:
|
||||||
|
# If the Tk mainloop isn't running in tests, this will be a no-op
|
||||||
|
pass
|
||||||
|
|
||||||
def _create_image_tab(self, title: str) -> Dict:
|
def _create_image_tab(self, title: str) -> Dict:
|
||||||
frame = ttk.Frame(self.notebook)
|
frame = ttk.Frame(self.notebook)
|
||||||
# Fixed-size container to keep UI tidy. Image area will be size x size px.
|
# Fixed-size container to keep UI tidy. Image area will be size x size px.
|
||||||
@ -572,6 +351,7 @@ class SfpDebugWindow(tk.Toplevel):
|
|||||||
port=port,
|
port=port,
|
||||||
payload_handlers=self.payload_router.get_handlers(),
|
payload_handlers=self.payload_router.get_handlers(),
|
||||||
ack_config=ack_config,
|
ack_config=ack_config,
|
||||||
|
raw_packet_callback=self.payload_router.update_raw_packet,
|
||||||
)
|
)
|
||||||
if self.sfp_transport.start():
|
if self.sfp_transport.start():
|
||||||
self._log_to_widget(
|
self._log_to_widget(
|
||||||
@ -583,18 +363,6 @@ class SfpDebugWindow(tk.Toplevel):
|
|||||||
self._log_to_widget("Connection failed. Check IP/Port and logs.", "ERROR")
|
self._log_to_widget("Connection failed. Check IP/Port and logs.", "ERROR")
|
||||||
self.sfp_transport = None
|
self.sfp_transport = None
|
||||||
# Register raw packet callback regardless of start result (safe no-op if None)
|
# Register raw packet callback regardless of start result (safe no-op if None)
|
||||||
if self.sfp_transport:
|
|
||||||
# Provide the router.update_raw_packet method as callback
|
|
||||||
try:
|
|
||||||
self.sfp_transport._raw_packet_callback = (
|
|
||||||
self.payload_router.update_raw_packet
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
self.logger.exception(
|
|
||||||
"Failed to register raw_packet_callback on SfpTransport"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Bind history tree selection to show past packet
|
|
||||||
try:
|
try:
|
||||||
self.history_tree.bind(
|
self.history_tree.bind(
|
||||||
"<<TreeviewSelect>>", lambda e: self._on_history_select()
|
"<<TreeviewSelect>>", lambda e: self._on_history_select()
|
||||||
@ -806,21 +574,56 @@ class SfpDebugWindow(tk.Toplevel):
|
|||||||
elif flow_id == "JSON":
|
elif flow_id == "JSON":
|
||||||
self._display_json_data(payload, self.json_tab)
|
self._display_json_data(payload, self.json_tab)
|
||||||
elif flow_id == "RIS_STATUS":
|
elif flow_id == "RIS_STATUS":
|
||||||
# Display the textual RIS status in the RIS tab
|
# textual fallback: we intentionally do not write the
|
||||||
|
# full RIS textual summary into the generic log to avoid
|
||||||
|
# clutter; the structured JSON payload is used for UI.
|
||||||
|
# Keep this branch present in case future handling is
|
||||||
|
# needed.
|
||||||
|
pass
|
||||||
|
elif flow_id == "RIS_STATUS_JSON":
|
||||||
|
# Populate the scenario tree and the RIS targets tree from structured JSON
|
||||||
try:
|
try:
|
||||||
text = (
|
import json
|
||||||
payload.decode("utf-8")
|
|
||||||
if isinstance(payload, (bytes, bytearray))
|
struct = json.loads(payload.decode("utf-8")) if isinstance(payload, (bytes, bytearray)) else payload
|
||||||
else str(payload)
|
# scenario table (field, value)
|
||||||
|
for iid in self.scenario_tree.get_children():
|
||||||
|
self.scenario_tree.delete(iid)
|
||||||
|
scenario = struct.get("scenario", {}) if isinstance(struct, dict) else {}
|
||||||
|
if scenario:
|
||||||
|
# Insert in deterministic order
|
||||||
|
order = [
|
||||||
|
("timetag", "timetag"),
|
||||||
|
("platform_azimuth", "platform_azimuth"),
|
||||||
|
("vx", "vx"),
|
||||||
|
("vy", "vy"),
|
||||||
|
("vz", "vz"),
|
||||||
|
("baro_altitude", "baro_altitude"),
|
||||||
|
("latitude", "latitude"),
|
||||||
|
("longitude", "longitude"),
|
||||||
|
("true_heading", "true_heading"),
|
||||||
|
]
|
||||||
|
for label, key in order:
|
||||||
|
if key in scenario:
|
||||||
|
self.scenario_tree.insert("", tk.END, values=(label, scenario.get(key)))
|
||||||
|
|
||||||
|
# targets
|
||||||
|
for iid in self.ris_tree.get_children():
|
||||||
|
self.ris_tree.delete(iid)
|
||||||
|
targets = struct.get("targets", []) if isinstance(struct, dict) else []
|
||||||
|
for t in targets:
|
||||||
|
vals = (
|
||||||
|
t.get("index"),
|
||||||
|
t.get("flags"),
|
||||||
|
f"{t.get('heading'):.3f}",
|
||||||
|
f"{t.get('x'):.3f}",
|
||||||
|
f"{t.get('y'):.3f}",
|
||||||
|
f"{t.get('z'):.3f}",
|
||||||
)
|
)
|
||||||
|
self.ris_tree.insert("", tk.END, values=vals)
|
||||||
except Exception:
|
except Exception:
|
||||||
text = str(payload)
|
# ignore malformed JSON for now
|
||||||
self.ris_tab.config(state=tk.NORMAL)
|
pass
|
||||||
self.ris_tab.delete("1.0", tk.END)
|
|
||||||
self.ris_tab.insert("1.0", text)
|
|
||||||
self.ris_tab.config(state=tk.DISABLED)
|
|
||||||
# Optionally show the RIS tab automatically
|
|
||||||
# self.notebook.select(self.ris_tab)
|
|
||||||
# self.notebook.select(self.json_tab)
|
# self.notebook.select(self.json_tab)
|
||||||
|
|
||||||
# Reschedule the next check
|
# Reschedule the next check
|
||||||
@ -1248,6 +1051,45 @@ class SfpDebugWindow(tk.Toplevel):
|
|||||||
self.log_tab.config(state=tk.DISABLED)
|
self.log_tab.config(state=tk.DISABLED)
|
||||||
self.log_tab.see(tk.END)
|
self.log_tab.see(tk.END)
|
||||||
|
|
||||||
|
def _on_save_ris_csv(self):
|
||||||
|
try:
|
||||||
|
import csv
|
||||||
|
|
||||||
|
# collect rows from tree
|
||||||
|
|
||||||
|
# collect scenario rows
|
||||||
|
scenario_rows = [self.scenario_tree.item(iid, "values") for iid in self.scenario_tree.get_children()]
|
||||||
|
# collect target rows
|
||||||
|
rows = [self.ris_tree.item(iid, "values") for iid in self.ris_tree.get_children()]
|
||||||
|
|
||||||
|
if not scenario_rows and not rows:
|
||||||
|
self._log_to_widget("No RIS data to save.", "INFO")
|
||||||
|
return
|
||||||
|
|
||||||
|
# ensure Temp dir exists
|
||||||
|
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||||
|
temp_dir = os.path.join(project_root, "Temp")
|
||||||
|
os.makedirs(temp_dir, exist_ok=True)
|
||||||
|
ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%S")
|
||||||
|
fname = f"ris_targets_{ts}.csv"
|
||||||
|
path = os.path.join(temp_dir, fname)
|
||||||
|
with open(path, "w", newline="", encoding="utf-8") as f:
|
||||||
|
writer = csv.writer(f)
|
||||||
|
# write scenario fields first
|
||||||
|
if scenario_rows:
|
||||||
|
writer.writerow(["Scenario Field", "Value"])
|
||||||
|
for s in scenario_rows:
|
||||||
|
writer.writerow(s)
|
||||||
|
writer.writerow([])
|
||||||
|
# write targets
|
||||||
|
writer.writerow(["index", "flags", "heading", "x", "y", "z"])
|
||||||
|
for r in rows:
|
||||||
|
writer.writerow(r)
|
||||||
|
|
||||||
|
self._log_to_widget(f"Saved RIS targets CSV to {path}", "INFO")
|
||||||
|
except Exception as e:
|
||||||
|
self._log_to_widget(f"Failed to save RIS CSV: {e}", "ERROR")
|
||||||
|
|
||||||
def _format_hex_dump(self, data: bytes, length=16) -> str:
|
def _format_hex_dump(self, data: bytes, length=16) -> str:
|
||||||
lines = []
|
lines = []
|
||||||
for i in range(0, len(data), length):
|
for i in range(0, len(data), length):
|
||||||
|
|||||||
52
tests/core/test_ris_parsing.py
Normal file
52
tests/core/test_ris_parsing.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
import json
|
||||||
|
import ctypes
|
||||||
|
|
||||||
|
from target_simulator.core.sfp_structures import (
|
||||||
|
SfpRisStatusPayload,
|
||||||
|
RisScenario,
|
||||||
|
RisTarget,
|
||||||
|
RisTargetsBlock,
|
||||||
|
DSP_RIS_MAX_TGT,
|
||||||
|
)
|
||||||
|
from target_simulator.gui.sfp_debug_window import DebugPayloadRouter
|
||||||
|
|
||||||
|
|
||||||
|
def build_test_payload():
|
||||||
|
# build a minimal payload with scenario + one target enabled
|
||||||
|
payload = SfpRisStatusPayload()
|
||||||
|
# fill scenario
|
||||||
|
payload.scenario.timetag = 123456
|
||||||
|
payload.scenario.platform_azimuth = 1.23
|
||||||
|
payload.scenario.vx = 10.0
|
||||||
|
payload.scenario.vy = -2.5
|
||||||
|
payload.scenario.vz = 0.0
|
||||||
|
payload.scenario.baro_altitude = 100.0
|
||||||
|
payload.scenario.true_heading = 45.0
|
||||||
|
payload.scenario.latitude = 12.345678
|
||||||
|
payload.scenario.longitude = 98.765432
|
||||||
|
|
||||||
|
# fill first target
|
||||||
|
payload.tgt.tgt[0].flags = 1
|
||||||
|
payload.tgt.tgt[0].heading = 90.0
|
||||||
|
payload.tgt.tgt[0].x = 1.0
|
||||||
|
payload.tgt.tgt[0].y = 2.0
|
||||||
|
payload.tgt.tgt[0].z = 3.0
|
||||||
|
|
||||||
|
return bytes(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def test_ris_router_parsing():
|
||||||
|
router = DebugPayloadRouter()
|
||||||
|
data = build_test_payload()
|
||||||
|
# call handler directly
|
||||||
|
router._handle_ris_status(bytearray(data))
|
||||||
|
# retrieve latest payloads
|
||||||
|
latest = router.get_and_clear_latest_payloads()
|
||||||
|
# Expect RIS_STATUS textual summary or raw
|
||||||
|
assert "RIS_STATUS" in latest
|
||||||
|
# textual content should include timetag and heading
|
||||||
|
txt = latest["RIS_STATUS"].decode("utf-8")
|
||||||
|
assert "timetag" in txt.lower() or "timetag" in txt
|
||||||
|
|
||||||
|
# Note: _handle_ris_status also stores RIS_STATUS as text; JSON variant stored by SfpDebugWindow logic
|
||||||
|
# If JSON is not present in router (router stores only last payload per key) we at least ensure textual parse exists
|
||||||
28
tests/gui/test_sfp_debug_polling.py
Normal file
28
tests/gui/test_sfp_debug_polling.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from target_simulator.gui import sfp_debug_window
|
||||||
|
|
||||||
|
|
||||||
|
def test_sfpdebugwindow_starts_polling(monkeypatch):
|
||||||
|
calls = []
|
||||||
|
|
||||||
|
# Replace the after method on the class to capture scheduling attempts
|
||||||
|
def fake_after(self, ms, callback):
|
||||||
|
calls.append((ms, callback))
|
||||||
|
return "fake_after_id"
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
sfp_debug_window.SfpDebugWindow, "after", fake_after, raising=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Instantiate the window (should call after in __init__)
|
||||||
|
w = sfp_debug_window.SfpDebugWindow(master=None)
|
||||||
|
|
||||||
|
# Ensure at least one scheduling call was made
|
||||||
|
assert calls, "SfpDebugWindow did not schedule the polling loop via after()"
|
||||||
|
|
||||||
|
# Verify that one of the calls uses the configured GUI_POLL_INTERVAL_MS
|
||||||
|
assert any(ms == w.GUI_POLL_INTERVAL_MS for ms, _ in calls)
|
||||||
|
|
||||||
|
# Verify that the callback scheduled is the internal _process_latest_payloads method
|
||||||
|
assert any(cb == w._process_latest_payloads for _, cb in calls)
|
||||||
Loading…
Reference in New Issue
Block a user