104 lines
4.0 KiB
Python
104 lines
4.0 KiB
Python
from typing import Dict, List, Optional
|
|
import math
|
|
from target_simulator.core.models import Target
|
|
|
|
|
|
def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=None, logger=None) -> Dict[str, List[Target]]:
|
|
"""Builds PPI display data from the simulation hub.
|
|
|
|
Returns a dict with keys 'simulated' and 'real' containing lightweight
|
|
Target objects suitable for passing to PPIDisplay.
|
|
"""
|
|
simulated_targets_for_ppi = []
|
|
real_targets_for_ppi = []
|
|
|
|
if not simulation_hub:
|
|
return {"simulated": [], "real": []}
|
|
|
|
target_ids = simulation_hub.get_all_target_ids()
|
|
|
|
for tid in target_ids:
|
|
history = simulation_hub.get_target_history(tid)
|
|
if not history:
|
|
continue
|
|
|
|
# --- Process Simulated Data ---
|
|
if history.get("simulated"):
|
|
last_sim_state = history["simulated"][-1]
|
|
_ts, x_ft, y_ft, z_ft = last_sim_state
|
|
|
|
sim_target = Target(target_id=tid, trajectory=[])
|
|
setattr(sim_target, "_pos_x_ft", x_ft)
|
|
setattr(sim_target, "_pos_y_ft", y_ft)
|
|
setattr(sim_target, "_pos_z_ft", z_ft)
|
|
sim_target._update_current_polar_coords()
|
|
|
|
# Try to preserve heading information for simulated targets.
|
|
try:
|
|
heading = None
|
|
if engine and getattr(engine, "scenario", None):
|
|
t = engine.scenario.get_target(tid)
|
|
if t:
|
|
heading = getattr(t, "current_heading_deg", None)
|
|
if heading is None and scenario:
|
|
t2 = scenario.get_target(tid)
|
|
if t2:
|
|
heading = getattr(t2, "current_heading_deg", None)
|
|
if heading is not None:
|
|
sim_target.current_heading_deg = float(heading)
|
|
except Exception:
|
|
pass
|
|
|
|
# Determine active flag based on the canonical Scenario/SimulationEngine
|
|
try:
|
|
active_flag = True
|
|
if engine and getattr(engine, "scenario", None):
|
|
t_engine = engine.scenario.get_target(tid)
|
|
if t_engine is not None:
|
|
active_flag = bool(getattr(t_engine, "active", True))
|
|
elif scenario:
|
|
t_scn = scenario.get_target(tid)
|
|
if t_scn is not None:
|
|
active_flag = bool(getattr(t_scn, "active", True))
|
|
except Exception:
|
|
active_flag = True
|
|
sim_target.active = active_flag
|
|
simulated_targets_for_ppi.append(sim_target)
|
|
|
|
# --- Process Real Data ---
|
|
if history.get("real"):
|
|
last_real_state = history["real"][-1]
|
|
_ts, x_ft, y_ft, z_ft = last_real_state
|
|
|
|
real_target = Target(target_id=tid, trajectory=[])
|
|
setattr(real_target, "_pos_x_ft", x_ft)
|
|
setattr(real_target, "_pos_y_ft", y_ft)
|
|
setattr(real_target, "_pos_z_ft", z_ft)
|
|
real_target._update_current_polar_coords()
|
|
|
|
# Copy last-known heading if hub provides it
|
|
try:
|
|
if simulation_hub and hasattr(simulation_hub, "get_real_heading"):
|
|
hdg = simulation_hub.get_real_heading(tid)
|
|
if hdg is not None:
|
|
real_target.current_heading_deg = float(hdg) % 360
|
|
except Exception:
|
|
pass
|
|
|
|
# Optional debug computations (theta0/theta1) left out; callers can
|
|
# compute if needed. Keep active True for real targets.
|
|
real_target.active = True
|
|
real_targets_for_ppi.append(real_target)
|
|
|
|
try:
|
|
if logger:
|
|
logger.debug(
|
|
"PPIDisplay will receive simulated=%d real=%d targets from hub",
|
|
len(simulated_targets_for_ppi),
|
|
len(real_targets_for_ppi),
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi}
|