"""Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""

import collections
import datetime
import json
import logging
import os
import threading
from typing import Any, Dict, Optional

from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload


class DebugPayloadRouter:
    """
    A router that buffers the last received payload for each flow,
    allowing the GUI to sample the data at a lower frequency.

    This class is thread-safe.
    """
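
    # Illustrative usage (a minimal sketch, not taken from the GUI code): a
    # receive thread pushes payloads through the handlers while the GUI drains
    # the buffer at its own pace, so only the newest payload per flow survives
    # between polls.
    #
    #     router = DebugPayloadRouter()
    #     handlers = router.get_handlers()
    #     handlers[ord("M")](bytearray(b"frame 1"))  # receive-thread side
    #     handlers[ord("M")](bytearray(b"frame 2"))  # overwrites "frame 1"
    #     router.get_and_clear_latest_payloads()     # {"MFD": bytearray(b"frame 2")}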

    def __init__(self):
        self._log_prefix = "[DebugPayloadRouter]"
        self._lock = threading.Lock()
        # Buffer to store the last received payload for each flow type
        self._latest_payloads: Dict[str, bytearray] = {}
        # Buffer to store the last raw packet received (bytes, addr)
        self._last_raw_packet: Optional[tuple] = None
        # History of raw packets (timestamp, addr, raw bytes)
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        # Default persist dir: repository Temp/ folder
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            # Persistence is optional; a failed mkdir is non-fatal.
            pass
        logging.info(f"{self._log_prefix} Initialized.")

    def get_handlers(self) -> Dict[int, Any]:
        """Returns handlers that update the internal last-payload buffer."""
        return {
            ord("M"): lambda payload: self._update_last_payload("MFD", payload),
            ord("S"): lambda payload: self._update_last_payload("SAR", payload),
            ord("B"): lambda payload: self._update_last_payload("BIN", payload),
            ord("J"): lambda payload: self._update_last_payload("JSON", payload),
            # Support both uppercase 'R' and lowercase 'r' as RIS/status flows
            ord("R"): lambda payload: self._handle_ris_status(payload),
            ord("r"): lambda payload: self._handle_ris_status(payload),
        }
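
    # Dispatch sketch (assumed shape of the receive loop, which lives outside
    # this module): the SFP flow byte of the parsed header indexes the table
    # returned by get_handlers(); `datagram` and `payload_bytes` are
    # hypothetical names for the framing done by that loop.
    #
    #     hdr = SFPHeader.from_buffer_copy(datagram)
    #     handler = router.get_handlers().get(int(hdr.SFP_FLOW))
    #     if handler is not None:
    #         handler(bytearray(payload_bytes))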

    def _update_last_payload(self, flow_id: str, payload: bytearray):
        """Thread-safely stores the latest payload for a given flow."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    def _handle_ris_status(self, payload: bytearray):
        """Try to parse a RIS status payload and store a concise summary.

        If parsing fails, store the raw payload as before.
        """
        try:
            if len(payload) >= SfpRisStatusPayload.size():
                # Interpret the first bytes as the status payload
                parsed = SfpRisStatusPayload.from_buffer_copy(
                    bytes(payload[: SfpRisStatusPayload.size()])
                )
                sc = parsed.scenario
                lines = []
                lines.append("RIS Status Payload:\n")
                # Scenario block
                lines.append("Scenario:")
                lines.append(f" timetag : {sc.timetag}")
                lines.append(f" platform_azim : {sc.platform_azimuth:.6f}")
                lines.append(f" vx,vy,vz : {sc.vx:.3f}, {sc.vy:.3f}, {sc.vz:.3f}")
                lines.append(f" baro_altitude : {sc.baro_altitude:.3f}")
                lines.append(f" latitude : {sc.latitude:.6f}")
                lines.append(f" longitude : {sc.longitude:.6f}")
                lines.append(f" true_heading : {sc.true_heading:.3f}\n")

                # Targets block
                lines.append("Targets (first non-zero flags shown):")
                any_target = False
                for idx, t in enumerate(parsed.tgt.tgt):
                    if t.flags != 0:
                        any_target = True
                        lines.append(
                            f" [{idx}] flags={t.flags} heading={t.heading:.3f} x={t.x:.3f} y={t.y:.3f} z={t.z:.3f}"
                        )
                if not any_target:
                    lines.append(" (no enabled targets)")

                # NOTE: omit hex sample from RIS textual summary to avoid
                # cluttering the application log with large binary dumps.
                # The structured JSON payload (RIS_STATUS_JSON) contains
                # the parsed values that the UI consumes.

                text_out = "\n".join(lines)
                # Build structured JSON for UI table consumption
                try:
                    scenario_dict = {
                        "timetag": int(parsed.scenario.timetag),
                        "flags": int(parsed.scenario.flags),
                        "mode": int(parsed.scenario.mode),
                        "platform_azimuth": float(parsed.scenario.platform_azimuth),
                        "ant_nav_az": float(parsed.scenario.ant_nav_az),
                        "ant_nav_el": float(parsed.scenario.ant_nav_el),
                        "vx": float(parsed.scenario.vx),
                        "vy": float(parsed.scenario.vy),
                        "vz": float(parsed.scenario.vz),
                        "baro_altitude": float(parsed.scenario.baro_altitude),
                        "latitude": float(parsed.scenario.latitude),
                        "longitude": float(parsed.scenario.longitude),
                        "true_heading": float(parsed.scenario.true_heading),
                    }
                    targets_list = []
                    for idx, t in enumerate(parsed.tgt.tgt):
                        targets_list.append(
                            {
                                "index": idx,
                                "flags": int(t.flags),
                                "heading": float(t.heading),
                                "x": float(t.x),
                                "y": float(t.y),
                                "z": float(t.z),
                            }
                        )
                    struct = {"scenario": scenario_dict, "targets": targets_list}
                    json_bytes = bytearray(json.dumps(struct).encode("utf-8"))
                except Exception:
                    json_bytes = bytearray(b"{}")

                # Store textual representation and structured JSON so the GUI
                # can display them directly
                self._update_last_payload(
                    "RIS_STATUS", bytearray(text_out.encode("utf-8"))
                )
                self._update_last_payload("RIS_STATUS_JSON", json_bytes)
                return
        except Exception:
            # Fall through to storing the raw payload
            pass

        # Fallback: store raw payload (as hex dump)
        try:
            text_out = "\n".join([f"{b:02X}" for b in payload])
            self._update_last_payload(
                "RIS_STATUS", bytearray(text_out.encode("utf-8"))
            )
        except Exception:
            self._update_last_payload("RIS_STATUS", payload)
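
    # Consumption sketch (illustrative): the "RIS_STATUS_JSON" buffer built
    # above round-trips through json.loads on the GUI side.
    #
    #     blob = router.get_and_clear_latest_payloads().get("RIS_STATUS_JSON")
    #     if blob is not None:
    #         data = json.loads(bytes(blob).decode("utf-8"))
    #         print(data["scenario"]["timetag"], len(data["targets"]))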

    def get_and_clear_latest_payloads(self) -> Dict[str, bytearray]:
        """
        Thread-safely retrieves all new payloads received since the last call
        and clears the internal buffer.

        Returns:
            Dict[str, bytearray]: A dictionary of the latest payload for each flow.
        """
        with self._lock:
            # Atomically swap the buffer with an empty one
            new_payloads = self._latest_payloads
            self._latest_payloads = {}
            return new_payloads
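
    # Polling sketch (hypothetical Tk wiring; `root` and `update_view` are not
    # part of this module): the buffer swap above lets the GUI re-sample at a
    # fixed, low rate without missing the newest payload of any flow.
    #
    #     def poll():
    #         for flow_id, payload in router.get_and_clear_latest_payloads().items():
    #             update_view(flow_id, payload)
    #         root.after(200, poll)  # ~5 Hz, independent of the packet rate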

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Store the last raw packet received (overwritten by subsequent packets)."""
        with self._lock:
            # Keep last packet for immediate display
            self._last_raw_packet = (raw_bytes, addr)
            # Append to history with timestamp and small metadata
            entry = {
                "ts": datetime.datetime.utcnow(),
                "addr": addr,
                "raw": raw_bytes,
            }
            # Try to parse SFP header to capture flow/TID for list display
            try:
                hdr = SFPHeader.from_buffer_copy(raw_bytes)
                entry["flow"] = int(hdr.SFP_FLOW)
                entry["tid"] = int(hdr.SFP_TID)
                # Map common flows to names when possible
                flow_map = {
                    ord("M"): "MFD",
                    ord("S"): "SAR",
                    ord("B"): "BIN",
                    ord("J"): "JSON",
                }
                entry["flow_name"] = flow_map.get(
                    entry["flow"],
                    (
                        chr(entry["flow"])
                        if 32 <= entry["flow"] < 127
                        else str(entry["flow"])
                    ),
                )
            except Exception:
                # Best-effort: leave flow/tid absent
                pass
            self._history.append(entry)
            # Optionally persist to disk (each entry as binary)
            if self._persist:
                try:
                    ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
                    fname = f"sfp_raw_{ts}_{addr[0].replace(':', '_')}_{addr[1]}.bin"
                    path = os.path.join(self._persist_dir, fname)
                    with open(path, "wb") as f:
                        f.write(raw_bytes)
                except Exception:
                    # Don't propagate persistence errors to the caller
                    pass
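
    # Persistence note (derived from the naming code above): with
    # set_persist(True) each datagram lands in Temp/ as, e.g.,
    # sfp_raw_20250101T120000.123456_127.0.0.1_5000.bin, so captures can be
    # replayed offline. A minimal reader sketch (`path` is caller-chosen):
    #
    #     with open(path, "rb") as f:
    #         raw = f.read()
    #     hdr = SFPHeader.from_buffer_copy(raw)  # recover flow / TID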

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last raw packet as (bytes, addr) and clear it, or None."""
        with self._lock:
            pkt = self._last_raw_packet
            self._last_raw_packet = None
            return pkt

    def get_history(self):
        """Return a snapshot (list copy) of the raw-packet history."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Drop all entries from the raw-packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Resize the history buffer, keeping the most recent entries."""
        with self._lock:
            try:
                n = max(1, int(n))
            except Exception:
                # Ignore non-numeric sizes and keep the current capacity
                return
            self._history_size = n
            new_deque = collections.deque(self._history, maxlen=self._history_size)
            self._history = new_deque

    def set_persist(self, enabled: bool):
        """Enable or disable persisting each raw packet to disk."""
        with self._lock:
            self._persist = bool(enabled)