S1005403_RisCC/target_simulator/analysis/simulation_state_hub.py

196 lines
7.8 KiB
Python

# target_simulator/analysis/simulation_state_hub.py
"""
Defines the SimulationStateHub, a thread-safe data store for comparing
simulated target states with real data received from the radar.
"""
import collections
import threading
import math
import logging
from typing import Dict, Deque, Tuple, Optional, List
# Type alias for one history entry kept by the hub.
# A state tuple can contain (timestamp, x, y, z, vx, vy, vz, ...); the hub
# builds each entry by prepending the timestamp to the state passed in.
# For now, we focus on timestamp and position in feet, i.e.
# (timestamp, x_ft, y_ft, z_ft).
TargetState = Tuple[float, float, float, float]
class SimulationStateHub:
    """
    A thread-safe hub to store and manage the history of simulated and real
    target states for performance analysis.

    For every target id the hub keeps two bounded histories, "simulated" and
    "real", each holding ``(timestamp, *state)`` tuples. Independently of the
    histories it caches the latest real heading (normalized to [0, 360)
    degrees) and the raw heading value as received. Every access to these
    containers is guarded by a single lock.
    """

    # Shared logger used for the optional diagnostics of add_real_state.
    _logger = logging.getLogger(__name__)

    def __init__(self, history_size: int = 200):
        """
        Initializes the SimulationStateHub.

        Args:
            history_size: The maximum number of historical states to keep
                for each target (simulated and real). It is used as the
                ``maxlen`` of the per-target deques.
        """
        self._lock = threading.Lock()
        self._history_size = history_size
        self._target_data: Dict[int, Dict[str, Deque["TargetState"]]] = {}
        # Optional store for latest real heading per target (degrees).
        # This is used to propagate headings received from external sources
        # (e.g. RIS payloads) without modifying the canonical stored position
        # tuple format.
        self._latest_real_heading: Dict[int, float] = {}
        # Also keep the raw value as received (for debug/correlation).
        self._latest_raw_heading: Dict[int, float] = {}

    def add_simulated_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """
        Adds a new simulated state for a given target.

        Args:
            target_id: The ID of the target.
            timestamp: The local timestamp (e.g., from time.monotonic()) when
                the state was generated.
            state: The target's state (x_ft, y_ft, z_ft). Any iterable of
                values is accepted; it is stored as a tuple.
        """
        self._append_state(target_id, "simulated", timestamp, state)

    def add_real_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """
        Adds a new real state received from the radar for a given target.

        When DEBUG logging is enabled, the azimuth of the position is logged
        under two axis interpretations to help spot swapped axes.

        Args:
            target_id: The ID of the target.
            timestamp: The timestamp from the radar or time of reception.
            state: The target's state (x_ft, y_ft, z_ft). Any iterable of
                values is accepted; it is stored as a tuple.
        """
        full_state = self._append_state(target_id, "real", timestamp, state)
        # Diagnostics only read the local tuple, so they run without the lock.
        self._log_real_state(target_id, full_state[1:])

    def set_real_heading(
        self, target_id: int, heading_deg: float, raw_value: Optional[float] = None
    ):
        """
        Store the latest real heading (in degrees) for a specific target id.

        This keeps the hub backwards-compatible (position tuples unchanged)
        while allowing the GUI to retrieve and display headings coming from
        external sources (RIS).

        The heading is normalized to the range [0, 360) before it is stored,
        so e.g. -90 is stored (and later returned) as 270. No heuristics
        based on target motion are applied.
        NOTE(review): an earlier comment described the GUI convention as
        0 = North, +90 = left (West), -90 = right (East); confirm the GUI
        copes with the normalized value (270 instead of -90).

        Invalid input is ignored silently: if ``target_id`` cannot be
        converted to int, or ``heading_deg`` cannot be converted to a finite
        float, nothing is stored. An invalid ``raw_value`` is skipped but
        the heading itself is still stored.

        Args:
            target_id: The ID of the target (coerced with ``int``).
            heading_deg: The heading in degrees.
            raw_value: Optional raw value as received, kept for debugging.
        """
        tid = self._coerce_id(target_id)
        if tid is None:
            return
        try:
            hdg = float(heading_deg)
        except (TypeError, ValueError, OverflowError):
            return
        # NaN/inf would turn into NaN after the modulo and poison the cache.
        if not math.isfinite(hdg):
            return
        hdg %= 360
        # Float modulo can round up to exactly 360.0 for tiny negative inputs.
        if hdg >= 360.0:
            hdg = 0.0

        raw = None
        if raw_value is not None:
            try:
                raw = float(raw_value)
            except (TypeError, ValueError, OverflowError):
                # Ignore an invalid raw value; the heading is still stored.
                raw = None

        with self._lock:
            self._latest_real_heading[tid] = hdg
            if raw is not None:
                self._latest_raw_heading[tid] = raw

    def get_real_heading(self, target_id: int) -> Optional[float]:
        """
        Retrieve the last stored real heading for a target.

        Returns:
            The heading in degrees, normalized to [0, 360), or None if no
            heading is stored or ``target_id`` cannot be converted to int.
        """
        tid = self._coerce_id(target_id)
        if tid is None:
            return None
        with self._lock:
            return self._latest_real_heading.get(tid)

    def get_raw_heading(self, target_id: int) -> Optional[float]:
        """
        Retrieve the last stored raw heading value for a target.

        Returns:
            The raw value as a float, or None if no raw value is stored or
            ``target_id`` cannot be converted to int.
        """
        tid = self._coerce_id(target_id)
        if tid is None:
            return None
        with self._lock:
            return self._latest_raw_heading.get(tid)

    def get_target_history(
        self, target_id: int
    ) -> Optional[Dict[str, List["TargetState"]]]:
        """
        Retrieves a copy of the historical data for a specific target.

        Args:
            target_id: The ID of the target.

        Returns:
            A dictionary containing lists of 'simulated' and 'real' states,
            or None if the target ID is not found. The lists are copies, so
            the caller may modify them freely.
        """
        with self._lock:
            histories = self._target_data.get(target_id)
            if histories is None:
                return None
            return {
                "simulated": list(histories["simulated"]),
                "real": list(histories["real"]),
            }

    def get_all_target_ids(self) -> List[int]:
        """Returns a list of all target IDs currently being tracked."""
        with self._lock:
            return list(self._target_data)

    def reset(self):
        """Clears all stored data (histories and heading caches) for all targets."""
        with self._lock:
            self._target_data.clear()
            self._latest_real_heading.clear()
            self._latest_raw_heading.clear()

    def remove_target(self, target_id: int):
        """
        Remove all stored data for a specific target id.

        The id is coerced with ``int``; if that fails, or if the target is
        unknown, the call does nothing.
        """
        tid = self._coerce_id(target_id)
        if tid is None:
            return
        with self._lock:
            self._target_data.pop(tid, None)
            self._latest_real_heading.pop(tid, None)
            self._latest_raw_heading.pop(tid, None)

    def _initialize_target(self, target_id: int):
        """
        Internal helper to create the data structure for a new target.

        Does nothing if the target already exists. It does not take the lock
        itself, so the caller must already hold it.
        """
        if target_id not in self._target_data:
            self._target_data[target_id] = {
                "simulated": collections.deque(maxlen=self._history_size),
                "real": collections.deque(maxlen=self._history_size),
            }

    def _append_state(self, target_id, source, timestamp, state):
        """
        Append ``(timestamp, *state)`` to the ``source`` history of a target.

        Args:
            target_id: The ID of the target; created on first use.
            source: Which history to extend, "simulated" or "real".
            timestamp: The timestamp to prepend to the state.
            state: Iterable with the state values.

        Returns:
            The tuple that was stored.
        """
        # Built before taking the lock; unpacking also accepts lists and
        # array-likes, where tuple concatenation would fail or broadcast.
        full_state = (timestamp, *state)
        with self._lock:
            self._initialize_target(target_id)
            self._target_data[target_id][source].append(full_state)
        return full_state

    def _log_real_state(self, target_id, state):
        """
        Log a real state at DEBUG level with its azimuth under both axis
        interpretations, to detect a possible 90-degree rotation due to
        swapped axes. Missing components are treated as 0.0.
        """
        try:
            logger = self._logger
            if not logger.isEnabledFor(logging.DEBUG):
                return
            # State is expected to be (x_ft, y_ft, z_ft).
            x_ft = float(state[0]) if len(state) > 0 else 0.0
            y_ft = float(state[1]) if len(state) > 1 else 0.0
            z_ft = float(state[2]) if len(state) > 2 else 0.0
            if x_ft != 0 or y_ft != 0:
                # Interpretation A: x => East, y => North (current standard).
                az_a = -math.degrees(math.atan2(x_ft, y_ft))
                # Interpretation B: swapped axes (x => North, y => East).
                az_b = -math.degrees(math.atan2(y_ft, x_ft))
            else:
                az_a = az_b = 0.0
            logger.debug(
                "[SimulationStateHub] add_real_state target=%s state_ft=(%.3f, %.3f, %.3f) az_a=%.3f az_b=%.3f",
                target_id,
                x_ft,
                y_ft,
                z_ft,
                az_a,
                az_b,
            )
        except Exception:
            # Deliberately best-effort: diagnostic logging must never break
            # hub behavior, whatever the state values or log handlers do.
            pass

    @staticmethod
    def _coerce_id(target_id) -> Optional[int]:
        """Return ``int(target_id)``, or None when the conversion fails."""
        try:
            return int(target_id)
        except (TypeError, ValueError, OverflowError):
            return None