# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.
The DebugPayloadRouter class lives in its own module so the router can be
reused and tested independently of the Tkinter window.
"""
import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple
from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)
PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]

# --- Constants ---
M_TO_FT = 3.28084
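# Axis convention note: the RIS server reports x = North and y = East (hence
# the x/y swaps in _parse_ris_payload_to_targets and the ownship handling),
# while the GUI works in x = East, y = North, with distances in feet.

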
class DebugPayloadRouter:
"""
A unified router that handles payloads for the entire application.
It updates the SimulationStateHub and broadcasts processed data to
registered listeners. This class is thread-safe.
"""

    def __init__(
self,
simulation_hub: Optional[SimulationStateHub] = None,
):
self.active_archive = None
self._log_prefix = "[DebugPayloadRouter]"
self._lock = threading.Lock()
self._latest_payloads: Dict[str, Any] = {}
self._last_raw_packet: Optional[tuple] = None
self._history_size = 20
self._history = collections.deque(maxlen=self._history_size)
self._persist = False
# Recent latency samples (seconds) computed from clock synchronizer
self._latency_samples = collections.deque(maxlen=1000)
self._hub = simulation_hub
# Timestamp for ownship position integration
self._last_ownship_update_time: Optional[float] = None
# Listeners for real-time target data broadcasts
self._ris_target_listeners: List[TargetListListener] = []
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..")
)
self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            # Best-effort: raw-packet persistence is optional; an unwritable
            # Temp directory just disables it.
            pass
self._handlers: Dict[int, PayloadHandler] = {
ord("M"): lambda p: self._update_last_payload("MFD", p),
ord("S"): lambda p: self._update_last_payload("SAR", p),
ord("B"): lambda p: self._update_last_payload("BIN", p),
ord("J"): lambda p: self._update_last_payload("JSON", p),
ord("R"): self._handle_ris_status,
ord("r"): self._handle_ris_status,
}
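        # Dispatch is keyed on the single-byte SFP flow ID (hdr.SFP_FLOW);
        # 'R'/'r' payloads carry RIS status, the rest are buffered verbatim
        # for their debug views.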
        self._logger = logger
        self._logger.info(
            f"{self._log_prefix} Initialized (Hub: {self._hub is not None})."
        )
# Optional clock synchronizer for latency estimation. ClockSynchronizer
# requires numpy; if unavailable we simply leave latency estimates at 0.
try:
self._clock_sync = ClockSynchronizer()
except Exception:
self._clock_sync = None

    def set_archive(self, archive):
"""Sets the current archive session for recording."""
with self._lock:
self.active_archive = archive

    def add_ris_target_listener(self, listener: TargetListListener):
"""Registers a callback function to receive updates for real targets."""
with self._lock:
if listener not in self._ris_target_listeners:
self._ris_target_listeners.append(listener)
self._logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: TargetListListener):
"""Unregisters a callback function."""
with self._lock:
try:
self._ris_target_listeners.remove(listener)
self._logger.info(f"RIS target listener removed: {listener}")
except ValueError:
pass

    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """Return the flow-ID -> payload-handler dispatch table."""
return self._handlers
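
    # Example wiring (a sketch; the receiver object and its callback API are
    # hypothetical stand-ins for whatever transport feeds this router):
    #
    #     handlers = router.get_handlers()
    #     flow_id = header.SFP_FLOW
    #     if flow_id in handlers:
    #         handlers[flow_id](payload)
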
def _update_last_payload(self, flow_id: str, payload: Any):
with self._lock:
self._latest_payloads[flow_id] = payload

    def _parse_ris_payload_to_targets(
self, payload: bytearray
) -> Tuple[List[Target], List[int]]:
"""
Parse RIS payload and return a tuple:
- list of Target objects for entries with flags != 0 (active targets)
- list of integer target IDs present with flags == 0 (inactive)
"""
targets: List[Target] = []
inactive_ids: List[int] = []
try:
parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
for i, ris_target in enumerate(parsed_payload.tgt.tgt):
if ris_target.flags != 0:
target = Target(
target_id=i, trajectory=[], active=True, traceable=True
)
# Server's y-axis is East (our x), x-axis is North (our y)
pos_x_ft = float(ris_target.y) * M_TO_FT
pos_y_ft = float(ris_target.x) * M_TO_FT
pos_z_ft = float(ris_target.z) * M_TO_FT
setattr(target, "_pos_x_ft", pos_x_ft)
setattr(target, "_pos_y_ft", pos_y_ft)
setattr(target, "_pos_z_ft", pos_z_ft)
target._update_current_polar_coords()
                    try:
                        raw_h = float(ris_target.heading)
                        # Heuristic: magnitudes within ~2*pi (plus 10%
                        # tolerance) are treated as radians; larger values
                        # are assumed to already be in degrees.
                        if abs(raw_h) <= (2 * math.pi * 1.1):
                            hdg_deg = math.degrees(raw_h)
                        else:
                            hdg_deg = raw_h
                        target.current_heading_deg = hdg_deg % 360
                        setattr(target, "_raw_heading", raw_h)
                    except (ValueError, TypeError):
                        target.current_heading_deg = 0.0
targets.append(target)
else:
inactive_ids.append(int(i))
except Exception:
self._logger.exception(
f"{self._log_prefix} Failed to parse RIS payload into Target objects."
)
return targets, inactive_ids
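
    # Live targets are built with an empty trajectory; positions are stashed
    # on private `_pos_*_ft` attributes and `_update_current_polar_coords()`
    # (from core.models.Target) presumably derives the polar view from them.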

    def _handle_ris_status(self, payload: bytearray):
reception_timestamp = time.monotonic()
parsed_payload = None
try:
parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
except (ValueError, TypeError):
self._logger.error("Failed to parse SfpRisStatusPayload from buffer.")
return
# --- Update Ownship State ---
if self._hub:
try:
sc = parsed_payload.scenario
delta_t = 0.0
if self._last_ownship_update_time is not None:
delta_t = reception_timestamp - self._last_ownship_update_time
self._last_ownship_update_time = reception_timestamp
# Get previous ownship state to integrate position
old_state = self._hub.get_ownship_state()
old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))
# Server's vy is East (our x), vx is North (our y)
ownship_vx_fps = float(sc.vy) * M_TO_FT
ownship_vy_fps = float(sc.vx) * M_TO_FT
# Integrate position
new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t
new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t
ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360
ownship_state = {
"timestamp": reception_timestamp,
"position_xy_ft": (new_pos_x_ft, new_pos_y_ft),
"altitude_ft": float(sc.baro_altitude) * M_TO_FT,
"velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
"heading_deg": ownship_heading_deg,
"latitude": float(sc.latitude),
"longitude": float(sc.longitude),
}
self._hub.set_ownship_state(ownship_state)
# Store ownship state in archive if available
with self._lock:
archive = self.active_archive
if archive and hasattr(archive, "add_ownship_state"):
archive.add_ownship_state(ownship_state)
except Exception:
self._logger.exception("Failed to update ownship state.")
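        # Note: the local x/y track is dead-reckoned with a simple Euler step
        # (velocity * inter-packet interval on the local monotonic clock);
        # latitude/longitude are passed through as reported by the server.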
        # --- Feed clock synchronizer with server timetag (if available) ---
        if self._clock_sync is not None:
            try:
                server_timetag = int(parsed_payload.scenario.timetag)
                # Add a sample: raw server timetag + local reception time
                self._clock_sync.add_sample(server_timetag, reception_timestamp)
                # Compute a latency sample if the model can estimate the
                # packet's generation time on the client clock.
                est_gen = self._clock_sync.to_client_time(server_timetag)
                latency = reception_timestamp - est_gen
                # Store only non-negative latencies (negative values can occur
                # before the clock model has converged).
                if latency >= 0:
                    with self._lock:
                        self._latency_samples.append(latency)
            except Exception:
                # Latency estimation is best-effort; it must never disturb
                # payload routing.
                pass
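        # ClockSynchronizer is assumed to fit a server-to-client clock model
        # from the accumulated (timetag, local-time) samples, so that
        # to_client_time() projects a server timetag onto the local monotonic
        # clock.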
# --- Update Target States ---
real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)
if self._hub:
try:
if hasattr(self._hub, "add_real_packet"):
self._hub.add_real_packet(reception_timestamp)
                for tid in inactive_ids:
if hasattr(self._hub, "clear_real_target_data"):
self._hub.clear_real_target_data(tid)
for target in real_targets:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
)
self._hub.add_real_state(
target_id=target.target_id,
timestamp=reception_timestamp,
state=state_tuple,
)
if hasattr(self._hub, "set_real_heading"):
raw_val = getattr(target, "_raw_heading", None)
self._hub.set_real_heading(
target.target_id,
getattr(target, "current_heading_deg", 0.0),
raw_value=raw_val,
)
except Exception:
self._logger.exception("Failed to process RIS targets for Hub.")
with self._lock:
archive = self.active_archive
if archive:
for target in real_targets:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
)
archive.add_real_state(
target_id=target.target_id,
timestamp=reception_timestamp,
state=state_tuple,
)
# --- BROADCAST to all registered listeners ---
        # Snapshot the listener list under the lock, then call listeners
        # outside it so a callback can safely add/remove listeners without
        # deadlocking on the (non-reentrant) router lock.
        with self._lock:
            listeners = list(self._ris_target_listeners)
        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(f"Error in RIS target listener: {listener}")
        # --- Update Debug Views ---
self._update_debug_views(parsed_payload)

    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
"""Helper to populate debug views from a parsed payload."""
try:
sc = parsed_payload.scenario
lines = ["RIS Status Payload:\n", "Scenario:"]
# ... text generation logic ...
text_out = "\n".join(lines)
self._update_last_payload(
"RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
)
# ... JSON generation logic ...
def _convert_ctypes(value):
if hasattr(value, "_length_"):
return list(value)
if isinstance(value, ctypes._SimpleCData):
return value.value
return value
scenario_dict = {
f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_
}
targets_list = [
{f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
for t in parsed_payload.tgt.tgt
]
struct = {"scenario": scenario_dict, "targets": targets_list}
json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
self._update_last_payload("RIS_STATUS_JSON", json_bytes)
# --- Propagate antenna azimuth to hub ---
if self._hub:
plat_az_rad = scenario_dict.get(
"ant_nav_az", scenario_dict.get("platform_azimuth")
)
if plat_az_rad is not None:
az_deg = math.degrees(float(plat_az_rad))
self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic())
except Exception:
self._logger.exception("Failed to generate text/JSON for RIS debug view.")
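
    # The "RIS_STATUS_JSON" debug payload produced above has the shape
    # {"scenario": {<field>: <value>, ...}, "targets": [{...}, ...]},
    # mirroring the ctypes field names of SfpRisStatusPayload.
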
    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Atomically swap out and return the latest payload per flow ID."""
with self._lock:
new_payloads = self._latest_payloads
self._latest_payloads = {}
return new_payloads
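
    # Typical GUI consumer (a sketch; `root`, `debug_views`, and the 100 ms
    # poll interval are hypothetical, not part of this module):
    #
    #     def _poll():
    #         for flow, payload in router.get_and_clear_latest_payloads().items():
    #             debug_views[flow].update(payload)
    #         root.after(100, _poll)
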
    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """Record a raw packet in the rolling history (optionally to disk)."""
with self._lock:
self._last_raw_packet = (raw_bytes, addr)
            # utcnow() is deprecated; use an explicit timezone-aware timestamp.
            entry = {
                "ts": datetime.datetime.now(datetime.timezone.utc),
                "addr": addr,
                "raw": raw_bytes,
            }
try:
hdr = SFPHeader.from_buffer_copy(raw_bytes)
entry["flow"] = int(hdr.SFP_FLOW)
entry["tid"] = int(hdr.SFP_TID)
flow_map = {
ord("M"): "MFD",
ord("S"): "SAR",
ord("B"): "BIN",
ord("J"): "JSON",
ord("R"): "RIS",
ord("r"): "ris",
}
entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"]))
except Exception:
pass
self._history.append(entry)
if self._persist:
try:
ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
path = os.path.join(self._persist_dir, fname)
with open(path, "wb") as f:
f.write(raw_bytes)
            except Exception:
                self._logger.debug("Failed to persist raw packet.", exc_info=True)

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return and clear the last (raw_bytes, addr) pair, if any."""
with self._lock:
pkt = self._last_raw_packet
self._last_raw_packet = None
return pkt

    # --- Latency estimation API ---
def get_estimated_latency_s(self) -> float:
"""Return estimated one-way latency in seconds, or 0.0 if unknown."""
try:
if self._clock_sync is not None:
return float(self._clock_sync.get_average_latency_s())
except Exception:
pass
return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[float]:
        """Return recent latency samples in seconds (most recent last).

        Args:
            limit: maximum number of samples to return (None = all available)
        """
with self._lock:
samples = list(self._latency_samples)
if limit is not None and limit > 0:
return samples[-limit:]
return samples

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        """Compute basic statistics over recent latency samples.

        Returns a dict with mean_ms, std_ms, var_ms, min_ms, max_ms, count.
        """
with self._lock:
samples = list(self._latency_samples)
if not samples:
return {
"mean_ms": None,
"std_ms": None,
"var_ms": None,
"min_ms": None,
"max_ms": None,
"count": 0,
}
if sample_limit and sample_limit > 0:
samples = samples[-sample_limit:]
# Work in milliseconds for reporting
ms = [s * 1000.0 for s in samples]
mean = statistics.mean(ms)
var = statistics.pvariance(ms) if len(ms) > 1 else 0.0
std = statistics.pstdev(ms) if len(ms) > 1 else 0.0
return {
"mean_ms": round(mean, 3),
"std_ms": round(std, 3),
"var_ms": round(var, 3),
"min_ms": round(min(ms), 3),
"max_ms": round(max(ms), 3),
"count": len(ms),
}
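
    # Example (a sketch; values shown are illustrative):
    #
    #     stats = router.get_latency_stats(sample_limit=100)
    #     if stats["count"]:
    #         print(f"latency: {stats['mean_ms']} +/- {stats['std_ms']} ms")
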
def get_history(self):
with self._lock:
return list(self._history)

    def clear_history(self):
with self._lock:
self._history.clear()

    def set_history_size(self, n: int):
        """Resize the raw-packet history buffer (minimum size 1)."""
with self._lock:
n = max(1, int(n))
self._history_size = n
new_deque = collections.deque(self._history, maxlen=self._history_size)
self._history = new_deque

    def set_persist(self, enabled: bool):
        """Enable or disable dumping each raw packet to the Temp directory."""
with self._lock:
self._persist = bool(enabled)
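

if __name__ == "__main__":
    # Minimal smoke test (a sketch for manual runs, assuming the
    # target_simulator package is importable; no hub or network required).
    logging.basicConfig(level=logging.INFO)
    router = DebugPayloadRouter()
    # Any bytes are accepted; header parsing inside is best-effort.
    router.update_raw_packet(b"\x00" * 16, ("127.0.0.1", 5000))
    print("history entries:", len(router.get_history()))
    print("latency stats:", router.get_latency_stats())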