# S1005403_RisCC/target_simulator/gui/payload_router.py
#
# 412 lines
# 17 KiB
# Python

# target_simulator/gui/payload_router.py
"""Payload router for buffering SFP payloads for the GUI.
This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""
import threading
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple
from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
# Module-level logger for this module
logger = logging.getLogger(__name__)
# Callable invoked with the raw payload bytes of one SFP flow; the router's
# handler table maps flow identifiers to callables of this shape.
PayloadHandler = Callable[[bytearray], None]
# Callable invoked with the list of Target objects parsed from a RIS status
# payload; registered through DebugPayloadRouter.add_ris_target_listener().
TargetListListener = Callable[[List[Target]], None]
class DebugPayloadRouter:
    """Unified payload router for the whole application.

    Payloads are dispatched through the handler table returned by
    :meth:`get_handlers`. The MFD, SAR, BIN and JSON flows are only buffered
    for the debug views. RIS status payloads (flows ``"R"`` and ``"r"``) are
    additionally parsed into ``Target`` objects which are pushed to the
    ``SimulationStateHub``, to the active archive (if any) and to every
    registered listener.

    The payload buffers, the raw-packet history, the listener list and the
    active archive reference are guarded by a non-reentrant
    ``threading.Lock``. Listener callbacks, archive writes and raw-packet
    persistence run *outside* that lock, so a callback may call back into the
    router without deadlocking.
    """

    # Metres-to-feet factor applied to the RIS target coordinates.
    _M_TO_FT = 3.280839895

    # Angles whose magnitude is at most ~2*pi (with a 10% margin) are assumed
    # to be radians; anything larger is assumed to already be in degrees.
    _MAX_RADIAN_MAGNITUDE = 2 * math.pi * 1.1

    # Display names of the known SFP flow identifiers.
    _FLOW_NAMES = {
        ord("M"): "MFD",
        ord("S"): "SAR",
        ord("B"): "BIN",
        ord("J"): "JSON",
        ord("R"): "RIS",
        ord("r"): "ris",
    }

    # Annotations that name project types are quoted so that they are not
    # evaluated when the class body is executed.
    def __init__(
        self,
        simulation_hub: "Optional[SimulationStateHub]" = None,
    ):
        """Create the router.

        Args:
            simulation_hub: Hub that receives the real-target states parsed
                from RIS payloads. When ``None`` the hub update is skipped.
        """
        self.active_archive = None
        self._log_prefix = "[DebugPayloadRouter]"
        self._logger = logger
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        self._hub = simulation_hub
        # Listeners for real-time target data broadcasts
        self._ris_target_listeners: List[TargetListListener] = []

        # Raw packets are persisted (when enabled) under <project root>/Temp.
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except OSError:
            # Best-effort: persistence is optional, so only report the problem.
            self._logger.warning(
                "%s Could not create persist directory %s",
                self._log_prefix,
                self._persist_dir,
                exc_info=True,
            )

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            ord("J"): lambda p: self._update_last_payload("JSON", p),
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        logger.info(f"{self._log_prefix} Initialized (Hub: {self._hub is not None}).")

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def set_archive(self, archive):
        """Set the archive session that records real-target states.

        Pass ``None`` to stop recording.
        """
        with self._lock:
            self.active_archive = archive

    def add_ris_target_listener(self, listener: "TargetListListener"):
        """Register a callback that receives the real targets of each RIS payload.

        Registering the same callback twice has no effect.
        """
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self._logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: "TargetListListener"):
        """Unregister a callback; unknown callbacks are ignored."""
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
            except ValueError:
                return
            self._logger.info(f"RIS target listener removed: {listener}")

    def get_handlers(self) -> "Dict[int, PayloadHandler]":
        """Return the handler table keyed by SFP flow identifier."""
        return self._handlers

    # ------------------------------------------------------------------
    # Payload handling
    # ------------------------------------------------------------------
    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store ``payload`` as the most recent one for ``flow_id``."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    @classmethod
    def _angle_to_degrees(cls, value: float) -> Tuple[float, str]:
        """Convert an angle of unknown unit to degrees.

        Returns:
            ``(degrees, unit)`` where ``unit`` is ``"rad"`` when the value was
            treated as radians and converted, or ``"deg"`` when it was taken
            as degrees already.
        """
        if abs(value) <= cls._MAX_RADIAN_MAGNITUDE:
            return math.degrees(value), "rad"
        return value, "deg"

    @staticmethod
    def _position_state(target) -> Tuple[float, float, float]:
        """Return the ``(x, y, z)`` position in feet stored on ``target``."""
        return (
            getattr(target, "_pos_x_ft", 0.0),
            getattr(target, "_pos_y_ft", 0.0),
            getattr(target, "_pos_z_ft", 0.0),
        )

    @staticmethod
    def _ctypes_to_python(value):
        """Turn a ctypes array or simple value into a plain Python object."""
        if hasattr(value, "_length_"):
            return list(value)
        if isinstance(value, ctypes._SimpleCData):
            return value.value
        return value

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> "Tuple[List[Target], List[int]]":
        """Parse a RIS status payload.

        Returns:
            A tuple ``(targets, inactive_ids)``:

            - ``targets``: ``Target`` objects for entries with ``flags != 0``
            - ``inactive_ids``: indices of the entries with ``flags == 0``

            A parse failure is logged and whatever was collected up to that
            point is returned.
        """
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags == 0:
                    inactive_ids.append(i)
                    continue

                target = Target(
                    target_id=i, trajectory=[], active=True, traceable=True
                )
                # NOTE(review): RIS ``y`` feeds the x position and RIS ``x``
                # feeds the y position - presumably an intentional change of
                # axis convention; confirm against the RIS interface spec.
                target._pos_x_ft = float(ris_target.y) * self._M_TO_FT
                target._pos_y_ft = float(ris_target.x) * self._M_TO_FT
                target._pos_z_ft = float(ris_target.z) * self._M_TO_FT
                target._update_current_polar_coords()

                try:
                    raw_h = float(ris_target.heading)
                    # The server should send radians, but be tolerant of
                    # degrees (see _angle_to_degrees).
                    hdg_deg, unit = self._angle_to_degrees(raw_h)
                    target.current_heading_deg = hdg_deg % 360
                    # Keep the raw value on the Target for later correlation.
                    try:
                        target._raw_heading = raw_h
                    except (AttributeError, TypeError):
                        pass
                    self._logger.debug(
                        "Parsed RIS heading for target %s: raw=%s assumed=%s hdg_deg=%.6f",
                        i,
                        raw_h,
                        unit,
                        target.current_heading_deg,
                    )
                except (ValueError, TypeError):
                    target.current_heading_deg = 0.0

                targets.append(target)
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
        return targets, inactive_ids

    def _handle_ris_status(self, payload: bytearray):
        """Process one RIS status payload.

        The payload is parsed once into targets, which are then pushed to the
        hub, recorded in the active archive and broadcast to the listeners.
        Finally the text/JSON debug views are refreshed. Every stage is
        isolated: a failure is logged and the remaining stages still run.
        """
        # One monotonic timestamp per packet, shared by hub, archive and
        # antenna azimuth so that all records of a packet can be correlated.
        reception_timestamp = time.monotonic()
        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)

        if self._hub:
            try:
                self._publish_to_hub(real_targets, inactive_ids, reception_timestamp)
            except Exception:
                self._logger.exception(
                    "DebugPayloadRouter: Failed to process RIS for Hub."
                )

        with self._lock:
            archive = self.active_archive
        if archive:
            try:
                for target in real_targets:
                    archive.add_real_state(
                        target_id=target.target_id,
                        timestamp=reception_timestamp,
                        state=self._position_state(target),
                    )
            except Exception:
                self._logger.exception(
                    f"{self._log_prefix} Failed to record RIS states in archive."
                )

        # --- BROADCAST to all registered listeners ---
        # Work on a snapshot taken under the lock and call the listeners
        # without holding it: the lock is not reentrant, so a listener that
        # (un)registers itself or queries the router would otherwise deadlock.
        with self._lock:
            listeners = list(self._ris_target_listeners)
        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(f"Error in RIS target listener: {listener}")

        # --- Buffer other data for debug tabs ---
        try:
            self._buffer_ris_debug_views(payload, reception_timestamp)
        except Exception:
            self._logger.exception("Failed to generate text/JSON for RIS debug view.")

    def _publish_to_hub(self, real_targets, inactive_ids, reception_timestamp):
        """Push the parsed RIS data into the simulation hub.

        Optional hub methods (``add_real_packet``, ``clear_real_target_data``,
        ``set_real_heading``) are used only when the hub provides them, and
        their failures are logged without interrupting the update. Errors from
        ``add_real_state`` propagate to the caller.
        """
        hub = self._hub

        # Record a single packet-level arrival so callers can measure
        # packets/sec instead of per-target events/sec.
        try:
            if hasattr(hub, "add_real_packet"):
                hub.add_real_packet(reception_timestamp)
        except Exception:
            self._logger.debug(
                "Failed to record packet arrival in hub", exc_info=True
            )

        # Entries reported with flags == 0 are cleared so they disappear from
        # the display immediately.
        try:
            if inactive_ids and hasattr(hub, "clear_real_target_data"):
                for tid in inactive_ids:
                    hub.clear_real_target_data(tid)
        except Exception:
            self._logger.debug(
                "Failed to clear inactive target data in hub", exc_info=True
            )

        for target in real_targets:
            hub.add_real_state(
                target_id=target.target_id,
                timestamp=reception_timestamp,
                state=self._position_state(target),
            )

        try:
            if hasattr(hub, "set_real_heading"):
                for target in real_targets:
                    hub.set_real_heading(
                        target.target_id,
                        getattr(target, "current_heading_deg", 0.0),
                        raw_value=getattr(target, "_raw_heading", None),
                    )
        except Exception:
            self._logger.debug("Failed to propagate heading to hub", exc_info=True)

    def _buffer_ris_debug_views(self, payload: bytearray, reception_timestamp: float):
        """Refresh the RIS text/JSON debug buffers and the antenna azimuth.

        Payloads shorter than ``SfpRisStatusPayload.size()`` are ignored.
        """
        if len(payload) < SfpRisStatusPayload.size():
            return
        parsed = SfpRisStatusPayload.from_buffer_copy(payload)
        scenario = parsed.scenario

        # NOTE(review): only the timetag line is produced here; leftover
        # placeholder comments in the previous revision suggest the full
        # summary was truncated - confirm against version control.
        lines = [
            "RIS Status Payload:\n",
            "Scenario:",
            f" timetag : {scenario.timetag}",
        ]
        text_out = "\n".join(lines)
        # Legacy key kept for backward compatibility, plus the explicitly
        # named text variant for newer UI consumers. Each key gets its own
        # buffer so a consumer mutating one cannot affect the other.
        self._update_last_payload("RIS_STATUS", bytearray(text_out.encode("utf-8")))
        self._update_last_payload(
            "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
        )

        convert = self._ctypes_to_python
        scenario_dict = {
            field[0]: convert(getattr(scenario, field[0]))
            for field in scenario._fields_
        }
        targets_list = [
            {field[0]: convert(getattr(tgt, field[0])) for field in tgt._fields_}
            for tgt in parsed.tgt.tgt
        ]
        snapshot = {"scenario": scenario_dict, "targets": targets_list}
        json_bytes = bytearray(json.dumps(snapshot, indent=2).encode("utf-8"))
        self._update_last_payload("RIS_STATUS_JSON", json_bytes)

        try:
            self._propagate_antenna_azimuth(scenario_dict, reception_timestamp)
        except Exception:
            self._logger.debug(
                "Error while extracting antenna azimuth from RIS payload",
                exc_info=True,
            )

    def _propagate_antenna_azimuth(self, scenario_dict, reception_timestamp):
        """Forward the antenna azimuth of the scenario to the hub.

        The ``ant_nav_az`` field is preferred, with ``platform_azimuth`` as a
        fallback. The value is delivered through ``set_antenna_azimuth`` when
        the hub has it, otherwise through the legacy ``set_platform_azimuth``.
        """
        hub = self._hub
        if not hub:
            return

        if "ant_nav_az" in scenario_dict:
            raw_azimuth = scenario_dict["ant_nav_az"]
        elif "platform_azimuth" in scenario_dict:
            raw_azimuth = scenario_dict["platform_azimuth"]
        else:
            return
        if raw_azimuth is None:
            return

        setter = getattr(hub, "set_antenna_azimuth", None) or getattr(
            hub, "set_platform_azimuth", None
        )
        if setter is None:
            return

        try:
            degrees, _unit = self._angle_to_degrees(float(raw_azimuth))
        except (TypeError, ValueError):
            self._logger.debug(
                "Antenna azimuth value is not numeric: %r", raw_azimuth
            )
            return

        try:
            setter(degrees, timestamp=reception_timestamp)
        except Exception:
            self._logger.debug(
                "Failed to set antenna/platform azimuth on hub",
                exc_info=True,
            )

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Return the payloads buffered since the last call and reset the buffer."""
        with self._lock:
            pending, self._latest_payloads = self._latest_payloads, {}
        return pending

    # ------------------------------------------------------------------
    # Raw packet history
    # ------------------------------------------------------------------
    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Record a raw packet in the history and optionally persist it.

        Each history entry holds the UTC arrival time (``ts``, naive
        datetime), the sender address (``addr``) and the bytes (``raw``).
        When the SFP header can be decoded the entry also holds ``flow``,
        ``tid`` and ``flow_name``.

        Args:
            raw_bytes: The packet exactly as received.
            addr: Sender address; ``addr[0]`` and ``addr[1]`` are used to
                build the file name when persistence is enabled.
        """
        header_fields = {}
        try:
            hdr = SFPHeader.from_buffer_copy(raw_bytes)
            flow = int(hdr.SFP_FLOW)
            header_fields = {
                "flow": flow,
                "tid": int(hdr.SFP_TID),
                "flow_name": self._FLOW_NAMES.get(flow, str(flow)),
            }
        except Exception:
            # Deliberately best-effort: a packet whose header cannot be
            # decoded is still recorded, just without the header fields.
            pass

        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            # Naive UTC timestamp (same value datetime.utcnow() used to give).
            now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            entry = {"ts": now_utc, "addr": addr, "raw": raw_bytes}
            entry.update(header_fields)
            self._history.append(entry)
            persist = self._persist

        if not persist:
            return
        # Disk I/O happens outside the lock so a slow write cannot stall the
        # other router users.
        try:
            ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
            fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
            path = os.path.join(self._persist_dir, fname)
            with open(path, "wb") as f:
                f.write(raw_bytes)
        except Exception:
            self._logger.debug(
                "%s Failed to persist raw packet", self._log_prefix, exc_info=True
            )

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last ``(raw_bytes, addr)`` pair, or ``None``, and clear it."""
        with self._lock:
            pkt, self._last_raw_packet = self._last_raw_packet, None
        return pkt

    def get_history(self):
        """Return a copy of the raw-packet history, oldest entry first."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Drop every entry of the raw-packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Resize the raw-packet history, keeping the most recent entries.

        Values below 1 are raised to 1; values that cannot be converted to an
        integer leave the history untouched.
        """
        try:
            size = max(1, int(n))
        except (TypeError, ValueError, OverflowError):
            return
        with self._lock:
            self._history_size = size
            self._history = collections.deque(self._history, maxlen=size)

    def set_persist(self, enabled: bool):
        """Enable or disable writing every raw packet to the persist directory."""
        with self._lock:
            self._persist = bool(enabled)