# target_simulator/gui/payload_router.py
"""
Payload router for buffering SFP payloads for the GUI.

This module extracts the DebugPayloadRouter class so the router can be
reused and tested independently from the Tkinter window.
"""

import threading
import statistics
import collections
import datetime
import os
import logging
import math
import json
import ctypes
import time
from queue import Queue, Empty, Full
from typing import Dict, Optional, Any, List, Callable, Tuple

from target_simulator.core.sfp_structures import (
    SFPHeader,
    SfpRisStatusPayload,
    DataTag,
    SfpRisSyncPayload,
)
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
from target_simulator.utils.clock_synchronizer import ClockSynchronizer

# Module-level logger for this module
logger = logging.getLogger(__name__)

PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]

# --- Constants ---
M_TO_FT = 3.28084
PERFORMANCE_SAMPLES_BUFFER_SIZE = 10000


class DebugPayloadRouter:
    """
    A unified router that handles payloads for the entire application.
    It receives payloads from the network thread, queues them, and processes
    them on a dedicated worker thread to avoid blocking the network receiver.
    This class is thread-safe.
    """

    def __init__(
        self,
        simulation_hub: Optional[SimulationStateHub] = None,
    ):
        self.active_archive = None
        self.logger = logger
        self._log_prefix = "[DebugPayloadRouter]"
        self._lock = threading.Lock()
        self._latest_payloads: Dict[str, Any] = {}
        self._last_raw_packet: Optional[tuple] = None
        self._sfp_debug_history_size = 20
        self._history = collections.deque(maxlen=self._sfp_debug_history_size)
        self._persist = False

        self._hub = simulation_hub
        self._last_ownship_update_time: Optional[float] = None
        self._ris_target_listeners: List[TargetListListener] = []

        # --- Worker Thread and Queue for Processing ---
        self._processing_queue: Queue = Queue(maxsize=100)
        self._sync_results_queue: Queue = Queue(
            maxsize=500
        )  # Queue for SYNC results (large buffer)
        self._stop_worker = threading.Event()
        self._processing_thread = threading.Thread(
            target=self._processing_loop, name="PayloadProcessingThread", daemon=True
        )
        self._processing_thread.start()

        from target_simulator.config import DEBUG_CONFIG

        self._profiling_enabled = DEBUG_CONFIG.get(
            "enable_performance_profiling", False
        )
        self._perf_counters = {
            "packet_count": 0,
            "_total_packet_count": 0,
            "parse_time_total": 0.0,
            "hub_update_time_total": 0.0,
            "archive_time_total": 0.0,
            "listener_time_total": 0.0,
            "clock_sync_time_total": 0.0,
            "last_report_time": time.time(),
            "max_processing_time": 0.0,
        }

        if self._profiling_enabled:
            self._perf_samples = collections.deque(
                maxlen=PERFORMANCE_SAMPLES_BUFFER_SIZE
            )
        else:
            self._perf_samples = None

        project_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..")
        )
        self._persist_dir = os.path.join(project_root, "Temp")
        try:
            os.makedirs(self._persist_dir, exist_ok=True)
        except Exception:
            pass

        self._handlers: Dict[int, PayloadHandler] = {
            ord("M"): lambda p: self._update_last_payload("MFD", p),
            ord("S"): lambda p: self._update_last_payload("SAR", p),
            ord("B"): lambda p: self._update_last_payload("BIN", p),
            ord("J"): self._handle_json_payload,
            ord("R"): self._handle_ris_status,
            ord("r"): self._handle_ris_status,
        }
        self.logger.info(
            f"{self._log_prefix} Initialized (Hub: {self._hub is not None})."
        )

        try:
            self._clock_sync = ClockSynchronizer()
        except Exception:
            self._clock_sync = None

    def shutdown(self):
        """Signals the processing worker thread to stop."""
        self.logger.info("Shutting down payload router worker thread...")
        self._stop_worker.set()
        try:
            self._processing_queue.put_nowait(None)
        except Full:
            pass
        self._processing_thread.join(timeout=2.0)
        if self._processing_thread.is_alive():
            self.logger.warning("Payload processing thread did not shut down cleanly.")

    def _processing_loop(self):
        """Worker thread loop that processes payloads from the queue."""
        self.logger.info("Payload processing worker thread started.")
        while not self._stop_worker.is_set():
            try:
                item = self._processing_queue.get(timeout=1.0)
                if item is None:
                    break

                payload, reception_timestamp = item
                self._dispatch_payload(payload, reception_timestamp)
                self._processing_queue.task_done()

            except Empty:
                continue
            except Exception:
                self.logger.exception("Unexpected error in payload processing loop.")
        self.logger.info("Payload processing worker thread stopped.")

    def _dispatch_payload(self, payload: bytearray, reception_timestamp: float):
        """
        Worker Thread: Analyzes the payload and routes it to the correct handler.
        Distinguishes between RIS status messages and SYNC replies.
        """
        try:
            if len(payload) < ctypes.sizeof(DataTag):
                self.logger.warning(
                    f"Payload too short: {len(payload)} bytes, need at least {ctypes.sizeof(DataTag)}"
                )
                return

            tag = DataTag.from_buffer_copy(payload)
            tag_str = f"{chr(tag.ID[0]) if 32 <= tag.ID[0] <= 126 else tag.ID[0]}{chr(tag.ID[1]) if 32 <= tag.ID[1] <= 126 else tag.ID[1]}"

            # 'SY' -> SYNC reply (takes priority because it is specific)
            if tag.ID[0] == ord("S") and tag.ID[1] == ord("Y"):
                self._process_sync_reply_payload(payload, reception_timestamp)
            else:
                # All other payloads on the 'R' flow are RIS status messages
                # (which start with scenario_tag, target_tag, etc.)
                self._process_ris_status_payload(payload, reception_timestamp)
        except Exception:
            self.logger.exception("Error during payload dispatch.")

    def _process_sync_reply_payload(
        self, payload: bytearray, reception_timestamp: float
    ):
        """
        Worker Thread: Handles a SYNC reply.
        """
        try:
            # The payload here is the DataTag + SfpRisSyncPayload
            # Skip the DataTag (8 bytes) to get to the actual sync data
            sync_data = payload[ctypes.sizeof(DataTag) :]

            if len(sync_data) < 16:
                self.logger.warning(
                    f"SYNC payload too short: {len(sync_data)} bytes, need at least 16"
                )
                return

            # Use the ctypes structure, which now matches the C++ layout exactly
            # Correct field order: flags, cc_cookie, ris_timetag, tx_period_ms
            sync_payload = SfpRisSyncPayload.from_buffer_copy(sync_data)

            result = {
                "cookie": sync_payload.cc_cookie,
                "server_timetag": sync_payload.ris_timetag,
                "reception_timestamp": reception_timestamp,
                "flags": sync_payload.flags,
                "tx_period_ms": sync_payload.tx_period_ms,
            }

            # Put the result into the dedicated queue for the Sync Tool window
            try:
                self._sync_results_queue.put_nowait(result)
            except Full:
                # The queue is full (most likely the Sync Tool window is not open
                # or is not consuming). Drop silently - this is not a critical error.
                pass

        except Exception as e:
            self.logger.warning(
                f"Failed to parse SYNC reply payload: {e}", exc_info=True
            )

    def _handle_ris_status(self, payload: bytearray):
        """
        Network Thread: Queues an incoming RIS status payload for processing.
        """
        reception_timestamp = time.monotonic()
        try:
            self._processing_queue.put_nowait((payload, reception_timestamp))
        except Full:
            self.logger.error("Payload processing queue is full! A packet was dropped.")

    def _process_ris_status_payload(
        self, payload: bytearray, reception_timestamp: float
    ):
        """
        Worker Thread: Parses and processes a single RIS status payload.
        """
        t_start = time.perf_counter() if self._profiling_enabled else None

        if self._profiling_enabled:
            self._perf_counters["packet_count"] += 1
            self._perf_counters["_total_packet_count"] += 1

        parsed_payload = None
        t_parse_start = time.perf_counter()
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
        except (ValueError, TypeError):
            self.logger.error("Failed to parse SfpRisStatusPayload from buffer.")
            return
        t_parse_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["parse_time_total"] += t_parse_end - t_parse_start

        t_hub_start = time.perf_counter()
        if self._hub:
            try:
                sc = parsed_payload.scenario
                delta_t = 0.0
                if self._last_ownship_update_time is not None:
                    delta_t = reception_timestamp - self._last_ownship_update_time
                self._last_ownship_update_time = reception_timestamp
                old_state = self._hub.get_ownship_state()
                old_pos_xy = old_state.get("position_xy_ft", (0.0, 0.0))
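                # Note (added comment): the scenario velocities arrive in m/s and
                # are mapped with the axes swapped (sc.vy feeds the X rate, sc.vx
                # the Y rate), mirroring the x/y swap used for target positions in
                # _parse_ris_payload_to_targets; they are converted to ft/s and
                # used to dead-reckon the ownship position from the last sample.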
                ownship_vx_fps = float(sc.vy) * M_TO_FT
                ownship_vy_fps = float(sc.vx) * M_TO_FT
                new_pos_x_ft = old_pos_xy[0] + ownship_vx_fps * delta_t
                new_pos_y_ft = old_pos_xy[1] + ownship_vy_fps * delta_t
                ownship_heading_deg = math.degrees(float(sc.true_heading)) % 360
                ownship_state = {
                    "timestamp": reception_timestamp,
                    "position_xy_ft": (new_pos_x_ft, new_pos_y_ft),
                    "altitude_ft": float(sc.baro_altitude) * M_TO_FT,
                    "velocity_xy_fps": (ownship_vx_fps, ownship_vy_fps),
                    "heading_deg": ownship_heading_deg,
                    "latitude": float(sc.latitude),
                    "longitude": float(sc.longitude),
                }
                self._hub.set_ownship_state(ownship_state)
                with self._lock:
                    archive = self.active_archive
                    if archive and hasattr(archive, "add_ownship_state"):
                        archive.add_ownship_state(ownship_state)
            except Exception:
                self.logger.exception("Failed to update ownship state.")

        t_clock_start = time.perf_counter()
        if self._clock_sync is not None and parsed_payload is not None:
            try:
                server_timetag = int(parsed_payload.scenario.timetag)
                self._clock_sync.add_sample(server_timetag, reception_timestamp)
                est_gen = self._clock_sync.to_client_time(server_timetag)
                latency = reception_timestamp - est_gen
                if latency >= 0 and self.active_archive is not None:
                    latency_ms = latency * 1000
                    with self._lock:
                        archive = self.active_archive
                        if archive and hasattr(archive, "add_latency_sample"):
                            archive.add_latency_sample(reception_timestamp, latency_ms)
            except Exception:
                pass
        t_clock_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["clock_sync_time_total"] += t_clock_end - t_clock_start

        real_targets, inactive_ids = self._parse_ris_payload_to_targets(payload)

        if self._hub:
            try:
                self._hub.add_real_packet(reception_timestamp)
                for tid in inactive_ids or []:
                    self._hub.clear_real_target_data(tid)
                for target in real_targets:
                    state_tuple = (
                        getattr(target, "_pos_x_ft", 0.0),
                        getattr(target, "_pos_y_ft", 0.0),
                        getattr(target, "_pos_z_ft", 0.0),
                    )
                    self._hub.add_real_state(
                        target.target_id, reception_timestamp, state_tuple
                    )
                    self._hub.set_real_heading(
                        target.target_id,
                        target.current_heading_deg,
                        raw_value=getattr(target, "_raw_heading", None),
                    )
            except Exception:
                self.logger.exception("Failed to process RIS targets for Hub.")

        t_hub_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["hub_update_time_total"] += t_hub_end - t_hub_start

        t_archive_start = time.perf_counter()
        with self._lock:
            archive = self.active_archive
            if archive:
                for target in real_targets:
                    state_tuple = (
                        getattr(target, "_pos_x_ft", 0.0),
                        getattr(target, "_pos_y_ft", 0.0),
                        getattr(target, "_pos_z_ft", 0.0),
                    )
                    archive.add_real_state(
                        target.target_id, reception_timestamp, state_tuple
                    )
        t_archive_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["archive_time_total"] += t_archive_end - t_archive_start

        t_listener_start = time.perf_counter()
        with self._lock:
            for listener in self._ris_target_listeners:
                try:
                    listener(real_targets)
                except Exception:
                    self.logger.exception(f"Error in RIS target listener: {listener}")
        t_listener_end = time.perf_counter()
        if self._profiling_enabled:
            self._perf_counters["listener_time_total"] += (
                t_listener_end - t_listener_start
            )

        if (
            self._profiling_enabled
            and t_start is not None
            and self.active_archive is not None
        ):
            total_processing_time = time.perf_counter() - t_start
            self._perf_counters["max_processing_time"] = max(
                self._perf_counters["max_processing_time"], total_processing_time
            )

            if (
                total_processing_time > 0.010
                or self._perf_counters["_total_packet_count"] % 100 == 0
            ):
                if self._perf_samples is not None:
                    sample = {
                        "timestamp": reception_timestamp,
                        "total_ms": round(total_processing_time * 1000, 3),
                        "parse_ms": round((t_parse_end - t_parse_start) * 1000, 3),
                        "hub_ms": round((t_hub_end - t_hub_start) * 1000, 3),
                        "archive_ms": round(
                            (t_archive_end - t_archive_start) * 1000, 3
                        ),
                        "listener_ms": round(
                            (t_listener_end - t_listener_start) * 1000, 3
                        ),
                        "clock_ms": round((t_clock_end - t_clock_start) * 1000, 3),
                    }
                    self._perf_samples.append(sample)
                    if len(self._perf_samples) % 500 == 0:
                        self.logger.debug(
                            f"Performance samples buffer: {len(self._perf_samples)} samples"
                        )

            current_time = time.time()
            if current_time - self._perf_counters["last_report_time"] >= 5.0:
                self._report_performance_stats()
                self._perf_counters["last_report_time"] = current_time

        self._update_debug_views(parsed_payload)

    def get_sync_result(self) -> Optional[Dict]:
        """Pops one result from the SYNC queue without blocking."""
        try:
            return self._sync_results_queue.get_nowait()
        except Empty:
            return None

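    # Usage note (illustrative, not from the original source): the worker thread
    # fills `_sync_results_queue`, and a consumer such as a Sync Tool window is
    # expected to poll `get_sync_result()` periodically, e.g.:
    #
    #     while (result := router.get_sync_result()) is not None:
    #         handle(result)   # `handle` is a hypothetical callback
    #
    # Because the getter is non-blocking, it is safe to call from a GUI timer.
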
    def set_archive(self, archive):
        with self._lock:
            self.active_archive = archive
            if archive is not None:
                if self._perf_samples is not None:
                    self._perf_samples.clear()
                self._perf_counters.update(
                    {
                        "packet_count": 0,
                        "_total_packet_count": 0,
                        "parse_time_total": 0.0,
                        "hub_update_time_total": 0.0,
                        "archive_time_total": 0.0,
                        "listener_time_total": 0.0,
                        "clock_sync_time_total": 0.0,
                        "max_processing_time": 0.0,
                        "last_report_time": time.time(),
                    }
                )
                self.logger.debug("Performance buffers cleared for new simulation")

    def add_ris_target_listener(self, listener: TargetListListener):
        with self._lock:
            if listener not in self._ris_target_listeners:
                self._ris_target_listeners.append(listener)
                self.logger.info(f"RIS target listener added: {listener}")

    def remove_ris_target_listener(self, listener: TargetListListener):
        with self._lock:
            try:
                self._ris_target_listeners.remove(listener)
                self.logger.info(f"RIS target listener removed: {listener}")
            except ValueError:
                pass

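    # Illustrative sketch (not part of the original module): a network receiver
    # is expected to look up a handler by the SFP flow byte and call it with the
    # payload bytes, roughly:
    #
    #     handler = router.get_handlers().get(header.SFP_FLOW)
    #     if handler is not None:
    #         handler(bytearray(payload_bytes))
    #
    # `header` and `payload_bytes` are hypothetical names; the real receiver
    # lives outside this module.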
    def get_handlers(self) -> Dict[int, PayloadHandler]:
        return self._handlers

    def _update_last_payload(self, flow_id: str, payload: Any):
        with self._lock:
            self._latest_payloads[flow_id] = payload

    def _parse_ris_payload_to_targets(
        self, payload: bytearray
    ) -> Tuple[List[Target], List[int]]:
        targets: List[Target] = []
        inactive_ids: List[int] = []
        try:
            parsed_payload = SfpRisStatusPayload.from_buffer_copy(payload)
            for i, ris_target in enumerate(parsed_payload.tgt.tgt):
                if ris_target.flags != 0:
                    target = Target(
                        target_id=i, trajectory=[], active=True, traceable=True
                    )
                    pos_x_ft = float(ris_target.y) * M_TO_FT
                    pos_y_ft = float(ris_target.x) * M_TO_FT
                    pos_z_ft = float(ris_target.z) * M_TO_FT
                    setattr(target, "_pos_x_ft", pos_x_ft)
                    setattr(target, "_pos_y_ft", pos_y_ft)
                    setattr(target, "_pos_z_ft", pos_z_ft)
                    target._update_current_polar_coords()
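                    # Note (added comment): the heading field may arrive either in
                    # radians or in degrees; magnitudes up to 7.0 (just above 2*pi)
                    # are treated as radians and converted, larger values are
                    # assumed to already be degrees.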
                    try:
                        raw_h = float(ris_target.heading)
                        target.current_heading_deg = (
                            math.degrees(raw_h) if abs(raw_h) <= 7.0 else raw_h
                        ) % 360
                        setattr(target, "_raw_heading", raw_h)
                    except (ValueError, TypeError):
                        target.current_heading_deg = 0.0
                    targets.append(target)
                else:
                    inactive_ids.append(int(i))
        except Exception:
            self.logger.exception("Failed to parse RIS payload into Target objects.")
        return targets, inactive_ids

    def _update_debug_views(self, parsed_payload: SfpRisStatusPayload):
        try:
            sc = parsed_payload.scenario
            lines = ["RIS Status Payload:\n", "Scenario:"]
            text_out = "\n".join(lines)
            self._update_last_payload(
                "RIS_STATUS_TEXT", bytearray(text_out.encode("utf-8"))
            )

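            # Note (added comment): flatten ctypes values so the structure can be
            # JSON-serialized for the debug view (ctypes arrays -> lists, simple
            # ctypes scalars -> plain Python values).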
            def _convert_ctypes(value):
                if hasattr(value, "_length_"):
                    return list(value)
                if isinstance(value, ctypes._SimpleCData):
                    return value.value
                return value

            scenario_dict = {
                f[0]: _convert_ctypes(getattr(sc, f[0])) for f in sc._fields_
            }
            targets_list = [
                {f[0]: _convert_ctypes(getattr(t, f[0])) for f in t._fields_}
                for t in parsed_payload.tgt.tgt
            ]
            struct = {"scenario": scenario_dict, "targets": targets_list}
            json_bytes = bytearray(json.dumps(struct, indent=2).encode("utf-8"))
            self._update_last_payload("RIS_STATUS_JSON", json_bytes)

            if self._hub:
                sweep_rad = scenario_dict.get("ant_nav_az")
                if sweep_rad is not None:
                    try:
                        sweep_deg = math.degrees(float(sweep_rad))
                        self._hub.set_antenna_azimuth(
                            sweep_deg, timestamp=time.monotonic()
                        )
                    except Exception:
                        pass
        except Exception:
            self.logger.exception("Failed to generate text/JSON for RIS debug view.")

    def _handle_json_payload(self, payload: bytearray):
        try:
            self._update_last_payload("JSON", payload)
            if not self._hub:
                return
            try:
                obj = json.loads(payload.decode("utf-8"))
            except Exception:
                return

            def _find_val(keys, dct):
                if not isinstance(dct, dict):
                    return None
                for key in keys:
                    if key in dct:
                        return dct[key]
                sc = dct.get("scenario", {})
                if isinstance(sc, dict):
                    for key in keys:
                        if key in sc:
                            return sc[key]
                return None

            sweep_val = _find_val(["ant_nav_az", "antenna_azimuth"], obj)
            if sweep_val is not None:
                try:
                    s_rad = float(sweep_val)
                    if abs(s_rad) > 7.0:
                        s_rad = math.radians(s_rad)
                    sweep_deg = math.degrees(s_rad)
                    self._hub.set_antenna_azimuth(sweep_deg, timestamp=time.monotonic())
                except Exception as e:
                    self.logger.debug(f"Error processing antenna azimuth: {e}")
        except Exception:
            self.logger.exception("Error handling JSON payload")

    def get_and_clear_latest_payloads(self) -> Dict[str, Any]:
        with self._lock:
            new_payloads, self._latest_payloads = self._latest_payloads, {}
            return new_payloads

    def update_raw_packet(self, raw_bytes: bytes, addr: tuple):
        with self._lock:
            self._last_raw_packet = (raw_bytes, addr)
            entry = {"ts": datetime.datetime.utcnow(), "addr": addr, "raw": raw_bytes}
            try:
                hdr = SFPHeader.from_buffer_copy(raw_bytes)
                entry.update(flow=int(hdr.SFP_FLOW), tid=int(hdr.SFP_TID))
                flow_map = {
                    ord(c): n
                    for c, n in [
                        ("M", "MFD"),
                        ("S", "SAR"),
                        ("B", "BIN"),
                        ("J", "JSON"),
                        ("R", "RIS"),
                    ]
                }
                entry["flow_name"] = flow_map.get(entry["flow"], str(entry["flow"]))
            except Exception:
                pass
            self._history.append(entry)
            if self._persist:
                try:
                    ts = entry["ts"].strftime("%Y%m%dT%H%M%S.%f")
                    fname = f"sfp_raw_{ts}_{addr[0].replace(':','_')}_{addr[1]}.bin"
                    with open(os.path.join(self._persist_dir, fname), "wb") as f:
                        f.write(raw_bytes)
                except Exception:
                    pass

    def get_and_clear_raw_packet(self) -> Optional[tuple]:
        with self._lock:
            pkt, self._last_raw_packet = self._last_raw_packet, None
            return pkt

    def get_estimated_latency_s(self) -> float:
        try:
            if self._clock_sync:
                return self._clock_sync.get_average_latency_s()
        except Exception:
            pass
        return 0.0

    def get_latency_samples(self, limit: Optional[int] = None) -> List[tuple]:
        try:
            if self._clock_sync:
                samples = self._clock_sync.get_latency_history()
                return samples[-limit:] if limit else samples
        except Exception:
            pass
        return []

    def get_latency_stats(self, sample_limit: int = 200) -> Dict[str, Any]:
        try:
            samples = self.get_latency_samples(limit=sample_limit)
            if not samples:
                return {"count": 0}

            ms = [s[1] * 1000.0 for s in samples]
            return {
                "mean_ms": round(statistics.mean(ms), 3),
                "std_ms": round(statistics.stdev(ms) if len(ms) > 1 else 0.0, 3),
                "min_ms": round(min(ms), 3),
                "max_ms": round(max(ms), 3),
                "count": len(ms),
            }
        except Exception:
            pass
        return {"count": 0}

    def get_history(self):
        with self._lock:
            return list(self._history)

    def get_performance_samples(self):
        result = (
            list(self._perf_samples)
            if self._profiling_enabled and self._perf_samples
            else []
        )
        self.logger.debug(
            f"get_performance_samples called: profiling={self._profiling_enabled}, samples_count={len(result)}"
        )
        return result

    def _report_performance_stats(self):
        try:
            count = self._perf_counters["packet_count"]
            if count == 0:
                return

            avg_parse = (self._perf_counters["parse_time_total"] / count) * 1000
            avg_hub = (self._perf_counters["hub_update_time_total"] / count) * 1000
            avg_archive = (self._perf_counters["archive_time_total"] / count) * 1000
            avg_listener = (self._perf_counters["listener_time_total"] / count) * 1000
            avg_clock = (self._perf_counters["clock_sync_time_total"] / count) * 1000
            max_proc = self._perf_counters["max_processing_time"] * 1000
            total_avg = avg_parse + avg_hub + avg_archive + avg_listener + avg_clock

            self.logger.info(
                f"[PERF] Pkts: {count} | Avg: {total_avg:.2f}ms "
                f"(parse:{avg_parse:.2f} hub:{avg_hub:.2f} arch:{avg_archive:.2f} listen:{avg_listener:.2f} sync:{avg_clock:.2f}) | "
                f"Max: {max_proc:.2f}ms"
            )

            self._perf_counters["packet_count"] = 0
            self._perf_counters["parse_time_total"] = 0.0
            self._perf_counters["hub_update_time_total"] = 0.0
            self._perf_counters["archive_time_total"] = 0.0
            self._perf_counters["listener_time_total"] = 0.0
            self._perf_counters["clock_sync_time_total"] = 0.0
            self._perf_counters["max_processing_time"] = 0.0
        except Exception as e:
            self.logger.debug(f"Error reporting performance stats: {e}")

    def clear_history(self):
        with self._lock:
            self._history.clear()

    def set_history_size(self, n: int):
        with self._lock:
            self._sfp_debug_history_size = max(1, int(n))
            self._history = collections.deque(
                self._history, maxlen=self._sfp_debug_history_size
            )

    def set_persist(self, enabled: bool):
        with self._lock:
            self._persist = bool(enabled)
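

# The module docstring notes that the router can be exercised without the Tkinter
# window. The block below is a minimal, illustrative smoke test (not part of the
# original file): it assumes the target_simulator package is importable, feeds one
# zero-filled buffer of the right size through the RIS handler, and shuts the
# worker thread down. A zeroed buffer only exercises the parsing/dispatch path;
# real payloads must follow the SfpRisStatusPayload layout.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    router = DebugPayloadRouter(simulation_hub=None)
    placeholder = bytearray(ctypes.sizeof(SfpRisStatusPayload))
    router.get_handlers()[ord("R")](placeholder)  # queue one fake RIS status payload
    time.sleep(0.2)  # give the worker thread time to drain the queue
    print("latency stats:", router.get_latency_stats())
    router.shutdown()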