S1005403_RisCC/target_simulator/gui/payload_router.py

403 lines
16 KiB
Python

# target_simulator/gui/payload_router.py
"""Payload router for buffering SFP payloads for the GUI.
This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""
import threading
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Full
from typing import Dict, Optional, Any, List, Callable, Tuple
from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer
# Module-level logger for this module
logger = logging.getLogger(__name__)
PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]
class DebugPayloadRouter:
"""
A unified router that handles payloads for the entire application.
It updates the SimulationStateHub and broadcasts processed data to
registered listeners. This class is thread-safe.
"""
def __init__(
    self,
    simulation_hub: Optional[SimulationStateHub] = None,
):
    """Create the router.

    Args:
        simulation_hub: Optional SimulationStateHub that will receive
            parsed real-target states; may be None when no hub is attached.
    """
    self.active_archive = None
    self._log_prefix = "[DebugPayloadRouter]"
    self._lock = threading.Lock()
    # Latest payload per flow name; drained by the GUI poller.
    self._latest_payloads: Dict[str, Any] = {}
    self._last_raw_packet: Optional[tuple] = None
    self._history_size = 20
    self._history = collections.deque(maxlen=self._history_size)
    self._persist = False
    self._hub = simulation_hub
    self._clock_synchronizer = ClockSynchronizer()
    # Listeners for real-time target data broadcasts.
    self._ris_target_listeners: List[TargetListListener] = []
    project_root = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", "..")
    )
    self._persist_dir = os.path.join(project_root, "Temp")
    try:
        os.makedirs(self._persist_dir, exist_ok=True)
    except Exception:
        # Persistence is best-effort, but don't swallow the failure silently:
        # record why raw-packet dumps will not work.
        logger.debug(
            "Could not create persist dir %s", self._persist_dir, exc_info=True
        )
    # Dispatch table: SFP flow byte -> payload handler.
    self._handlers: Dict[int, PayloadHandler] = {
        ord("M"): lambda p: self._update_last_payload("MFD", p),
        ord("S"): lambda p: self._update_last_payload("SAR", p),
        ord("B"): lambda p: self._update_last_payload("BIN", p),
        ord("J"): lambda p: self._update_last_payload("JSON", p),
        ord("R"): self._handle_ris_status,
        ord("r"): self._handle_ris_status,
    }
    # Lazy %-formatting: renders identically to the previous f-string.
    logger.info("%s Initialized (Hub: %s).", self._log_prefix, self._hub is not None)
    self._logger = logger
def set_archive(self, archive):
"""Imposta la sessione di archivio corrente per la registrazione."""
with self._lock:
self.active_archive = archive
def add_ris_target_listener(self, listener: TargetListListener):
"""Registers a callback function to receive updates for real targets."""
with self._lock:
if listener not in self._ris_target_listeners:
self._ris_target_listeners.append(listener)
self._logger.info(f"RIS target listener added: {listener}")
def remove_ris_target_listener(self, listener: TargetListListener):
"""Unregisters a callback function."""
with self._lock:
try:
self._ris_target_listeners.remove(listener)
self._logger.info(f"RIS target listener removed: {listener}")
except ValueError:
pass
def get_handlers(self) -> Dict[int, PayloadHandler]:
return self._handlers
def _update_last_payload(self, flow_id: str, payload: Any):
with self._lock:
self._latest_payloads[flow_id] = payload
def _parse_ris_payload_to_targets(
    self, payload: bytearray
) -> Tuple[List[Target], List[int]]:
    """
    Parse a RIS status payload into Target objects.

    Returns a tuple:
    - list of Target objects for entries with flags != 0 (active targets)
    - list of integer target IDs present with flags == 0 (inactive)

    Any parse failure is logged and yields whatever was collected so far.
    """
    targets: List[Target] = []
    inactive_ids: List[int] = []
    # Loop-invariant constants, hoisted out of the per-target loop.
    M_TO_FT = 3.280839895  # metres -> feet
    RAD_LIMIT = 2 * math.pi * 1.1  # tolerance band for radian sniffing
    try:
        parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        for i, ris_target in enumerate(parsed_payload.tgt.tgt):
            if ris_target.flags == 0:
                # flags == 0 marks an inactive slot; i is already an int.
                inactive_ids.append(i)
                continue
            target = Target(
                target_id=i, trajectory=[], active=True, traceable=True
            )
            # NOTE(review): server x/y appear swapped relative to the GUI
            # frame (y feeds _pos_x_ft) — intentional axis convention.
            pos_x_ft = float(ris_target.y) * M_TO_FT
            pos_y_ft = float(ris_target.x) * M_TO_FT
            pos_z_ft = float(ris_target.z) * M_TO_FT
            setattr(target, "_pos_x_ft", pos_x_ft)
            setattr(target, "_pos_y_ft", pos_y_ft)
            setattr(target, "_pos_z_ft", pos_z_ft)
            target._update_current_polar_coords()
            try:
                raw_h = float(ris_target.heading)
                # Server should send heading in radians; but be tolerant:
                # if the magnitude looks like radians (<= ~2*pi) convert to
                # degrees, otherwise assume it's already degrees.
                if abs(raw_h) <= RAD_LIMIT:
                    hdg_deg = math.degrees(raw_h)
                    unit = "rad"
                else:
                    hdg_deg = raw_h
                    unit = "deg"
                target.current_heading_deg = hdg_deg % 360
                # Store the raw value on the Target for later correlation.
                try:
                    setattr(target, "_raw_heading", raw_h)
                except Exception:
                    pass
                # Lazy %-formatting: avoids building the string when DEBUG
                # is disabled; renders identically to the old f-string.
                self._logger.debug(
                    "Parsed RIS heading for target %s: raw=%s assumed=%s hdg_deg=%.6f",
                    i, raw_h, unit, target.current_heading_deg,
                )
            except (ValueError, TypeError):
                target.current_heading_deg = 0.0
            targets.append(target)
    except Exception:
        self._logger.exception(
            f"{self._log_prefix} Failed to parse RIS payload into Target objects."
        )
    return targets, inactive_ids
def _handle_ris_status(self, payload: bytearray):
    """Handle an incoming RIS status payload.

    Pipeline: synchronize the client clock against the server timetag,
    push real-target states to the hub and active archive, broadcast the
    parsed targets to registered listeners, then build debug text/JSON
    views of the payload for the GUI.
    """
    client_reception_time = time.monotonic()
    # Explicit sentinel instead of the fragile `'parsed_payload' in locals()`
    # check: non-None means from_buffer_copy succeeded.
    parsed_payload = None
    # Attempt to parse payload and server timetag for synchronization.
    try:
        parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        server_timetag = parsed_payload.scenario.timetag
        # 1. Update the synchronization model with the new sample.
        self._clock_synchronizer.add_sample(server_timetag, client_reception_time)
        # 2. Convert the server timetag to an estimated client-domain generation time.
        estimated_generation_time = self._clock_synchronizer.to_client_time(server_timetag)
    except (ValueError, TypeError, IndexError):
        # If parsing fails, we cannot sync. Fallback to reception time.
        self._logger.warning("Could not parse RIS payload for timetag. Using reception time for sync.")
        estimated_generation_time = client_reception_time
    real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)
    if self._hub:
        try:
            # Record a single packet-level arrival timestamp.
            if hasattr(self._hub, "add_real_packet"):
                self._hub.add_real_packet(client_reception_time)
            # Clear inactive targets.
            for tid in inactive_ids or []:
                if hasattr(self._hub, "clear_real_target_data"):
                    self._hub.clear_real_target_data(tid)
            # Add real states for active targets using the ESTIMATED generation time.
            for target in real_targets:
                state_tuple = (
                    getattr(target, "_pos_x_ft", 0.0),
                    getattr(target, "_pos_y_ft", 0.0),
                    getattr(target, "_pos_z_ft", 0.0),
                )
                self._hub.add_real_state(
                    target_id=target.target_id,
                    timestamp=estimated_generation_time,
                    state=state_tuple,
                )
            # Propagate heading information.
            for target in real_targets:
                if hasattr(self._hub, "set_real_heading"):
                    raw_val = getattr(target, "_raw_heading", None)
                    self._hub.set_real_heading(
                        target.target_id,
                        getattr(target, "current_heading_deg", 0.0),
                        raw_value=raw_val,
                    )
        except Exception:
            self._logger.exception(
                "DebugPayloadRouter: Failed to process RIS for Hub."
            )
    # Snapshot shared state under the lock, but do the actual archive
    # writes and listener callbacks OUTSIDE it: a listener that
    # (un)registers itself during the callback would otherwise deadlock
    # on this non-reentrant Lock.
    with self._lock:
        archive = self.active_archive
        listeners = list(self._ris_target_listeners)
    if archive:
        for target in real_targets:
            state_tuple = (
                getattr(target, "_pos_x_ft", 0.0),
                getattr(target, "_pos_y_ft", 0.0),
                getattr(target, "_pos_z_ft", 0.0),
            )
            archive.add_real_state(
                target_id=target.target_id,
                timestamp=estimated_generation_time,
                state=state_tuple,
            )
    # --- BROADCAST to all registered listeners ---
    for listener in listeners:
        try:
            listener(real_targets)
        except Exception:
            self._logger.exception(f"Error in RIS target listener: {listener}")
    # The remainder builds debug views (text/JSON) of the payload.
    try:
        if len(payload) >= SfpRisStatusPayload.size():
            # Re-parse only if the sync block above failed to parse.
            if parsed_payload is None:
                parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            sc = parsed_payload.scenario
            # ... (Text generation logic remains unchanged) ...
            lines = ["RIS Status Payload:\n", "Scenario:"]
            lines.append(f" timetag : {sc.timetag}")  # ... etc.
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS", bytearray(text_out.encode("utf-8"))
            )
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )

            def _convert_ctypes(value):
                # ctypes arrays expose _length_; scalars are _SimpleCData.
                if hasattr(value, "_length_"):
                    return list(value)
                if isinstance(value, ctypes._SimpleCData):
                    return value.value
                return value

            scenario_dict = {
                f[0]: _convert_ctypes(getattr(parsed_payload.scenario, f[0]))
                for f in parsed_payload.scenario._fields_
            }
            targets_list = [
                {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)
            try:
                # The azimuth field name varies by server version.
                plat = None
                if "ant_nav_az" in scenario_dict:
                    plat = scenario_dict.get("ant_nav_az")
                elif "platform_azimuth" in scenario_dict:
                    plat = scenario_dict.get("platform_azimuth")
                if (
                    plat is not None
                    and self._hub
                    and hasattr(self._hub, "set_platform_azimuth")
                ):
                    try:
                        val = float(plat)
                        # Same radians/degrees sniffing as for headings.
                        if abs(val) <= (2 * math.pi * 1.1):
                            deg = math.degrees(val)
                        else:
                            deg = val
                        if hasattr(self._hub, "set_antenna_azimuth"):
                            self._hub.set_antenna_azimuth(
                                deg, timestamp=client_reception_time
                            )
                        else:
                            self._hub.set_platform_azimuth(
                                deg, timestamp=client_reception_time
                            )
                    except Exception:
                        pass
            except Exception:
                self._logger.debug(
                    "Error while extracting antenna azimuth from RIS payload",
                    exc_info=True,
                )
    except Exception:
        self._logger.exception("Failed to generate text/JSON for RIS debug view.")
def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
with self._lock:
new_payloads = self._latest_payloads
self._latest_payloads = {}
return new_payloads
# ... (the rest of the file is unchanged) ...
def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
with self._lock:
self._last_raw_packet = (raw_bytes, addr)
entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes}
try:
hdr = SFPHeader.from_buffer_copy(raw_bytes)
entry["flow"] = int(hdr.SFP_FLOW)
entry["tid"] = int(hdr.SFP_TID)
flow_map = {
ord("M"): "MFD",
ord("S"): "SAR",
ord("B"): "BIN",
ord("J"): "JSON",
ord("R"): "RIS",
ord("r"): "ris",
}
entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"]))
except Exception:
pass
self._history.append(entry)
if self._persist:
try:
ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
path = os.path.join(self._persist_dir, fname)
with open(path, "wb") as f:
f.write(raw_bytes)
except Exception:
pass
def get_and_clear_raw_packet(self) -> Optional[tuple]:
with self._lock:
pkt = self._last_raw_packet
self._last_raw_packet = None
return pkt
def get_history(self):
with self._lock:
return list(self._history)
def clear_history(self):
with self._lock:
self._history.clear()
def set_history_size(self, n: int):
with self._lock:
try:
n = max(1, int(n))
except Exception:
return
self._history_size = n
new_deque = collections.deque(self._history, maxlen=self._history_size)
self._history = new_deque
def set_persist(self, enabled: bool):
with self._lock:
self._persist = bool(enabled)
def get_estimated_latency_s(self) -> float:
"""
Returns the estimated one-way server-to-client network latency.
Returns:
The estimated latency in seconds, or 0.0 if not available.
"""
if hasattr(self, '_clock_synchronizer') and self._clock_synchronizer:
return self._clock_synchronizer.get_average_latency_s()
return 0.0