S1005403_RisCC/target_simulator/analysis/simulation_state_hub.py

102 lines
3.7 KiB
Python

# target_simulator/analysis/simulation_state_hub.py
"""
Defines the SimulationStateHub, a thread-safe data store for comparing
simulated target states with real data received from the radar.
"""
import collections
import threading
from typing import Dict, Deque, Tuple, Optional, List
# A state tuple can contain (timestamp, x, y, z, vx, vy, vz, ...)
# For now, we focus on timestamp and position.
TargetState = Tuple[float, float, float, float]
class SimulationStateHub:
"""
A thread-safe hub to store and manage the history of simulated and real
target states for performance analysis.
"""
def __init__(self, history_size: int = 200):
"""
Initializes the SimulationStateHub.
Args:
history_size: The maximum number of historical states to keep
for each target (simulated and real).
"""
self._lock = threading.Lock()
self._history_size = history_size
self._target_data: Dict[int, Dict[str, Deque[TargetState]]] = {}
def add_simulated_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]):
"""
Adds a new simulated state for a given target.
Args:
target_id: The ID of the target.
timestamp: The local timestamp (e.g., from time.monotonic()) when the state was generated.
state: A tuple representing the target's state (e.g., x_ft, y_ft, z_ft).
"""
with self._lock:
if target_id not in self._target_data:
self._initialize_target(target_id)
# Prepend the timestamp to the state tuple
full_state = (timestamp,) + state
self._target_data[target_id]["simulated"].append(full_state)
def add_real_state(self, target_id: int, timestamp: float, state: Tuple[float, ...]):
"""
Adds a new real state received from the radar for a given target.
Args:
target_id: The ID of the target.
timestamp: The timestamp from the radar or time of reception.
state: A tuple representing the target's state (e.g., x_ft, y_ft, z_ft).
"""
with self._lock:
if target_id not in self._target_data:
self._initialize_target(target_id)
full_state = (timestamp,) + state
self._target_data[target_id]["real"].append(full_state)
def get_target_history(self, target_id: int) -> Optional[Dict[str, List[TargetState]]]:
"""
Retrieves a copy of the historical data for a specific target.
Args:
target_id: The ID of the target.
Returns:
A dictionary containing lists of 'simulated' and 'real' states,
or None if the target ID is not found.
"""
with self._lock:
if target_id in self._target_data:
return {
"simulated": list(self._target_data[target_id]["simulated"]),
"real": list(self._target_data[target_id]["real"]),
}
return None
def get_all_target_ids(self) -> List[int]:
"""Returns a list of all target IDs currently being tracked."""
with self._lock:
return list(self._target_data.keys())
def reset(self):
"""Clears all stored data for all targets."""
with self._lock:
self._target_data.clear()
def _initialize_target(self, target_id: int):
"""Internal helper to create the data structure for a new target."""
if target_id not in self._target_data:
self._target_data[target_id] = {
"simulated": collections.deque(maxlen=self._history_size),
"real": collections.deque(maxlen=self._history_size),
}