# S1005403_RisCC/target_simulator/gui/payload_router.py
# (548 lines, 21 KiB, Python)

# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.
This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""
import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple
from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer
# Module-level logger for this module
logger = logging.getLogger(__name__)
# Signature of a per-flow payload handler: it receives the payload buffer
# and returns nothing.
PayloadHandler = Callable[[bytearray], None]
# Signature of a listener that receives the list of Target objects built
# from a RIS status payload.
TargetListListener = Callable[[List[Target]], None]
# --- Constants ---
# Metres-to-feet conversion factor; applied to RIS positions, velocities and
# altitude before they are stored in "*_ft" / "*_fps" fields.
M_TO_FT = 3.28084
class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.

    It updates the SimulationStateHub and broadcasts processed data to
    registered listeners. The internal buffers (latest payloads, raw-packet
    history, latency samples, listener list and active archive) are guarded
    by a single non-reentrant lock. Listener callbacks and file writes are
    always performed outside that lock so that a callback may safely call
    back into the router.
    """

    # Human-readable names for the SFP flow identifiers stored in the history.
    _FLOW_NAMES = {
        ord("M"): "MFD",
        ord("S"): "SAR",
        ord("B"): "BIN",
        ord("J"): "JSON",
        ord("R"): "RIS",
        ord("r"): "ris",
    }

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        """
        Create a router.

        Args:
            simulation_hub: Hub that receives ownship, target and antenna
                updates. When None, payloads are still buffered for the
                debug views but no state is propagated.
        """
        self.active_archive = None
        self._log_prefix = "[DebugPayloadRouter]"
        self._logger = logger
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._history_size = 20
        self._history = collections.deque(maxlen=self._history_size)
        self._persist = False
        # Recent latency samples (seconds) computed from clock synchronizer
        self._latency_samples = collections.deque(maxlen=1000)
        self._hub = simulation_hub
        # Timestamp for ownship position integration
        self._last_ownship_update_time: Optional[float] = None
        # Listeners for real-time target data broadcasts
        self._ris_target_listeners: List[TargetListListener] = []

        # Raw packets are persisted (when enabled) under <project_root>/Temp,
        # where project_root is two directories above this file's directory.
        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except OSError:
            # Best effort: persistence is optional, so only report the failure.
            self._logger.warning(
                "%s Could not create persist directory %s",
                self._log_prefix,
                self._persist_dir,
                exc_info=True,
            )

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            # JSON payloads may contain antenna nav fields (ant_nav_az / ant_nav_el).
            # Route JSON to a dedicated handler that will both store the raw
            # payload and, when possible, extract antenna az/el to update the hub
            # so the PPI antenna sweep can animate even when using JSON protocol.
            ord("J"): self._handle_json_payload,
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        logger.info(f"{self._log_prefix} Initialized (Hub: {self._hub is not None}).")

        # Optional clock synchronizer for latency estimation. ClockSynchronizer
        # requires numpy; if unavailable we simply leave latency estimates at 0.
        try:
            self._clock_sync = ClockSynchronizer()
        except Exception:
            self._logger.debug(
                "%s ClockSynchronizer unavailable; latency estimation disabled.",
                self._log_prefix,
                exc_info=True,
            )
            self._clock_sync = None

    # --- Configuration / registration ---

    def set_archive(self, archive):
        """Sets the current archive session for recording."""
        with self._lock:
            self.active_archive = archive

    def add_ris_target_listener(self, listener: TargetListListener):
        """Registers a callback function to receive updates for real targets."""
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self._logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: TargetListListener):
        """Unregisters a callback function; unknown listeners are ignored."""
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
                self._logger.info(f"RIS target listener removed: {listener}")
            except ValueError:
                pass

    def get_handlers(self) -> Dict[int, PayloadHandler]:
        """Return the mapping from SFP flow id (byte value) to its handler."""
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        """Store ``payload`` as the most recent payload for ``flow_id``."""
        with self._lock:
            self._latest_payloads[flow_id] = payload

    # --- RIS status handling ---

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        """
        Parse RIS payload and return a tuple:
        - list of Target objects for entries with flags != 0 (active targets)
        - list of integer target IDs present with flags == 0 (inactive)

        Parsing errors are logged; whatever was collected before the error
        (possibly two empty lists) is returned.
        """
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
            return [], []
        return self._targets_from_parsed(parsed_payload)

    def _targets_from_parsed(self, parsed_payload) -> Tuple[List[Target], List[int]]:
        """Build (active targets, inactive ids) from an already parsed payload."""
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags == 0:
                    inactive_ids.append(i)
                    continue

                target = Target(
                    target_id=i, trajectory=[], active=True, traceable=True
                )
                # Server's y-axis is East (our x), x-axis is North (our y)
                target._pos_x_ft = float(ris_target.y) * M_TO_FT
                target._pos_y_ft = float(ris_target.x) * M_TO_FT
                target._pos_z_ft = float(ris_target.z) * M_TO_FT
                target._update_current_polar_coords()
                try:
                    raw_h = float(ris_target.heading)
                    # Heuristic: magnitudes up to ~1.1 * 2*pi are taken as
                    # radians, anything larger as degrees.
                    if abs(raw_h) <= (2 * math.pi * 1.1):
                        hdg_deg = math.degrees(raw_h)
                    else:
                        hdg_deg = raw_h
                    target.current_heading_deg = hdg_deg % 360
                    target._raw_heading = raw_h
                except (ValueError, TypeError):
                    target.current_heading_deg = 0.0
                targets.append(target)
        except Exception:
            self._logger.exception(
                f"{self._log_prefix} Failed to parse RIS payload into Target objects."
            )
        return targets, inactive_ids

    def _handle_ris_status(self, payload: bytearray):
        """
        Handle a RIS status payload.

        Updates ownship state, feeds the clock synchronizer, pushes the real
        target states to the hub and to the active archive, broadcasts the
        targets to the registered listeners and refreshes the debug views.
        """
        reception_timestamp = time.monotonic()
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except (ValueError, TypeError):
            self._logger.error("Failed to parse SfpRisStatusPayload from buffer.")
            return

        # --- Update Ownship State ---
        if self._hub:
            self._update_ownship_state(parsed_payload, reception_timestamp)

        # --- Feed clock synchronizer with server timetag (if available) ---
        self._feed_clock_sync(parsed_payload, reception_timestamp)

        # --- Update Target States ---
        # Reuse the struct parsed above instead of parsing the buffer twice.
        real_targets, inactive_ids = self._targets_from_parsed(parsed_payload)
        target_states = [
            (
                target,
                (
                    getattr(target, "_pos_x_ft", 0.0),
                    getattr(target, "_pos_y_ft", 0.0),
                    getattr(target, "_pos_z_ft", 0.0),
                ),
            )
            for target in real_targets
        ]
        if self._hub:
            self._publish_targets_to_hub(
                target_states, inactive_ids, reception_timestamp
            )
        self._archive_target_states(target_states, reception_timestamp)

        # --- BROADCAST to all registered listeners ---
        self._broadcast_targets(real_targets)

        # --- Update Debug Views ---
        self._update_debug_views(parsed_payload)

    def _update_ownship_state(self, parsed_payload, reception_timestamp: float):
        """Integrate ownship position and push the new ownship state to the hub."""
        try:
            sc = parsed_payload.scenario
            delta_t = 0.0
            if self._last_ownship_update_time is not None:
                delta_t = reception_timestamp - self._last_ownship_update_time
            self._last_ownship_update_time = reception_timestamp

            # Get previous ownship state to integrate position
            old_state = self._hub.get_ownship_state()
            old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))

            # Server's vy is East (our x), vx is North (our y)
            ownship_vx_fps = float(sc.vy) * M_TO_FT
            ownship_vy_fps = float(sc.vx) * M_TO_FT

            ownship_state = {
                "timestamp": reception_timestamp,
                "position_xy_ft": (
                    old_pos_xy[0] + ownship_vx_fps * delta_t,
                    old_pos_xy[1] + ownship_vy_fps * delta_t,
                ),
                "altitude_ft": float(sc.baro_altitude) * M_TO_FT,
                "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
                "heading_deg": math.degrees(float(sc.true_heading)) % 360,
                "latitude": float(sc.latitude),
                "longitude": float(sc.longitude),
            }
            self._hub.set_ownship_state(ownship_state)

            # Store ownship state in archive if available
            with self._lock:
                archive = self.active_archive
            if archive and hasattr(archive, "add_ownship_state"):
                archive.add_ownship_state(ownship_state)
        except Exception:
            self._logger.exception("Failed to update ownship state.")

    def _feed_clock_sync(self, parsed_payload, reception_timestamp: float):
        """Add a (server timetag, reception time) sample and record the latency."""
        if self._clock_sync is None:
            return
        try:
            server_timetag = int(parsed_payload.scenario.timetag)
            # Add a sample: raw server timetag + local reception time
            self._clock_sync.add_sample(server_timetag, reception_timestamp)
            # Compute a latency sample if model can estimate client time
            est_gen = self._clock_sync.to_client_time(server_timetag)
            latency = reception_timestamp - est_gen
            # store only non-negative latencies
            if latency >= 0:
                with self._lock:
                    self._latency_samples.append(latency)
        except Exception:
            # Latency estimation is best effort and must never break routing.
            self._logger.debug(
                "%s Clock synchronizer update failed.",
                self._log_prefix,
                exc_info=True,
            )

    def _publish_targets_to_hub(
        self, target_states, inactive_ids: List[int], reception_timestamp: float
    ):
        """Push real target states/headings to the hub and clear inactive ids."""
        hub = self._hub
        try:
            if hasattr(hub, "add_real_packet"):
                hub.add_real_packet(reception_timestamp)
            if hasattr(hub, "clear_real_target_data"):
                for tid in inactive_ids:
                    hub.clear_real_target_data(tid)
            can_set_heading = hasattr(hub, "set_real_heading")
            for target, state_tuple in target_states:
                hub.add_real_state(
                    target_id=target.target_id,
                    timestamp=reception_timestamp,
                    state=state_tuple,
                )
                if can_set_heading:
                    hub.set_real_heading(
                        target.target_id,
                        getattr(target, "current_heading_deg", 0.0),
                        raw_value=getattr(target, "_raw_heading", None),
                    )
        except Exception:
            self._logger.exception("Failed to process RIS targets for Hub.")

    def _archive_target_states(self, target_states, reception_timestamp: float):
        """Record the real target states in the active archive, if any."""
        with self._lock:
            archive = self.active_archive
        if not archive:
            return
        try:
            for target, state_tuple in target_states:
                archive.add_real_state(
                    target_id=target.target_id,
                    timestamp=reception_timestamp,
                    state=state_tuple,
                )
        except Exception:
            # An archive failure must not prevent the listener broadcast.
            self._logger.exception("Failed to record RIS targets in archive.")

    def _broadcast_targets(self, real_targets: List[Target]):
        """
        Call every registered listener with ``real_targets``.

        The listener list is copied under the lock and the callbacks run
        outside it: the lock is not reentrant, so a listener that calls back
        into the router (for example to unregister itself) would otherwise
        deadlock.
        """
        with self._lock:
            listeners = list(self._ris_target_listeners)
        for listener in listeners:
            try:
                listener(real_targets)
            except Exception:
                self._logger.exception(f"Error in RIS target listener: {listener}")

    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
        """
        Helper to populate debug views from a parsed payload.

        Stores the "RIS_STATUS_TEXT" and "RIS_STATUS_JSON" payloads and, when a
        hub is present, propagates the antenna azimuth (platform heading plus
        antenna sweep, converted from radians to degrees).
        """

        def _convert_ctypes(value):
            # ctypes arrays expose _length_; turn them into plain lists.
            if hasattr(value, "_length_"):
                return list(value)
            if isinstance(value, ctypes._SimpleCData):
                return value.value
            return value

        scenario_dict = None
        try:
            sc = parsed_payload.scenario
            lines = ["RIS Status Payload:\n", "Scenario:"]
            # ... text generation logic ...
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )
            # ... JSON generation logic ...
            scenario_dict = {
                f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_
            }
            targets_list = [
                {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            debug_doc = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(debug_doc, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)
        except Exception:
            self._logger.exception("Failed to generate text/JSON for RIS debug view.")

        # --- Propagate antenna azimuth to hub ---
        # Done independently of the debug serialisation above so that a
        # text/JSON failure does not stop the antenna sweep from updating.
        if scenario_dict is None or not self._hub:
            return
        try:
            # Get platform heading and relative antenna sweep
            heading_rad = scenario_dict.get(
                "true_heading", scenario_dict.get("platform_azimuth")
            )
            if heading_rad is None:
                return
            total_az_rad = float(heading_rad)
            sweep_rad = scenario_dict.get("ant_nav_az")
            if sweep_rad is not None:
                total_az_rad += float(sweep_rad)
            self._hub.set_antenna_azimuth(
                math.degrees(total_az_rad), timestamp=time.monotonic()
            )
        except Exception:
            self._logger.exception("Failed to propagate antenna azimuth to hub.")

    # --- JSON handling ---

    @staticmethod
    def _find_scenario_field(dct, key):
        """
        Look up ``key`` in a decoded JSON object.

        Search order: the "scenario" (or "sc") container, the top level, then
        any directly nested dict. Returns None when the key is not found.
        """
        if not isinstance(dct, dict):
            return None
        # Direct scenario container
        sc = dct.get("scenario") or dct.get("sc") or dct
        if isinstance(sc, dict) and key in sc:
            return sc.get(key)
        # Top-level key
        if key in dct:
            return dct.get(key)
        # Nested search
        for v in dct.values():
            if isinstance(v, dict) and key in v:
                return v.get(key)
        return None

    @staticmethod
    def _angle_to_degrees(value) -> float:
        """
        Convert an angle of unknown unit to degrees.

        Values may be in radians or degrees; a magnitude of at most 2*pi
        (plus a small tolerance) is assumed to be radians and converted.
        """
        val = float(value)
        if abs(val) <= (2 * math.pi + 0.01):
            return math.degrees(val)
        return val

    def _handle_json_payload(self, payload: bytearray):
        """
        Handle JSON payloads. Store raw JSON and, if present, extract
        antenna navigation fields (ant_nav_az / ant_nav_el) and propagate
        them to the SimulationStateHub so the PPI antenna can animate.
        """
        try:
            # Always store the raw payload for debug inspection
            self._update_last_payload("JSON", payload)
            if not self._hub:
                return

            # Try to decode and parse JSON; undecodable payloads are ignored.
            # (UnicodeDecodeError and json.JSONDecodeError are ValueErrors.)
            try:
                obj = json.loads(payload.decode("utf-8"))
            except (AttributeError, ValueError):
                return

            # Find platform heading and relative antenna sweep. Use explicit
            # None checks: a heading of exactly 0 is a valid value and must
            # not fall through to the fallback field.
            heading_val = self._find_scenario_field(obj, "true_heading")
            if heading_val is None:
                heading_val = self._find_scenario_field(obj, "platform_azimuth")
            if heading_val is None:
                return
            sweep_val = self._find_scenario_field(obj, "ant_nav_az")

            try:
                # Resolve the unit of each component separately: two radian
                # values can legitimately sum to more than 2*pi, which would
                # make the sum look like a value in degrees.
                az_deg = self._angle_to_degrees(heading_val)
                if sweep_val is not None:
                    az_deg += self._angle_to_degrees(sweep_val)
            except (TypeError, ValueError):
                self._logger.debug(
                    "%s Non-numeric antenna azimuth in JSON payload.",
                    self._log_prefix,
                    exc_info=True,
                )
                return
            self._hub.set_antenna_azimuth(az_deg, timestamp=time.monotonic())
            # TODO: elevation (ant_nav_el / platform_elevation) is not stored
            # in the hub yet.
        except Exception:
            self._logger.exception("Error handling JSON payload")

    # --- Buffered payload / raw packet access ---

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        """Return the payloads buffered since the last call and reset the buffer."""
        with self._lock:
            new_payloads, self._latest_payloads = self._latest_payloads, {}
        return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        """
        Record a raw packet.

        The packet becomes the "last raw packet", is appended to the bounded
        history (with flow/tid decoded from the SFP header when possible)
        and, if persistence is enabled, is written to the persist directory.

        Args:
            raw_bytes: the packet exactly as received.
            addr: sender address; ``addr[0]`` (host string) and ``addr[1]``
                are used to build the persisted file name.
        """
        # Naive UTC timestamp (same value utcnow() produced, without relying
        # on the deprecated call).
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        entry = {"ts": now_utc, "addr": addr, "raw": raw_bytes}
        try:
            hdr = SFPHeader.from_buffer_copy(raw_bytes)
            entry["flow"] = int(hdr.SFP_FLOW)
            entry["tid"] = int(hdr.SFP_TID)
            entry["flow_name"] = self._FLOW_NAMES.get(
                entry["flow"], str(entry["flow"])
            )
        except Exception:
            # Packets without a decodable header are still kept in the history.
            self._logger.debug(
                "%s Could not decode SFP header of raw packet.",
                self._log_prefix,
                exc_info=True,
            )

        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            self._history.append(entry)
            persist = self._persist

        # Disk I/O is done outside the lock so it cannot stall other threads.
        if persist:
            self._persist_raw_packet(entry)

    def _persist_raw_packet(self, entry: Dict[str, Any]):
        """Write one history entry's raw bytes to the persist directory (best effort)."""
        try:
            addr = entry["addr"]
            ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
            fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
            path = os.path.join(self._persist_dir, fname)
            with open(path, "wb") as f:
                f.write(entry["raw"])
        except Exception:
            self._logger.debug(
                "%s Failed to persist raw packet.", self._log_prefix, exc_info=True
            )

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        """Return the last ``(raw_bytes, addr)`` pair, or None, and clear it."""
        with self._lock:
            pkt, self._last_raw_packet = self._last_raw_packet, None
        return pkt

    # --- Latency estimation API ---

    def get_estimated_latency_s(self) -> float:
        """Return estimated one-way latency in seconds, or 0.0 if unknown."""
        if self._clock_sync is None:
            return 0.0
        try:
            return float(self._clock_sync.get_average_latency_s())
        except Exception:
            self._logger.debug(
                "%s Latency estimate unavailable.", self._log_prefix, exc_info=True
            )
            return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[float]:
        """Return recent latency samples in seconds (most recent last).

        Args:
            limit: maximum number of samples to return (None = all available)
        """
        with self._lock:
            samples = list(self._latency_samples)
        if limit is not None and limit > 0:
            return samples[-limit:]
        return samples

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        """Compute basic statistics over recent latency samples.

        Returns a dict with mean_ms, std_ms, var_ms, min_ms, max_ms, count.
        All values except count are None when no sample is available.

        Args:
            sample_limit: use only the most recent ``sample_limit`` samples
                (0 or a negative value = all available).
        """
        with self._lock:
            samples = list(self._latency_samples)
        if not samples:
            return {
                "mean_ms": None,
                "std_ms": None,
                "var_ms": None,
                "min_ms": None,
                "max_ms": None,
                "count": 0,
            }
        if sample_limit and sample_limit > 0:
            samples = samples[-sample_limit:]
        # Work in milliseconds for reporting
        ms = [s * 1000.0 for s in samples]
        mean = statistics.mean(ms)
        var = statistics.pvariance(ms) if len(ms) > 1 else 0.0
        std = statistics.pstdev(ms) if len(ms) > 1 else 0.0
        return {
            "mean_ms": round(mean, 3),
            "std_ms": round(std, 3),
            "var_ms": round(var, 3),
            "min_ms": round(min(ms), 3),
            "max_ms": round(max(ms), 3),
            "count": len(ms),
        }

    # --- Raw packet history ---

    def get_history(self):
        """Return a copy of the raw-packet history (oldest first)."""
        with self._lock:
            return list(self._history)

    def clear_history(self):
        """Drop every entry from the raw-packet history."""
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        """Resize the history to hold at most ``n`` entries (minimum 1)."""
        with self._lock:
            self._history_size = max(1, int(n))
            # Rebuilding the deque keeps the most recent entries.
            self._history = collections.deque(
                self._history, maxlen=self._history_size
            )

    def set_persist(self, enabled: bool):
        """Enable or disable writing every raw packet to disk."""
        with self._lock:
            self._persist = bool(enabled)