# target_simulator/analysis/simulation_state_hub.py """ Defines the SimulationStateHub, a thread-safe data store for comparing simulated target states with real data received from the radar. """ import collections import threading import math import logging from typing import Dict, Deque, Tuple, Optional, List # A state tuple can contain (timestamp, x, y, z, vx, vy, vz, ...) # For now, we focus on timestamp and position in feet. TargetState = Tuple[float, float, float, float] class SimulationStateHub: """ A thread-safe hub to store and manage the history of simulated and real target states for performance analysis. """ def __init__(self, history_size: int = 200): """ Initializes the SimulationStateHub. Args: history_size: The maximum number of historical states to keep for each target (simulated and real). """ self._lock = threading.Lock() self._history_size = history_size self._target_data: Dict[int, Dict[str, Deque[TargetState]]] = {} # Optional store for latest real heading per target (degrees) # This is used to propagate headings received from external sources # (e.g. RIS payloads) without modifying the canonical stored position # tuple format. self._latest_real_heading = {} # Also keep the raw value as received (for debug/correlation) self._latest_raw_heading = {} def add_simulated_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]): """ Adds a new simulated state for a given target. Args: target_id: The ID of the target. timestamp: The local timestamp (e.g., from time.monotonic()) when the state was generated. state: A tuple representing the target's state (x_ft, y_ft, z_ft). """ with self._lock: if target_id not in self._target_data: self._initialize_target(target_id) # Prepend the timestamp to the state tuple full_state = (timestamp,) + state self._target_data[target_id]["simulated"].append(full_state) def add_real_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]): """ Adds a new real state received from the radar for a given target. Args: target_id: The ID of the target. timestamp: The timestamp from the radar or time of reception. state: A tuple representing the target's state (x_ft, y_ft, z_ft). """ with self._lock: if target_id not in self._target_data: self._initialize_target(target_id) full_state = (timestamp,) + state self._target_data[target_id]["real"].append(full_state) # Diagnostic logging: compute azimuth under both axis interpretations # to detect a possible 90-degree rotation due to swapped axes. try: logger = logging.getLogger(__name__) if logger.isEnabledFor(logging.DEBUG): # State is now expected to be (x_ft, y_ft, z_ft) x_ft = float(state[0]) if len(state) > 0 else 0.0 y_ft = float(state[1]) if len(state) > 1 else 0.0 z_ft = float(state[2]) if len(state) > 2 else 0.0 # Interpretation A: x => East, y => North (current standard) az_a = -math.degrees(math.atan2(x_ft, y_ft)) if (x_ft != 0 or y_ft != 0) else 0.0 # Interpretation B: swapped axes (x => North, y => East) az_b = -math.degrees(math.atan2(y_ft, x_ft)) if (x_ft != 0 or y_ft != 0) else 0.0 logger.debug( "[SimulationStateHub] add_real_state target=%s state_ft=(%.3f, %.3f, %.3f) az_a=%.3f az_b=%.3f", target_id, x_ft, y_ft, z_ft, az_a, az_b, ) except Exception: # Never allow diagnostic logging to break hub behavior pass def set_real_heading(self, target_id: int, heading_deg: float, raw_value: float = None): """ Store the latest real heading (in degrees) for a specific target id. This keeps the hub backwards-compatible (position tuples unchanged) while allowing the GUI to retrieve and display headings coming from external sources (RIS). """ # Store the heading exactly as provided (degrees). Do not perform # heuristics based on motion: the GUI should render the heading using # the convention 0 = North, +90 = left (West), -90 = right (East). with self._lock: try: tid = int(target_id) hdg = float(heading_deg) % 360 self._latest_real_heading[tid] = hdg if raw_value is not None: try: self._latest_raw_heading[tid] = float(raw_value) except Exception: # ignore invalid raw value pass except Exception: # On error, do nothing (silently ignore invalid heading) pass def get_real_heading(self, target_id: int) -> Optional[float]: """ Retrieve the last stored real heading for a target, or None if not set. """ with self._lock: return self._latest_real_heading.get(int(target_id)) def get_raw_heading(self, target_id: int) -> Optional[float]: """ Retrieve the last stored raw heading value for a target, or None if not set. """ with self._lock: return self._latest_raw_heading.get(int(target_id)) def get_target_history(self, target_id: int) -> Optional[Dict[str, List[TargetState]]]: """ Retrieves a copy of the historical data for a specific target. Args: target_id: The ID of the target. Returns: A dictionary containing lists of 'simulated' and 'real' states, or None if the target ID is not found. """ with self._lock: if target_id in self._target_data: return { "simulated": list(self._target_data[target_id]["simulated"]), "real": list(self._target_data[target_id]["real"]), } return None def get_all_target_ids(self) -> List[int]: """Returns a list of all target IDs currently being tracked.""" with self._lock: return list(self._target_data.keys()) def reset(self): """Clears all stored data for all targets.""" with self._lock: self._target_data.clear() # also clear heading caches self._latest_real_heading.clear() self._latest_raw_heading.clear() def _initialize_target(self, target_id: int): """Internal helper to create the data structure for a new target.""" if target_id not in self._target_data: self._target_data[target_id] = { "simulated": collections.deque(maxlen=self._history_size), "real": collections.deque(maxlen=self._history_size), } def remove_target(self, target_id: int): """Remove all stored data for a specific target id.""" with self._lock: try: tid = int(target_id) if tid in self._target_data: del self._target_data[tid] if tid in self._latest_real_heading: del self._latest_real_heading[tid] if tid in self._latest_raw_heading: del self._latest_raw_heading[tid] except Exception: pass def clear_real_target_data(self, target_id: int): """ Clears only the real data history and heading caches for a specific target, preserving the simulated data history for analysis. """ with self._lock: try: tid = int(target_id) if tid in self._target_data: self._target_data[tid]["real"].clear() # Also clear heading caches associated with this real target if tid in self._latest_real_heading: del self._latest_real_heading[tid] if tid in self._latest_raw_heading: del self._latest_raw_heading[tid] except Exception: # Silently ignore errors (e.g., invalid target_id type) pass