# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""

import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from typing import Dict, Optional, Any, List, Callable, Tuple

from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)

PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]

# --- Constants ---
M_TO_FT = 3.28084


class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.
    It updates the SimulationStateHub and broadcasts processed data to
    registered listeners. This class is thread-safe.
    """

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        self.active_archive = None
        self._log_prefix = "[DebugPayloadRouter]"
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False

        # Latency samples: (timestamp, latency_s) tuples computed from the
        # clock synchronizer. The buffer is large enough to capture an entire
        # simulation (10000 samples ~= 2.8 hours at 1 Hz).
        self._latency_samples = collections.deque(maxlen=10000)

        self._hub = simulation_hub

        # Timestamp for ownship position integration
        self._last_ownship_update_time: Optional[float] = None

        # Listeners for real-time target data broadcasts
        self._ris_target_listeners: List[TargetListListener] = []

        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            pass

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            # JSON payloads may contain antenna nav fields (ant_nav_az / ant_nav_el).
            # Route JSON to a dedicated handler that will both store the raw
            # payload and, when possible, extract antenna az/el to update the hub
            # so the PPI antenna sweep can animate even when using JSON protocol.
            ord("J"): self._handle_json_payload,
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        self._logger = logger
        logger.info(f"{self._log_prefix} Initialized (Hub: {self._hub is not None}).")
        # Optional clock synchronizer for latency estimation. ClockSynchronizer
        # requires numpy; if unavailable we simply leave latency estimates at 0.
        try:
            self._clock_sync = ClockSynchronizer()
        except Exception:
            self._clock_sync = None

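    # Usage sketch: how a receiver might dispatch on the SFP flow byte using
    # the handler map above. This wiring is illustrative only (the real
    # receiver lives elsewhere); it assumes each datagram carries an SFPHeader
    # followed by the payload, which this module itself does not mandate.
    #
    #     router = DebugPayloadRouter(simulation_hub=hub)  # hub: SimulationStateHub
    #     handlers = router.get_handlers()
    #
    #     def on_datagram(raw: bytes, addr: tuple) -> None:
    #         router.update_raw_packet(raw, addr)
    #         hdr = SFPHeader.from_buffer_copy(raw)
    #         handler = handlers.get(int(hdr.SFP_FLOW))
    #         if handler is not None:
    #             handler(bytearray(raw[ctypes.sizeof(SFPHeader):]))
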
    def set_archive(self, archive):
        """Set the current archive object used to persist incoming states.

        Args:
            archive: An object implementing the small archive API (e.g.
                add_real_state, add_ownship_state). The router will call
                archive methods while processing incoming payloads.
        """
        with self._lock:
            self.active_archive = archive
            # Clear latency buffer when starting a new simulation
            if archive is not None:
                self._latency_samples.clear()
                self._logger.debug("Latency buffer cleared for new simulation")

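    # A minimal sketch of the archive API this router calls into. The class
    # below is hypothetical (the real archive lives elsewhere); it only shows
    # the two methods, with the signatures the router actually invokes.
    #
    #     class InMemoryArchive:
    #         def __init__(self):
    #             self.ownship_states: list = []
    #             self.real_states: list = []
    #
    #         def add_ownship_state(self, state: dict) -> None:
    #             self.ownship_states.append(state)
    #
    #         def add_real_state(self, target_id: int, timestamp: float,
    #                            state: tuple) -> None:
    #             self.real_states.append((target_id, timestamp, state))
    #
    #     router.set_archive(InMemoryArchive())
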
    def add_ris_target_listener(self, listener: TargetListListener):
        """Register a listener that will be called with lists of real Targets.

        The listener will be invoked from the router's internal thread context.
        """
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self._logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: TargetListListener):
        """Unregister a previously registered RIS target listener."""
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
                self._logger.info(f"RIS target listener removed: {listener}")
            except ValueError:
                pass

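    # Listener usage sketch (hypothetical callback). Listeners run on the
    # router's network thread, so a GUI consumer should hand the data off to
    # its own event loop rather than touching widgets directly:
    #
    #     def on_targets(targets: List[Target]) -> None:
    #         for t in targets:
    #             print(t.target_id, t.current_heading_deg)
    #
    #     router.add_ris_target_listener(on_targets)
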
    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """Return the mapping of SFP flow byte values to payload handlers."""
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store the last payload for a logical flow id in a thread-safe way."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        """
        Parse an RIS payload and return a tuple of:
        - a list of Target objects for entries with flags != 0 (active targets)
        - a list of integer target IDs present with flags == 0 (inactive)
        """
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags != 0:
                    target = Target(
                        target_id=i, trajectory=[], active=True, traceable=True
                    )
                    # Server's y-axis is East (our x), x-axis is North (our y)
                    pos_x_ft = float(ris_target.y) * M_TO_FT
                    pos_y_ft = float(ris_target.x) * M_TO_FT
                    pos_z_ft = float(ris_target.z) * M_TO_FT
                    setattr(target, "_pos_x_ft", pos_x_ft)
                    setattr(target, "_pos_y_ft", pos_y_ft)
                    setattr(target, "_pos_z_ft", pos_z_ft)
                    target._update_current_polar_coords()
                    try:
                        raw_h = float(ris_target.heading)
                        # Heuristic: magnitudes within ~2*pi (plus a 10%
                        # margin) are assumed to be radians; anything larger
                        # is assumed to already be degrees.
                        if abs(raw_h) <= (2 * math.pi * 1.1):
                            hdg_deg = math.degrees(raw_h)
                        else:
                            hdg_deg = raw_h
                        target.current_heading_deg = hdg_deg % 360
                        setattr(target, "_raw_heading", raw_h)
                    except (ValueError, TypeError):
                        target.current_heading_deg = 0.0
                    targets.append(target)
                else:
                    inactive_ids.append(int(i))
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
        return targets, inactive_ids

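    # Worked example of the axis swap and unit conversion above (illustrative
    # numbers only): a server target at x=100 m (North), y=50 m (East) maps to
    # the local frame as pos_x_ft = 50 * 3.28084 = 164.042 ft (East) and
    # pos_y_ft = 100 * 3.28084 = 328.084 ft (North).
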
    def _handle_ris_status(self, payload: bytearray):
        """
        Parse and process an RIS status payload.

        Args:
            payload (bytearray): Raw RIS status packet.

        Side effects:
            Updates the SimulationStateHub, archives states, and notifies
            registered listeners. Ownship and target data are extracted, and
            hub.set_antenna_azimuth is updated (via the debug-view helper)
            when heading information is available. Returns None.
        """
        reception_timestamp = time.monotonic()

        parsed_payload = None
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except (ValueError, TypeError):
            self._logger.error("Failed to parse SfpRisStatusPayload from buffer.")
            return

        # --- Update Ownship State ---
        if self._hub:
            try:
                sc = parsed_payload.scenario

                delta_t = 0.0
                if self._last_ownship_update_time is not None:
                    delta_t = reception_timestamp - self._last_ownship_update_time
                self._last_ownship_update_time = reception_timestamp

                # Get previous ownship state to integrate position
                old_state = self._hub.get_ownship_state()
                old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))

                # Server's vy is East (our x), vx is North (our y)
                ownship_vx_fps = float(sc.vy) * M_TO_FT
                ownship_vy_fps = float(sc.vx) * M_TO_FT

                # Integrate position
                new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t
                new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t

                ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360

                ownship_state = {
                    "timestamp": reception_timestamp,
                    "position_xy_ft": (new_pos_x_ft, new_pos_y_ft),
                    "altitude_ft": float(sc.baro_altitude) * M_TO_FT,
                    "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
                    "heading_deg": ownship_heading_deg,
                    "latitude": float(sc.latitude),
                    "longitude": float(sc.longitude),
                }
                self._hub.set_ownship_state(ownship_state)

                # Store ownship state in archive if available
                with self._lock:
                    archive = self.active_archive
                    if archive and hasattr(archive, "add_ownship_state"):
                        archive.add_ownship_state(ownship_state)
            except Exception:
                self._logger.exception("Failed to update ownship state.")

        # --- Feed clock synchronizer with server timetag (if available) ---
        if self._clock_sync is not None and parsed_payload is not None:
            try:
                server_timetag = int(parsed_payload.scenario.timetag)
                # Add a sample: raw server timetag + local reception time
                self._clock_sync.add_sample(server_timetag, reception_timestamp)
                # Compute a latency sample if the model can estimate client time
                est_gen = self._clock_sync.to_client_time(server_timetag)
                latency = reception_timestamp - est_gen
                # Store latencies only during active simulation (archive set)
                if latency >= 0 and self.active_archive is not None:
                    with self._lock:
                        self._latency_samples.append(
                            (reception_timestamp, latency)
                        )
            except Exception:
                pass

        # --- Update Target States ---
        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)

        if self._hub:
            try:
                if hasattr(self._hub, "add_real_packet"):
                    self._hub.add_real_packet(reception_timestamp)

                for tid in inactive_ids:
                    if hasattr(self._hub, "clear_real_target_data"):
                        self._hub.clear_real_target_data(tid)

                for target in real_targets:
                    state_tuple = (
                        getattr(target, "_pos_x_ft", 0.0),
                        getattr(target, "_pos_y_ft", 0.0),
                        getattr(target, "_pos_z_ft", 0.0),
                    )
                    self._hub.add_real_state(
                        target_id=target.target_id,
                        timestamp=reception_timestamp,
                        state=state_tuple,
                    )
                    if hasattr(self._hub, "set_real_heading"):
                        raw_val = getattr(target, "_raw_heading", None)
                        self._hub.set_real_heading(
                            target.target_id,
                            getattr(target, "current_heading_deg", 0.0),
                            raw_value=raw_val,
                        )
            except Exception:
                self._logger.exception("Failed to process RIS targets for Hub.")

        with self._lock:
            archive = self.active_archive
            if archive:
                for target in real_targets:
                    state_tuple = (
                        getattr(target, "_pos_x_ft", 0.0),
                        getattr(target, "_pos_y_ft", 0.0),
                        getattr(target, "_pos_z_ft", 0.0),
                    )
                    archive.add_real_state(
                        target_id=target.target_id,
                        timestamp=reception_timestamp,
                        state=state_tuple,
                    )

        # --- BROADCAST to all registered listeners ---
        # Snapshot the listener list under the lock, then invoke outside it so
        # a listener that (un)registers itself cannot deadlock the router.
        with self._lock:
            listeners = list(self._ris_target_listeners)
        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(f"Error in RIS target listener: {listener}")

        # --- Update Debug Views (unchanged) ---
        self._update_debug_views(parsed_payload)

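    # Latency bookkeeping sketch for the clock-sync block above (values are
    # illustrative): if the synchronizer maps server timetag 123456 to an
    # estimated local generation time of 10.250 s (monotonic) and the packet
    # arrived at 10.262 s, the stored sample is (10.262, 0.012), i.e. ~12 ms
    # of estimated one-way latency.
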
    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
        """Helper to populate debug views from a parsed payload."""
        try:
            sc = parsed_payload.scenario
            lines = ["RIS Status Payload:\n", "Scenario:"]
            # ... text generation logic ...
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )

            # ... JSON generation logic ...
            def _convert_ctypes(value):
                # ctypes arrays expose _length_; simple ctypes wrap .value
                if hasattr(value, "_length_"):
                    return list(value)
                if isinstance(value, ctypes._SimpleCData):
                    return value.value
                return value

            scenario_dict = {
                f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_
            }
            targets_list = [
                {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)

            # --- Propagate antenna azimuth to hub ---
            if self._hub:
                # Total azimuth = platform heading plus relative antenna sweep
                heading_rad = scenario_dict.get(
                    "true_heading", scenario_dict.get("platform_azimuth")
                )
                sweep_rad = scenario_dict.get("ant_nav_az")

                total_az_rad = None
                if heading_rad is not None:
                    total_az_rad = float(heading_rad)
                    if sweep_rad is not None:
                        total_az_rad += float(sweep_rad)

                if total_az_rad is not None:
                    az_deg = math.degrees(total_az_rad)
                    self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic())

        except Exception:
            self._logger.exception("Failed to generate text/JSON for RIS debug view.")

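    # Shape of the "RIS_STATUS_JSON" debug payload produced above. The exact
    # keys come from the ctypes structure fields, so the names shown here are
    # illustrative:
    #
    #     {
    #       "scenario": {"true_heading": 0.78, "timetag": 123456, ...},
    #       "targets": [{"flags": 1, "x": 100.0, "y": 50.0, ...}, ...]
    #     }
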
    def _handle_json_payload(self, payload: bytearray):
        """
        Handle JSON payloads. Store the raw JSON and, if present, extract
        antenna navigation fields (ant_nav_az / ant_nav_el) and propagate
        them to the SimulationStateHub so the PPI antenna can animate.
        """
        try:
            # Always store the raw payload for debug inspection
            self._update_last_payload("JSON", payload)

            if not self._hub:
                return

            # Try to decode and parse JSON
            try:
                text = payload.decode("utf-8")
                obj = json.loads(text)
            except Exception:
                return

            # Helper to find fields in varied JSON structures
            def _find_val(key_candidates, dct):
                if not isinstance(dct, dict):
                    return None
                # Check top-level
                for key in key_candidates:
                    if key in dct:
                        return dct[key]
                # Check scenario sub-dict if present
                sc = dct.get("scenario") or dct.get("sc")
                if isinstance(sc, dict):
                    for key in key_candidates:
                        if key in sc:
                            return sc[key]
                return None

            # Find platform heading and relative antenna sweep using multiple
            # potential keys
            heading_val = _find_val(
                ["true_heading", "platform_azimuth", "heading"], obj
            )
            sweep_val = _find_val(
                ["ant_nav_az", "antenna_azimuth", "sweep_azimuth"], obj
            )

            if heading_val is not None:
                try:
                    h_rad = float(heading_val)
                    # If the heading looks like degrees (> 2*pi), convert to radians
                    if abs(h_rad) > (2 * math.pi + 0.01):
                        h_rad = math.radians(h_rad)

                    total_az_rad = h_rad
                    if sweep_val is not None:
                        s_rad = float(sweep_val)
                        # If the sweep looks like degrees, convert as well
                        if abs(s_rad) > (2 * math.pi + 0.01):
                            s_rad = math.radians(s_rad)
                        total_az_rad += s_rad

                    az_deg = math.degrees(total_az_rad)
                    # Log the success for debugging
                    self._logger.debug(
                        f"Found azimuth info in JSON: heading={heading_val}, "
                        f"sweep={sweep_val} -> total_deg={az_deg}"
                    )
                    self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic())
                except Exception as e:
                    self._logger.debug(f"Error processing azimuth values: {e}")

            # Optionally capture elevation for future UI use
            # plat_el = _find_val(["ant_nav_el", "platform_elevation"], obj)

        except Exception:
            self._logger.exception("Error handling JSON payload")

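    # Example of a JSON payload the handler above can consume (illustrative;
    # angles in radians, using the first candidate keys searched):
    #
    #     {"scenario": {"true_heading": 1.5708, "ant_nav_az": -0.35}}
    #
    # yields set_antenna_azimuth(math.degrees(1.5708 - 0.35)) ~= 69.9 deg.
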
    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Atomically retrieve and clear the latest payloads cache.

        Returns:
            Dict[str, Any]: The latest payloads mapping at call time; the
            internal cache is handed off to the caller and replaced with an
            empty dict.
        """
        with self._lock:
            new_payloads = self._latest_payloads
            self._latest_payloads = {}
        return new_payloads

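    # GUI polling sketch (hypothetical Tkinter consumer; the router itself
    # has no Tkinter dependency). A window might drain the cache on a timer:
    #
    #     def _poll(self):
    #         for flow_id, payload in router.get_and_clear_latest_payloads().items():
    #             self._render_payload(flow_id, payload)  # hypothetical helper
    #         self.after(100, self._poll)  # re-arm a 100 ms Tk timer
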
    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Record the last raw packet received and optionally persist it.

        Args:
            raw_bytes (bytes): Raw packet contents.
            addr (tuple): Source address tuple (ip, port).
        """
        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes}
            try:
                hdr = SFPHeader.from_buffer_copy(raw_bytes)
                entry["flow"] = int(hdr.SFP_FLOW)
                entry["tid"] = int(hdr.SFP_TID)
                flow_map = {
                    ord("M"): "MFD",
                    ord("S"): "SAR",
                    ord("B"): "BIN",
                    ord("J"): "JSON",
                    ord("R"): "RIS",
                    ord("r"): "ris",
                }
                entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"]))
            except Exception:
                pass
            self._history.append(entry)
            if self._persist:
                try:
                    ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
                    fname = f"sfp_raw_{ts}_{addr[0].replace(':', '_')}_{addr[1]}.bin"
                    path = os.path.join(self._persist_dir, fname)
                    with open(path, "wb") as f:
                        f.write(raw_bytes)
                except Exception:
                    pass

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last raw packet (if any) and clear the cached value."""
        with self._lock:
            pkt = self._last_raw_packet
            self._last_raw_packet = None
        return pkt

    # --- Latency estimation API ---
    def get_estimated_latency_s(self) -> float:
        """Return the estimated one-way latency in seconds, or 0.0 if unknown."""
        try:
            if self._clock_sync is not None:
                return float(self._clock_sync.get_average_latency_s())
        except Exception:
            pass
        return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[tuple]:
        """Return recent latency samples as (timestamp, latency_s) tuples,
        most recent last.

        Args:
            limit: Maximum number of samples to return (None = all available).

        Returns:
            List of (timestamp, latency_s) tuples.
        """
        with self._lock:
            samples = list(self._latency_samples)
        if limit is not None and limit > 0:
            return samples[-limit:]
        return samples

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        """Compute basic statistics over recent latency samples.

        Returns a dict with mean_ms, std_ms, var_ms, min_ms, max_ms, count.
        """
        with self._lock:
            samples = list(self._latency_samples)

        if not samples:
            return {
                "mean_ms": None,
                "std_ms": None,
                "var_ms": None,
                "min_ms": None,
                "max_ms": None,
                "count": 0,
            }

        if sample_limit and sample_limit > 0:
            samples = samples[-sample_limit:]

        # Extract latency values from (timestamp, latency) tuples and work in
        # milliseconds
        ms = [s[1] * 1000.0 for s in samples]
        mean = statistics.mean(ms)
        var = statistics.pvariance(ms) if len(ms) > 1 else 0.0
        std = statistics.pstdev(ms) if len(ms) > 1 else 0.0
        return {
            "mean_ms": round(mean, 3),
            "std_ms": round(std, 3),
            "var_ms": round(var, 3),
            "min_ms": round(min(ms), 3),
            "max_ms": round(max(ms), 3),
            "count": len(ms),
        }

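    # Stats usage sketch (hypothetical status-bar consumer):
    #
    #     stats = router.get_latency_stats(sample_limit=200)
    #     if stats["count"]:
    #         label.config(text=f"latency {stats['mean_ms']:.1f} ms "
    #                           f"(std {stats['std_ms']:.1f} ms, n={stats['count']})")
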
def get_history(self):
|
|
"""Return a shallow copy of the internal history deque as a list."""
|
|
with self._lock:
|
|
return list(self._history)
|
|
|
|
def clear_history(self):
|
|
"""Clear the in-memory history buffer used by debug views."""
|
|
with self._lock:
|
|
self._history.clear()
|
|
|
|
def set_history_size(self, n: int):
|
|
"""Adjust the maximum history size kept for debug playback.
|
|
|
|
Args:
|
|
n (int): New maximum number of entries to retain (minimum 1).
|
|
"""
|
|
with self._lock:
|
|
n = max(1, int(n))
|
|
self._history_size = n
|
|
new_deque = collections.deque(self._history, maxlen=self._history_size)
|
|
self._history = new_deque
|
|
|
|
def set_persist(self, enabled: bool):
|
|
"""Enable or disable persisting of raw packets to disk for debugging."""
|
|
with self._lock:
|
|
self._persist = bool(enabled)
|