S1005403_RisCC/target_simulator/analysis/simulation_state_hub.py
2025-10-27 15:45:04 +01:00

230 lines
9.2 KiB
Python

# target_simulator/analysis/simulation_state_hub.py
"""
Defines the SimulationStateHub, a thread-safe data store for comparing
simulated target states with real data received from the radar.
"""
import collections
import threading
import math
import logging
from typing import Dict, Deque, Tuple, Optional, List
# A state tuple can contain (timestamp, x, y, z, vx, vy, vz, ...)
# For now, we focus on timestamp and position in feet.
TargetState = Tuple[float, float, float, float]
class SimulationStateHub:
"""
A thread-safe hub to store and manage the history of simulated and real
target states for performance analysis.
"""
def __init__(self, history_size: int = 200):
"""
Initializes the SimulationStateHub.
Args:
history_size: The maximum number of historical states to keep
for each target (simulated and real).
"""
self._lock = threading.Lock()
self._history_size = history_size
self._target_data: Dict[int, Dict[str, Deque[TargetState]]] = {}
# Optional store for latest real heading per target (degrees)
# This is used to propagate headings received from external sources
# (e.g. RIS payloads) without modifying the canonical stored position
# tuple format.
self._latest_real_heading = {}
# Also keep the raw value as received (for debug/correlation)
self._latest_raw_heading = {}
def add_simulated_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]):
"""
Adds a new simulated state for a given target.
Args:
target_id: The ID of the target.
timestamp: The local timestamp (e.g., from time.monotonic()) when the state was generated.
state: A tuple representing the target's state (x_ft, y_ft, z_ft).
"""
with self._lock:
if target_id not in self._target_data:
self._initialize_target(target_id)
# Prepend the timestamp to the state tuple
full_state = (timestamp,) + state
self._target_data[target_id]["simulated"].append(full_state)
def add_real_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]):
"""
Adds a new real state received from the radar for a given target.
Args:
target_id: The ID of the target.
timestamp: The timestamp from the radar or time of reception.
state: A tuple representing the target's state (x_ft, y_ft, z_ft).
"""
with self._lock:
if target_id not in self._target_data:
self._initialize_target(target_id)
full_state = (timestamp,) + state
self._target_data[target_id]["real"].append(full_state)
# Diagnostic logging: compute azimuth under both axis interpretations
# to detect a possible 90-degree rotation due to swapped axes.
try:
logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
# State is now expected to be (x_ft, y_ft, z_ft)
x_ft = float(state[0]) if len(state) > 0 else 0.0
y_ft = float(state[1]) if len(state) > 1 else 0.0
z_ft = float(state[2]) if len(state) > 2 else 0.0
# Interpretation A: x => East, y => North (current standard)
az_a = -math.degrees(math.atan2(x_ft, y_ft)) if (x_ft != 0 or y_ft != 0) else 0.0
# Interpretation B: swapped axes (x => North, y => East)
az_b = -math.degrees(math.atan2(y_ft, x_ft)) if (x_ft != 0 or y_ft != 0) else 0.0
logger.debug(
"[SimulationStateHub] add_real_state target=%s state_ft=(%.3f, %.3f, %.3f) az_a=%.3f az_b=%.3f",
target_id,
x_ft,
y_ft,
z_ft,
az_a,
az_b,
)
except Exception:
# Never allow diagnostic logging to break hub behavior
pass
def set_real_heading(self, target_id: int, heading_deg: float, raw_value: float = None):
"""
Store the latest real heading (in degrees) for a specific target id.
This keeps the hub backwards-compatible (position tuples unchanged)
while allowing the GUI to retrieve and display headings coming from
external sources (RIS).
"""
# Store the heading exactly as provided (degrees). Do not perform
# heuristics based on motion: the GUI should render the heading using
# the convention 0 = North, +90 = left (West), -90 = right (East).
with self._lock:
try:
tid = int(target_id)
hdg = float(heading_deg) % 360
self._latest_real_heading[tid] = hdg
if raw_value is not None:
try:
self._latest_raw_heading[tid] = float(raw_value)
except Exception:
# ignore invalid raw value
pass
except Exception:
# On error, do nothing (silently ignore invalid heading)
pass
def get_real_heading(self, target_id: int) -> Optional[float]:
"""
Retrieve the last stored real heading for a target, or None if not set.
"""
with self._lock:
return self._latest_real_heading.get(int(target_id))
def get_raw_heading(self, target_id: int) -> Optional[float]:
"""
Retrieve the last stored raw heading value for a target, or None if not set.
"""
with self._lock:
return self._latest_raw_heading.get(int(target_id))
def get_target_history(self, target_id: int) -> Optional[Dict[str, List[TargetState]]]:
"""
Retrieves a copy of the historical data for a specific target.
Args:
target_id: The ID of the target.
Returns:
A dictionary containing lists of 'simulated' and 'real' states,
or None if the target ID is not found.
"""
with self._lock:
if target_id in self._target_data:
return {
"simulated": list(self._target_data[target_id]["simulated"]),
"real": list(self._target_data[target_id]["real"]),
}
return None
def get_all_target_ids(self) -> List[int]:
"""Returns a list of all target IDs currently being tracked."""
with self._lock:
return list(self._target_data.keys())
def has_active_real_targets(self) -> bool:
"""
Checks if there is any real target data currently stored in the hub.
Returns:
True if at least one target has a non-empty 'real' data history,
False otherwise.
"""
with self._lock:
for target_info in self._target_data.values():
if target_info.get("real"): # Check if the 'real' deque is not empty
return True
return False
def reset(self):
"""Clears all stored data for all targets."""
with self._lock:
self._target_data.clear()
# also clear heading caches
self._latest_real_heading.clear()
self._latest_raw_heading.clear()
def _initialize_target(self, target_id: int):
"""Internal helper to create the data structure for a new target."""
if target_id not in self._target_data:
self._target_data[target_id] = {
"simulated": collections.deque(maxlen=self._history_size),
"real": collections.deque(maxlen=self._history_size),
}
def remove_target(self, target_id: int):
"""Remove all stored data for a specific target id."""
with self._lock:
try:
tid = int(target_id)
if tid in self._target_data:
del self._target_data[tid]
if tid in self._latest_real_heading:
del self._latest_real_heading[tid]
if tid in self._latest_raw_heading:
del self._latest_raw_heading[tid]
except Exception:
pass
def clear_real_target_data(self, target_id: int):
"""
Clears only the real data history and heading caches for a specific target,
preserving the simulated data history for analysis.
"""
with self._lock:
try:
tid = int(target_id)
if tid in self._target_data:
self._target_data[tid]["real"].clear()
# Also clear heading caches associated with this real target
if tid in self._latest_real_heading:
del self._latest_real_heading[tid]
if tid in self._latest_raw_heading:
del self._latest_raw_heading[tid]
except Exception:
# Silently ignore errors (e.g., invalid target_id type)
pass