# target_simulator/analysis/simulation_state_hub.py """ Defines the SimulationStateHub, a thread-safe data store for comparing simulated target states with real data received from the radar. """ import collections import threading from typing import Dict, Deque, Tuple, Optional, List # A state tuple can contain (timestamp, x, y, z, vx, vy, vz, ...) # For now, we focus on timestamp and position. TargetState = Tuple[float, float, float, float] class SimulationStateHub: """ A thread-safe hub to store and manage the history of simulated and real target states for performance analysis. """ def __init__(self, history_size: int = 200): """ Initializes the SimulationStateHub. Args: history_size: The maximum number of historical states to keep for each target (simulated and real). """ self._lock = threading.Lock() self._history_size = history_size self._target_data: Dict[int, Dict[str, Deque[TargetState]]] = {} def add_simulated_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]): """ Adds a new simulated state for a given target. Args: target_id: The ID of the target. timestamp: The local timestamp (e.g., from time.monotonic()) when the state was generated. state: A tuple representing the target's state (e.g., x_ft, y_ft, z_ft). """ with self._lock: if target_id not in self._target_data: self._initialize_target(target_id) # Prepend the timestamp to the state tuple full_state = (timestamp,) + state self._target_data[target_id]["simulated"].append(full_state) def add_real_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]): """ Adds a new real state received from the radar for a given target. Args: target_id: The ID of the target. timestamp: The timestamp from the radar or time of reception. state: A tuple representing the target's state (e.g., x_ft, y_ft, z_ft). """ with self._lock: if target_id not in self._target_data: self._initialize_target(target_id) full_state = (timestamp,) + state self._target_data[target_id]["real"].append(full_state) def get_target_history(self, target_id: int) -> Optional[Dict[str, List[TargetState]]]: """ Retrieves a copy of the historical data for a specific target. Args: target_id: The ID of the target. Returns: A dictionary containing lists of 'simulated' and 'real' states, or None if the target ID is not found. """ with self._lock: if target_id in self._target_data: return { "simulated": list(self._target_data[target_id]["simulated"]), "real": list(self._target_data[target_id]["real"]), } return None def get_all_target_ids(self) -> List[int]: """Returns a list of all target IDs currently being tracked.""" with self._lock: return list(self._target_data.keys()) def reset(self): """Clears all stored data for all targets.""" with self._lock: self._target_data.clear() def _initialize_target(self, target_id: int): """Internal helper to create the data structure for a new target.""" if target_id not in self._target_data: self._target_data[target_id] = { "simulated": collections.deque(maxlen=self._history_size), "real": collections.deque(maxlen=self._history_size), }