# target_simulator/utils/latency_monitor.py """ Provides a LatencyMonitor for actively measuring network latency using SYNC packets. """ import tkinter as tk import time import random import statistics from collections import deque from typing import Optional from target_simulator.core.sfp_communicator import SFPCommunicator from target_simulator.gui.payload_router import DebugPayloadRouter class LatencyMonitor: """ Manages periodic SYNC requests to actively measure and average network latency. This utility runs on the Tkinter event loop, sending a SYNC packet at a configured interval and processing replies to maintain a moving average of the one-way latency (RTT/2). """ def __init__( self, master: tk.Tk, router: DebugPayloadRouter, communicator: SFPCommunicator, interval_ms: int = 1000, history_size: int = 20, ): """ Initializes the LatencyMonitor. Args: master: The root Tkinter window, used for scheduling 'after' events. router: The DebugPayloadRouter to get SYNC replies from. communicator: The SFPCommunicator to send SYNC requests with. interval_ms: The interval in milliseconds between SYNC requests. history_size: The number of recent latency samples to average. """ self.master = master self.router = router self.communicator = communicator self.interval_ms = interval_ms self._pending_requests = {} # {cookie: send_timestamp} self._latency_history = deque(maxlen=history_size) self._is_running = False self._after_id_send = None self._after_id_process = None def start(self): """Starts the periodic sending and processing loops.""" if self._is_running: return self._is_running = True self._schedule_send() self._schedule_process() def stop(self): """Stops the periodic loops.""" if not self._is_running: return self._is_running = False if self._after_id_send: self.master.after_cancel(self._after_id_send) self._after_id_send = None if self._after_id_process: self.master.after_cancel(self._after_id_process) self._after_id_process = None def _schedule_send(self): """Schedules the next SYNC packet send.""" if not self._is_running: return self._send_sync_request() self._after_id_send = self.master.after(self.interval_ms, self._schedule_send) def _schedule_process(self): """Schedules the next reply processing check.""" if not self._is_running: return self._process_replies() # Process replies more frequently than sending requests self._after_id_process = self.master.after(100, self._schedule_process) def _send_sync_request(self): """Sends a single SYNC request if the communicator is connected.""" if not self.communicator or not self.communicator.is_open: return cookie = random.randint(0, 2**32 - 1) send_time = time.monotonic() if self.communicator.send_sync_request(cookie): self._pending_requests[cookie] = send_time def _process_replies(self): """Processes all available SYNC replies from the router's queue.""" while True: result = self.router.get_sync_result() if not result: break # No more results in the queue cookie = result.get("cookie") reception_time = result.get("reception_timestamp") if cookie in self._pending_requests: send_time = self._pending_requests.pop(cookie) rtt_s = reception_time - send_time latency_s = rtt_s / 2.0 if latency_s >= 0: self._latency_history.append(latency_s) def get_stable_latency_s(self) -> float: """ Returns the moving average of the one-way latency in seconds. Returns 0.0 if not enough data is available. """ if not self._latency_history: return 0.0 try: return statistics.mean(self._latency_history) except statistics.StatisticsError: return 0.0