342 lines
14 KiB
Python
342 lines
14 KiB
Python
# target_simulator/gui/payload_router.py
|
|
|
|
"""Payload router for buffering SFP payloads for the GUI.
|
|
|
|
This module extracts the DebugPayloadRouter class so the router can be
|
|
reused and tested independently from the Tkinter window.
|
|
"""
|
|
|
|
import threading
|
|
import collections
|
|
import datetime
|
|
import os
|
|
import logging
|
|
import math
|
|
import json
|
|
import ctypes
|
|
import time
|
|
from queue import Queue, Full
|
|
from typing import Dict, Optional, Any, List, Callable, Tuple
|
|
|
|
from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
|
|
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
|
|
from target_simulator.core.models import Target
|
|
|
|
# Module-level logger for this module
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PayloadHandler = Callable[[bytearray], None]
|
|
TargetListListener = Callable[[List[Target]], None]
|
|
|
|
|
|
class DebugPayloadRouter:
|
|
"""
|
|
A unified router that handles payloads for the entire application.
|
|
It updates the SimulationStateHub and broadcasts processed data to
|
|
registered listeners. This class is thread-safe.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
simulation_hub: Optional[SimulationStateHub] = None,
|
|
update_queue: Optional[Queue] = None,
|
|
):
|
|
self._log_prefix = "[DebugPayloadRouter]"
|
|
self._lock = threading.Lock()
|
|
self._latest_payloads: Dict[str, Any] = {}
|
|
self._last_raw_packet: Optional[tuple] = None
|
|
self._history_size = 20
|
|
self._history = collections.deque(maxlen=self._history_size)
|
|
self._persist = False
|
|
|
|
self._hub = simulation_hub
|
|
self._update_queue = update_queue
|
|
|
|
# Listeners for real-time target data broadcasts
|
|
self._ris_target_listeners: List[TargetListListener] = []
|
|
|
|
project_root = os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), "..", "..")
|
|
)
|
|
self._persist_dir = os.path.join(project_root, "Temp")
|
|
try:
|
|
os.makedirs(self._persist_dir, exist_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
self._handlers: Dict[int, PayloadHandler] = {
|
|
ord("M"): lambda p: self._update_last_payload("MFD", p),
|
|
ord("S"): lambda p: self._update_last_payload("SAR", p),
|
|
ord("B"): lambda p: self._update_last_payload("BIN", p),
|
|
ord("J"): lambda p: self._update_last_payload("JSON", p),
|
|
ord("R"): self._handle_ris_status,
|
|
ord("r"): self._handle_ris_status,
|
|
}
|
|
logger.info(
|
|
f"{self._log_prefix} Initialized (Hub: {self._hub is not None}, Queue: {self._update_queue is not None})."
|
|
)
|
|
self._logger = logger
|
|
|
|
def add_ris_target_listener(self, listener: TargetListListener):
|
|
"""Registers a callback function to receive updates for real targets."""
|
|
with self._lock:
|
|
if listener not in self._ris_target_listeners:
|
|
self._ris_target_listeners.append(listener)
|
|
self._logger.info(f"RIS target listener added: {listener}")
|
|
|
|
def remove_ris_target_listener(self, listener: TargetListListener):
|
|
"""Unregisters a callback function."""
|
|
with self._lock:
|
|
try:
|
|
self._ris_target_listeners.remove(listener)
|
|
self._logger.info(f"RIS target listener removed: {listener}")
|
|
except ValueError:
|
|
pass
|
|
|
|
def get_handlers(self) -> Dict[int, PayloadHandler]:
|
|
return self._handlers
|
|
|
|
def _update_last_payload(self, flow_id: str, payload: Any):
|
|
with self._lock:
|
|
self._latest_payloads[flow_id] = payload
|
|
|
|
def _parse_ris_payload_to_targets(
|
|
self, payload: bytearray
|
|
) -> Tuple[List[Target], List[int]]:
|
|
"""
|
|
Parse RIS payload and return a tuple:
|
|
- list of Target objects for entries with flags != 0 (active targets)
|
|
- list of integer target IDs present with flags == 0 (inactive)
|
|
"""
|
|
targets: List[Target] = []
|
|
inactive_ids: List[int] = []
|
|
try:
|
|
parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
|
|
for i, ris_target in enumerate(parsed_payload.tgt.tgt):
|
|
if ris_target.flags != 0:
|
|
target = Target(
|
|
target_id=i, trajectory=[], active=True, traceable=True
|
|
)
|
|
M_TO_FT = 3.280839895
|
|
pos_x_ft = float(ris_target.y) * M_TO_FT
|
|
pos_y_ft = float(ris_target.x) * M_TO_FT
|
|
pos_z_ft = float(ris_target.z) * M_TO_FT
|
|
setattr(target, "_pos_x_ft", pos_x_ft)
|
|
setattr(target, "_pos_y_ft", pos_y_ft)
|
|
setattr(target, "_pos_z_ft", pos_z_ft)
|
|
target._update_current_polar_coords()
|
|
try:
|
|
raw_h = float(ris_target.heading)
|
|
# Server should send heading in radians; but be tolerant:
|
|
# if the magnitude looks like radians (<= ~2*pi) convert to degrees,
|
|
# otherwise assume it's already degrees.
|
|
if abs(raw_h) <= (2 * math.pi * 1.1):
|
|
hdg_deg = math.degrees(raw_h)
|
|
unit = "rad"
|
|
else:
|
|
hdg_deg = raw_h
|
|
unit = "deg"
|
|
target.current_heading_deg = hdg_deg % 360
|
|
# Store the raw value on the Target for later correlation
|
|
try:
|
|
setattr(target, "_raw_heading", raw_h)
|
|
except Exception:
|
|
pass
|
|
self._logger.debug(
|
|
f"Parsed RIS heading for target {i}: raw={raw_h} assumed={unit} hdg_deg={target.current_heading_deg:.6f}"
|
|
)
|
|
except (ValueError, TypeError):
|
|
target.current_heading_deg = 0.0
|
|
targets.append(target)
|
|
else:
|
|
try:
|
|
inactive_ids.append(int(i))
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
self._logger.exception(
|
|
f"{self._log_prefix} Failed to parse RIS payload into Target objects."
|
|
)
|
|
return targets, inactive_ids
|
|
|
|
def _handle_ris_status(self, payload: bytearray):
|
|
real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)
|
|
|
|
if self._hub:
|
|
try:
|
|
# Use the client's monotonic clock as the single source of truth for timestamps
|
|
# to ensure simulated and real data can be correlated.
|
|
reception_timestamp = time.monotonic()
|
|
|
|
# Record a single packet-level arrival timestamp so callers can
|
|
# measure packets/sec (instead of per-target events/sec).
|
|
try:
|
|
if hasattr(self._hub, "add_real_packet"):
|
|
self._hub.add_real_packet(reception_timestamp)
|
|
except Exception:
|
|
# Non-fatal: continue even if packet recording fails
|
|
pass
|
|
|
|
# If payload included inactive targets (flags==0), clear their stored
|
|
# real data so they disappear from the PPI immediately.
|
|
try:
|
|
for tid in inactive_ids or []:
|
|
if hasattr(self._hub, "clear_real_target_data"):
|
|
self._hub.clear_real_target_data(tid)
|
|
except Exception:
|
|
self._logger.debug(
|
|
"Failed to clear inactive target data in hub", exc_info=True
|
|
)
|
|
|
|
# Add real states for active targets
|
|
for target in real_targets:
|
|
state_tuple = (
|
|
getattr(target, "_pos_x_ft", 0.0),
|
|
getattr(target, "_pos_y_ft", 0.0),
|
|
getattr(target, "_pos_z_ft", 0.0),
|
|
)
|
|
self._hub.add_real_state(
|
|
target_id=target.target_id,
|
|
timestamp=reception_timestamp,
|
|
state=state_tuple,
|
|
)
|
|
|
|
# Propagate heading information (if available) into the hub
|
|
try:
|
|
for target in real_targets:
|
|
if hasattr(self._hub, "set_real_heading"):
|
|
raw_val = getattr(target, "_raw_heading", None)
|
|
self._hub.set_real_heading(
|
|
target.target_id,
|
|
getattr(target, "current_heading_deg", 0.0),
|
|
raw_value=raw_val,
|
|
)
|
|
except Exception:
|
|
self._logger.debug(
|
|
"Failed to propagate heading to hub", exc_info=True
|
|
)
|
|
|
|
if self._update_queue:
|
|
try:
|
|
self._update_queue.put_nowait([])
|
|
except Full:
|
|
self._logger.warning(
|
|
f"{self._log_prefix} GUI update queue is full; dropped notification."
|
|
)
|
|
except Exception:
|
|
self._logger.exception(
|
|
"DebugPayloadRouter: Failed to process RIS for Hub."
|
|
)
|
|
|
|
# --- BROADCAST to all registered listeners ---
|
|
with self._lock:
|
|
for listener in self._ris_target_listeners:
|
|
try:
|
|
listener(real_targets)
|
|
except Exception:
|
|
self._logger.exception(f"Error in RIS target listener: {listener}")
|
|
|
|
# --- Buffer other data for debug tabs ---
|
|
try:
|
|
if len(payload) >= SfpRisStatusPayload.size():
|
|
parsed = SfpRisStatusPayload.from_buffer_copy(payload)
|
|
sc = parsed.scenario
|
|
# ... (Text generation logic remains unchanged) ...
|
|
lines = ["RIS Status Payload:\n", "Scenario:"]
|
|
lines.append(f" timetag : {sc.timetag}") # ... etc.
|
|
text_out = "\n".join(lines)
|
|
# Keep backward compatibility: store textual summary under the legacy key
|
|
self._update_last_payload(
|
|
"RIS_STATUS", bytearray(text_out.encode("utf-8"))
|
|
)
|
|
# Also provide explicitly-named text and JSON variants for newer UI consumers
|
|
self._update_last_payload(
|
|
"RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
|
|
)
|
|
|
|
def _convert_ctypes(value):
|
|
if hasattr(value, "_length_"):
|
|
return list(value)
|
|
if isinstance(value, ctypes._SimpleCData):
|
|
return value.value
|
|
return value
|
|
|
|
scenario_dict = {
|
|
f[0]: _convert_ctypes(getattr(parsed.scenario, f[0]))
|
|
for f in parsed.scenario._fields_
|
|
}
|
|
targets_list = [
|
|
{f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
|
|
for t in parsed.tgt.tgt
|
|
]
|
|
struct = {"scenario": scenario_dict, "targets": targets_list}
|
|
json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
|
|
self._update_last_payload("RIS_STATUS_JSON", json_bytes)
|
|
except Exception:
|
|
self._logger.exception("Failed to generate text/JSON for RIS debug view.")
|
|
|
|
def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
|
|
with self._lock:
|
|
new_payloads = self._latest_payloads
|
|
self._latest_payloads = {}
|
|
return new_payloads
|
|
|
|
# ... (il resto del file rimane invariato) ...
|
|
def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
|
|
with self._lock:
|
|
self._last_raw_packet = (raw_bytes, addr)
|
|
entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes}
|
|
try:
|
|
hdr = SFPHeader.from_buffer_copy(raw_bytes)
|
|
entry["flow"] = int(hdr.SFP_FLOW)
|
|
entry["tid"] = int(hdr.SFP_TID)
|
|
flow_map = {
|
|
ord("M"): "MFD",
|
|
ord("S"): "SAR",
|
|
ord("B"): "BIN",
|
|
ord("J"): "JSON",
|
|
ord("R"): "RIS",
|
|
ord("r"): "ris",
|
|
}
|
|
entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"]))
|
|
except Exception:
|
|
pass
|
|
self._history.append(entry)
|
|
if self._persist:
|
|
try:
|
|
ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
|
|
fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
|
|
path = os.path.join(self._persist_dir, fname)
|
|
with open(path, "wb") as f:
|
|
f.write(raw_bytes)
|
|
except Exception:
|
|
pass
|
|
|
|
def get_and_clear_raw_packet(self) -> Optional[tuple]:
|
|
with self._lock:
|
|
pkt = self._last_raw_packet
|
|
self._last_raw_packet = None
|
|
return pkt
|
|
|
|
def get_history(self):
|
|
with self._lock:
|
|
return list(self._history)
|
|
|
|
def clear_history(self):
|
|
with self._lock:
|
|
self._history.clear()
|
|
|
|
def set_history_size(self, n: int):
|
|
with self._lock:
|
|
try:
|
|
n = max(1, int(n))
|
|
except Exception:
|
|
return
|
|
self._history_size = n
|
|
new_deque = collections.deque(self._history, maxlen=self._history_size)
|
|
self._history = new_deque
|
|
|
|
def set_persist(self, enabled: bool):
|
|
with self._lock:
|
|
self._persist = bool(enabled)
|