# target_simulator/analysis/simulation_state_hub.py
"""
Defines the SimulationStateHub, a thread-safe data store for comparing
simulated target states with real data received from the radar.

Performance Notes:
- Uses a single threading.Lock for all operations (simple, but a potential
  source of contention).
- Future optimization: implement double-buffering to separate the read and
  write paths (GUI reads from read_buffer, simulation/network threads write
  to write_buffer).
- The current design prioritizes correctness over maximum throughput.
"""

import collections
import logging
import math
import threading
import time
from typing import Any, Deque, Dict, List, Optional, Tuple

# Module-level logger for this module
logger = logging.getLogger(__name__)

# A stored state is the caller-supplied state with the timestamp prepended,
# e.g. (timestamp, x, y, z, vx, vy, vz, ...).
# For now, we focus on timestamp and position in feet.
TargetState = Tuple[float, float, float, float]


def _write_buffer_field(key: str) -> property:
    """
    Build a property that reads/writes ``self._write_buffer[key]``.

    The hub's methods (and possibly external code) use attribute names such
    as ``_target_data``; this keeps those names working while the data
    itself lives in the write buffer dictionary.

    Args:
        key: The write-buffer dictionary key backing the attribute.

    Returns:
        A property object to be assigned as a class attribute.
    """

    def _get(self):
        return self._write_buffer[key]

    def _set(self, value):
        self._write_buffer[key] = value

    return property(_get, _set, doc=f"Write-buffer field {key!r}.")


class SimulationStateHub:
    """
    A thread-safe hub to store and manage the history of simulated and real
    target states for performance analysis.

    Storage layout:
    - WRITE BUFFER: the authoritative store. Every setter and getter of this
      class operates on it while holding the hub lock.
    - READ BUFFER: a snapshot of the write buffer produced by
      ``swap_buffers()``. Each call publishes a brand-new snapshot
      dictionary, so a reference obtained earlier is never mutated.

    Thread safety:
    A single ``threading.Lock`` (``_swap_lock``, also reachable as ``_lock``)
    guards all access to the write buffer. The lock is not re-entrant, so
    methods that hold it only call lock-free private helpers.
    """

    # Legacy attribute names mapped onto the write buffer. Assigning to one
    # of these (e.g. ``self._antenna_azimuth_deg = az``) updates the buffer.
    _target_data = _write_buffer_field('target_data')
    _ownship_state = _write_buffer_field('ownship_state')
    _simulation_origin_state = _write_buffer_field('simulation_origin_state')
    _latest_real_heading = _write_buffer_field('latest_real_heading')
    _latest_raw_heading = _write_buffer_field('latest_raw_heading')
    _real_event_timestamps = _write_buffer_field('real_event_timestamps')
    _real_packet_timestamps = _write_buffer_field('real_packet_timestamps')
    _antenna_azimuth_deg = _write_buffer_field('antenna_azimuth_deg')
    _antenna_azimuth_ts = _write_buffer_field('antenna_azimuth_ts')
    _last_real_summary_time = _write_buffer_field('last_real_summary_time')
    _real_summary_interval_s = _write_buffer_field('real_summary_interval_s')

    def __init__(self, history_size: int = 200):
        """
        Initializes the SimulationStateHub with a write and a read buffer.

        Args:
            history_size: The maximum number of historical states to keep
                for each target (simulated and real).
        """
        self._history_size = history_size

        # Write buffer is authoritative; read buffer is a published snapshot.
        self._write_buffer = self._create_empty_buffer()
        self._read_buffer = self._create_empty_buffer()

        # One lock guards the write buffer and the snapshot operation.
        self._swap_lock = threading.Lock()

        # Original lock name kept as an alias for backward compatibility.
        self._lock = self._swap_lock

        # Swap statistics for monitoring
        self._swap_count = 0
        self._last_swap_time = time.monotonic()

    def _create_empty_buffer(self) -> Dict[str, Any]:
        """Create an empty buffer structure with all necessary fields."""
        return {
            'target_data': {},  # Dict[int, Dict[str, Deque[TargetState]]]
            'ownship_state': {},
            'simulation_origin_state': {},
            'latest_real_heading': {},
            'latest_raw_heading': {},
            'real_event_timestamps': collections.deque(maxlen=10000),
            'real_packet_timestamps': collections.deque(maxlen=10000),
            'antenna_azimuth_deg': None,
            'antenna_azimuth_ts': None,
            'last_real_summary_time': time.monotonic(),
            'real_summary_interval_s': 1.0,
        }

    @staticmethod
    def _as_target_id(target_id: Any) -> Optional[int]:
        """Return ``int(target_id)``, or None when it cannot be converted."""
        try:
            return int(target_id)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _rate_from_timestamps(timestamps, window_seconds: float) -> float:
        """
        Count the timestamps newer than ``now - window_seconds`` and convert
        the count to a per-second rate.

        Takes no lock; callers that pass a shared deque must hold the hub
        lock. Timestamps are scanned newest-first and the scan stops at the
        first one older than the cutoff.

        Returns:
            count / window for a positive window, the raw count for a
            non-positive window, and 0.0 when the window is not numeric.
        """
        try:
            window = float(window_seconds)
        except (TypeError, ValueError):
            return 0.0

        cutoff = time.monotonic() - window
        count = 0
        for ts in reversed(timestamps):
            if ts >= cutoff:
                count += 1
            else:
                break
        return count / window if window > 0 else float(count)

    def swap_buffers(self) -> float:
        """
        Publish a fresh snapshot of the write buffer as the read buffer.

        Despite the name, the buffers are not exchanged: the write buffer
        stays authoritative and a new snapshot dictionary replaces the read
        buffer. Per-target histories are truncated by
        ``_shallow_copy_target_data``; the rate timestamp deques are copied
        so that later appends cannot disturb iteration over the snapshot.

        Returns:
            float: Time taken for the operation in seconds. A warning is
            logged when it exceeds 3 ms.
        """
        swap_start = time.perf_counter()

        with self._swap_lock:
            source = self._write_buffer
            snapshot: Dict[str, Any] = {
                'target_data': self._shallow_copy_target_data(
                    source['target_data']
                ),
                'ownship_state': dict(source['ownship_state']),
                'simulation_origin_state': dict(
                    source['simulation_origin_state']
                ),
                'latest_real_heading': dict(source['latest_real_heading']),
                'latest_raw_heading': dict(source['latest_raw_heading']),
                # Copies, not shared references: iterating a deque while
                # another thread appends to it raises RuntimeError.
                'real_event_timestamps': source['real_event_timestamps'].copy(),
                'real_packet_timestamps': source['real_packet_timestamps'].copy(),
                'antenna_azimuth_deg': source['antenna_azimuth_deg'],
                'antenna_azimuth_ts': source['antenna_azimuth_ts'],
                'last_real_summary_time': source['last_real_summary_time'],
                'real_summary_interval_s': source['real_summary_interval_s'],
            }
            # Rebind instead of mutating in place so that a reader holding
            # the previous snapshot keeps a consistent view.
            self._read_buffer = snapshot
            self._swap_count += 1
            self._last_swap_time = time.monotonic()

        swap_elapsed = time.perf_counter() - swap_start

        if swap_elapsed > 0.003:  # 3ms
            logger.warning("Slow buffer swap: %.2fms", swap_elapsed * 1000)

        return swap_elapsed

    def _shallow_copy_target_data(
        self,
        source: Dict[int, Dict[str, Deque[TargetState]]],
        max_states: int = 20,
    ) -> Dict[int, Dict[str, Deque[TargetState]]]:
        """
        Create a shallow copy of target data holding only recent states.

        Args:
            source: Mapping of target id to its 'simulated' and 'real'
                histories. A missing history is treated as empty.
            max_states: Number of most-recent states to keep per history.

        Returns:
            A new mapping whose deques are bounded to ``max_states``. The
            state tuples themselves are shared with ``source``.
        """
        # deque(iterable, maxlen=n) retains only the last n items, so no
        # intermediate full-history list is needed.
        return {
            target_id: {
                'simulated': collections.deque(
                    data_dict.get('simulated', ()), maxlen=max_states
                ),
                'real': collections.deque(
                    data_dict.get('real', ()), maxlen=max_states
                ),
            }
            for target_id, data_dict in source.items()
        }

    def get_swap_stats(self) -> Dict[str, Any]:
        """Return statistics about buffer swaps for performance monitoring."""
        last_swap = self._last_swap_time
        return {
            'swap_count': self._swap_count,
            'last_swap_time': last_swap,
            'time_since_last_swap': time.monotonic() - last_swap,
        }

    def add_simulated_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """
        Adds a new simulated state for a given target.

        Args:
            target_id: The ID of the target.
            timestamp: The local timestamp (e.g., from time.monotonic())
                when the state was generated.
            state: A sequence representing the target's state
                (x_ft, y_ft, z_ft). Any iterable is accepted.
        """
        # Prepend the timestamp; unpacking also accepts lists.
        full_state = (timestamp, *state)
        with self._lock:
            self._initialize_target(target_id)
            self._target_data[target_id]["simulated"].append(full_state)

    def add_real_state(
        self, target_id: int, timestamp: float, state: Tuple[float, ...]
    ):
        """
        Adds a new real state received from the radar for a given target.

        Also records the monotonic arrival time used by ``get_real_rate``.
        When DEBUG logging is enabled, a per-event diagnostic line and a
        throttled throughput summary are emitted.

        Args:
            target_id: The ID of the target.
            timestamp: The timestamp from the radar or time of reception.
            state: A sequence representing the target's state
                (x_ft, y_ft, z_ft). Any iterable is accepted.
        """
        full_state = (timestamp, *state)
        now = time.monotonic()
        debug_enabled = logger.isEnabledFor(logging.DEBUG)
        summary = None

        with self._lock:
            self._initialize_target(target_id)
            self._target_data[target_id]["real"].append(full_state)

            # Record arrival time (monotonic) for rate computation
            self._real_event_timestamps.append(now)

            # Throttled summary: monotonic time avoids issues with system
            # clock changes.
            interval = self._real_summary_interval_s
            if now - self._last_real_summary_time >= interval:
                self._last_real_summary_time = now
                if debug_enabled:
                    summary = (
                        self._rate_from_timestamps(
                            self._real_event_timestamps, interval
                        ),
                        len(self._target_data),
                    )

        # Diagnostics run outside the lock so they never delay other threads.
        if debug_enabled:
            self._log_real_state_debug(target_id, state)
            if summary is not None:
                logger.debug(
                    "[SimulationStateHub] real states: recent_rate=%.1f ev/s total_targets=%d",
                    summary[0],
                    summary[1],
                )

    @staticmethod
    def _log_real_state_debug(target_id: Any, state: Any) -> None:
        """
        Log one real state at DEBUG level with two candidate azimuths.

        Missing components default to 0.0. A state that cannot be read as
        numbers is skipped so diagnostics never break hub behavior.
        """
        try:
            x_ft = float(state[0]) if len(state) > 0 else 0.0
            y_ft = float(state[1]) if len(state) > 1 else 0.0
            z_ft = float(state[2]) if len(state) > 2 else 0.0
        except (TypeError, ValueError, IndexError, KeyError):
            return

        if x_ft != 0 or y_ft != 0:
            # Interpretation A: x => East, y => North (current standard)
            az_a = -math.degrees(math.atan2(x_ft, y_ft))
            # Interpretation B: swapped axes (x => North, y => East)
            az_b = -math.degrees(math.atan2(y_ft, x_ft))
        else:
            az_a = az_b = 0.0

        logger.debug(
            "[SimulationStateHub] add_real_state target=%s state_ft=(%.3f, %.3f, %.3f) az_a=%.3f az_b=%.3f",
            target_id,
            x_ft,
            y_ft,
            z_ft,
            az_a,
            az_b,
        )

    def get_real_rate(self, window_seconds: float = 1.0) -> float:
        """
        Returns an approximate events/sec rate of real states received over
        the last `window_seconds`. Uses monotonic timestamps recorded on
        receipt. Returns 0.0 when `window_seconds` is not numeric.
        """
        with self._lock:
            return self._rate_from_timestamps(
                self._real_event_timestamps, window_seconds
            )

    def add_real_packet(self, timestamp: Optional[float] = None):
        """
        Record the arrival of a packet (containing potentially many target
        updates).

        Args:
            timestamp: optional monotonic timestamp to use (defaults to
                now). A value that cannot be converted to float is ignored.
        """
        if timestamp is None:
            ts = time.monotonic()
        else:
            try:
                ts = float(timestamp)
            except (TypeError, ValueError):
                # Non-critical bookkeeping: drop the invalid sample.
                return
        with self._lock:
            self._real_packet_timestamps.append(ts)

    def get_packet_rate(self, window_seconds: float = 1.0) -> float:
        """
        Returns an approximate packets/sec rate based on recorded packet
        arrival timestamps over the last `window_seconds`. Returns 0.0 when
        `window_seconds` is not numeric.
        """
        with self._lock:
            return self._rate_from_timestamps(
                self._real_packet_timestamps, window_seconds
            )

    def set_real_heading(
        self, target_id: int, heading_deg: float, raw_value: Optional[float] = None
    ):
        """
        Store the latest real heading (in degrees) for a specific target id.

        The heading is normalized into [0, 360) before storing. Invalid
        input (non-integer target id, non-numeric or non-finite heading) is
        silently ignored. An invalid `raw_value` is ignored on its own; the
        heading is still stored.

        Args:
            target_id: The ID of the target (converted with int()).
            heading_deg: Heading in degrees.
            raw_value: Optional raw heading value stored alongside, as float.
        """
        tid = self._as_target_id(target_id)
        if tid is None:
            return
        try:
            hdg = float(heading_deg)
        except (TypeError, ValueError):
            return
        if not math.isfinite(hdg):
            # NaN/inf would survive the modulo as NaN; treat as invalid.
            return
        hdg %= 360

        raw: Optional[float] = None
        if raw_value is not None:
            try:
                raw = float(raw_value)
            except (TypeError, ValueError):
                raw = None  # ignore invalid raw value

        with self._lock:
            self._latest_real_heading[tid] = hdg
            if raw is not None:
                self._latest_raw_heading[tid] = raw

    def set_antenna_azimuth(
        self, azimuth_deg: float, timestamp: Optional[float] = None
    ):
        """
        Store the latest antenna (platform) azimuth in degrees along with an
        optional monotonic timestamp.

        The azimuth is normalized into [0, 360). Non-numeric or non-finite
        input is silently ignored.

        Args:
            azimuth_deg: Azimuth value in degrees (0 = North, positive CCW)
            timestamp: Optional monotonic timestamp (defaults to
                time.monotonic())
        """
        try:
            ts = float(timestamp) if timestamp is not None else time.monotonic()
            az = float(azimuth_deg)
        except (TypeError, ValueError):
            return
        if not math.isfinite(az):
            return
        az %= 360

        with self._lock:
            self._antenna_azimuth_deg = az
            self._antenna_azimuth_ts = ts

        # Debug log to aid diagnosis when antenna updates arrive
        logger.debug(
            "[SimulationStateHub] set_antenna_azimuth: az=%.3f ts=%s hub_id=%s",
            az,
            ts,
            id(self),
        )

    def get_antenna_azimuth(self) -> Tuple[Optional[float], Optional[float]]:
        """
        Returns a tuple (azimuth_deg, timestamp) representing the last-known
        antenna azimuth and the timestamp stored with it.
        If not available, returns (None, None).
        """
        with self._lock:
            result = (self._antenna_azimuth_deg, self._antenna_azimuth_ts)

        # Logged after releasing the lock to keep the critical section short.
        logger.debug(
            "[SimulationStateHub] get_antenna_azimuth -> az=%s ts=%s hub_id=%s",
            result[0],
            result[1],
            id(self),
        )
        return result

    # Backwards-compatible aliases for existing callers. These delegate to the
    # new antenna-named methods so external code using the older names
    # continues to work.
    def set_platform_azimuth(
        self, azimuth_deg: float, timestamp: Optional[float] = None
    ):
        """Alias of ``set_antenna_azimuth`` kept for older callers."""
        self.set_antenna_azimuth(azimuth_deg, timestamp=timestamp)

    def get_platform_azimuth(self) -> Tuple[Optional[float], Optional[float]]:
        """Alias of ``get_antenna_azimuth`` kept for older callers."""
        return self.get_antenna_azimuth()

    def get_real_heading(self, target_id: int) -> Optional[float]:
        """
        Retrieve the last stored real heading for a target, or None if not
        set. `target_id` is converted with int(); a conversion error
        propagates to the caller.
        """
        tid = int(target_id)
        with self._lock:
            return self._latest_real_heading.get(tid)

    def get_raw_heading(self, target_id: int) -> Optional[float]:
        """
        Retrieve the last stored raw heading value for a target, or None if
        not set. `target_id` is converted with int(); a conversion error
        propagates to the caller.
        """
        tid = int(target_id)
        with self._lock:
            return self._latest_raw_heading.get(tid)

    def get_target_history(
        self, target_id: int
    ) -> Optional[Dict[str, List[TargetState]]]:
        """
        Retrieves a copy of the historical data for a specific target.

        Args:
            target_id: The ID of the target.

        Returns:
            A dictionary containing lists of 'simulated' and 'real' states,
            or None if the target ID is not found.
        """
        with self._lock:
            history = self._target_data.get(target_id)
            if history is None:
                return None
            return {
                "simulated": list(history["simulated"]),
                "real": list(history["real"]),
            }

    def get_all_target_ids(self) -> List[int]:
        """Returns a list of all target IDs currently being tracked."""
        with self._lock:
            return list(self._target_data)

    def has_active_real_targets(self) -> bool:
        """
        Checks if there is any real target data currently stored in the hub.

        Returns:
            True if at least one target has a non-empty 'real' data history,
            False otherwise.
        """
        with self._lock:
            return any(
                info.get("real") for info in self._target_data.values()
            )

    def reset(self):
        """
        Clears all stored data for all targets.

        Target histories, ownship and simulation-origin state, heading
        caches and the antenna azimuth are cleared. The event/packet arrival
        timestamps used for rate computation are left untouched. The read
        buffer is not modified here; it reflects the reset after the next
        ``swap_buffers()`` call.
        """
        with self._lock:
            self._target_data.clear()
            self._ownship_state.clear()
            self._simulation_origin_state.clear()
            # also clear heading caches
            self._latest_real_heading.clear()
            self._latest_raw_heading.clear()
            self._antenna_azimuth_deg = None
            self._antenna_azimuth_ts = None

    def _initialize_target(self, target_id: int):
        """
        Internal helper to create the data structure for a new target.

        Does nothing when the target already exists. Takes no lock; callers
        are expected to hold the hub lock.
        """
        if target_id not in self._target_data:
            self._target_data[target_id] = {
                "simulated": collections.deque(maxlen=self._history_size),
                "real": collections.deque(maxlen=self._history_size),
            }

    def remove_target(self, target_id: int):
        """
        Remove all stored data for a specific target id.

        A `target_id` that cannot be converted with int() is ignored.
        """
        tid = self._as_target_id(target_id)
        if tid is None:
            return
        with self._lock:
            self._target_data.pop(tid, None)
            self._latest_real_heading.pop(tid, None)
            self._latest_raw_heading.pop(tid, None)

    def clear_real_target_data(self, target_id: int):
        """
        Clears only the real data history and heading caches for a specific
        target, preserving the simulated data history for analysis.

        A `target_id` that cannot be converted with int() is ignored.
        """
        tid = self._as_target_id(target_id)
        if tid is None:
            return
        with self._lock:
            history = self._target_data.get(tid)
            if history is not None:
                history["real"].clear()

            # Also clear heading caches associated with this real target
            self._latest_real_heading.pop(tid, None)
            self._latest_raw_heading.pop(tid, None)

    def clear_simulated_data(self, target_id: Optional[int] = None):
        """
        Clears simulated data history for a specific target or for all
        targets if target_id is None. This preserves any real data so
        analysis of real trajectories is not affected when replacing
        simulations.

        A `target_id` that cannot be converted with int() is ignored.
        """
        if target_id is None:
            with self._lock:
                for info in self._target_data.values():
                    simulated = info.get("simulated")
                    if simulated is not None:
                        simulated.clear()
            return

        tid = self._as_target_id(target_id)
        if tid is None:
            return
        with self._lock:
            history = self._target_data.get(tid)
            if history is not None:
                history["simulated"].clear()

    def set_ownship_state(self, state: Dict[str, Any]):
        """
        Updates the ownship's absolute state.

        This method is thread-safe. The provided state dictionary is merged
        with the existing state.

        Args:
            state: A dictionary containing ownship state information, e.g.,
                {'position_xy_ft': (x, y), 'heading_deg': 90.0}.
        """
        with self._lock:
            self._ownship_state.update(state)

    def get_ownship_state(self) -> Dict[str, Any]:
        """
        Retrieves a copy of the ownship's current absolute state.

        This method is thread-safe. The copy is shallow: nested values are
        shared with the stored state.

        Returns:
            A dictionary containing the last known state of the ownship.
        """
        with self._lock:
            return dict(self._ownship_state)

    def set_simulation_origin(self, state: Dict[str, Any]):
        """
        Stores a snapshot of the ownship's state at the start of a simulation.

        This state serves as the fixed origin (position and orientation) for
        the simulation's coordinate system. This method is thread-safe. A
        shallow copy is stored, so later changes to the caller's dictionary
        keys do not affect the hub.

        Args:
            state: A dictionary containing the ownship's state at T=0.
        """
        origin = dict(state)
        with self._lock:
            self._simulation_origin_state = origin

    def get_simulation_origin(self) -> Dict[str, Any]:
        """
        Retrieves a copy of the simulation's origin state.

        This method is thread-safe.

        Returns:
            A dictionary containing the ownship state at T=0 for the current
            simulation.
        """
        with self._lock:
            return dict(self._simulation_origin_state)