modificata la fase di start della simulazione per non bloccare l'interfaccia.

This commit is contained in:
VALLONGOL 2025-11-03 14:34:57 +01:00
parent 0730068f1c
commit 76178e9888
5 changed files with 776 additions and 234 deletions

View File

@ -15,11 +15,13 @@ from datetime import datetime
# Use absolute imports for robustness and clarity
from target_simulator.gui.ppi_display import PPIDisplay
from target_simulator.gui.ppi_adapter import build_display_data
from target_simulator.gui.connection_settings_window import ConnectionSettingsWindow
from target_simulator.gui.radar_config_window import RadarConfigWindow
from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame
from target_simulator.gui.target_list_frame import TargetListFrame
from target_simulator.gui.connection_panel import ConnectionPanel
from target_simulator.gui.status_bar import StatusBar
from target_simulator.core.communicator_interface import CommunicatorInterface
from target_simulator.core.serial_communicator import SerialCommunicator
@ -598,33 +600,17 @@ class MainView(tk.Tk):
)
def _create_statusbar(self):
status_bar = ttk.Frame(self, relief=tk.SUNKEN)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
ttk.Label(status_bar, text="Target:").pack(side=tk.LEFT, padx=(5, 2))
self.target_status_canvas = tk.Canvas(
status_bar, width=16, height=16, highlightthickness=0
)
self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.target_status_canvas, "#e74c3c")
ttk.Label(status_bar, text="LRU:").pack(side=tk.LEFT, padx=(5, 2))
self.lru_status_canvas = tk.Canvas(
status_bar, width=16, height=16, highlightthickness=0
)
self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.lru_status_canvas, "#e74c3c")
self.status_var = tk.StringVar(value="Ready")
ttk.Label(status_bar, textvariable=self.status_var, anchor=tk.W).pack(
side=tk.LEFT, fill=tk.X, expand=True, padx=5
)
# Small rate indicator showing incoming real-state rate and PPI update rate
try:
self.rate_status_var = tk.StringVar(value="")
ttk.Label(status_bar, textvariable=self.rate_status_var, anchor=tk.E).pack(
side=tk.RIGHT, padx=(4, 8)
)
except Exception:
# Do not allow UI failures to stop initialization
self.rate_status_var = None
# Use the extracted StatusBar widget. Expose the same attributes
# MainView previously provided so callers elsewhere continue to work.
self.status_bar = StatusBar(self)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
self.target_status_canvas = self.status_bar.target_status_canvas
self.lru_status_canvas = self.status_bar.lru_status_canvas
self.status_var = self.status_bar.status_var
self.rate_status_var = getattr(self.status_bar, "rate_status_var", None)
# Id used by show_status_message scheduling
self._status_after_id = None
def show_status_message(self, text: str, timeout_ms: int = 3000):
"""Show a transient status message in the main status bar.
@ -1575,159 +1561,21 @@ class MainView(tk.Tk):
self._update_all_views()
def _build_display_data_from_hub(self) -> Dict[str, List[Target]]:
"""
Builds the data structure for the PPIDisplay by fetching the latest
simulated and real states from the SimulationStateHub.
"""
simulated_targets_for_ppi = []
real_targets_for_ppi = []
if not self.simulation_hub:
return {"simulated": [], "real": []}
target_ids = self.simulation_hub.get_all_target_ids()
for tid in target_ids:
history = self.simulation_hub.get_target_history(tid)
if not history:
continue
# --- Process Simulated Data ---
if history["simulated"]:
last_sim_state = history["simulated"][-1]
_ts, x_ft, y_ft, z_ft = last_sim_state # Hub now stores feet directly
sim_target = Target(target_id=tid, trajectory=[]) # Lightweight object
# Manually set internal cartesian coords and update polar
setattr(sim_target, "_pos_x_ft", x_ft)
setattr(sim_target, "_pos_y_ft", y_ft)
setattr(sim_target, "_pos_z_ft", z_ft)
sim_target._update_current_polar_coords()
# Try to preserve heading information for simulated targets.
# The hub stores only positions; if a Scenario (or the running
# SimulationEngine) has the canonical Target instance, copy its
# computed current_heading_deg so the plotted heading vector
# matches the simulation's internal heading.
try:
heading = None
if (
hasattr(self, "simulation_engine")
and self.simulation_engine
and getattr(self.simulation_engine, "scenario", None)
):
t = self.simulation_engine.scenario.get_target(tid)
if t:
heading = getattr(t, "current_heading_deg", None)
if heading is None and getattr(self, "scenario", None):
t2 = self.scenario.get_target(tid)
if t2:
heading = getattr(t2, "current_heading_deg", None)
if heading is not None:
sim_target.current_heading_deg = float(heading)
except Exception:
pass
# Determine active flag based on the canonical Scenario/SimulationEngine
try:
active_flag = True
if (
hasattr(self, "simulation_engine")
and self.simulation_engine
and getattr(self.simulation_engine, "scenario", None)
):
t_engine = self.simulation_engine.scenario.get_target(tid)
if t_engine is not None:
active_flag = bool(getattr(t_engine, "active", True))
elif getattr(self, "scenario", None):
t_scn = self.scenario.get_target(tid)
if t_scn is not None:
active_flag = bool(getattr(t_scn, "active", True))
except Exception:
active_flag = True
sim_target.active = active_flag
simulated_targets_for_ppi.append(sim_target)
# --- Process Real Data ---
if history["real"]:
last_real_state = history["real"][-1]
_ts, x_ft, y_ft, z_ft = last_real_state # Hub now stores feet directly
real_target = Target(target_id=tid, trajectory=[]) # Lightweight object
setattr(real_target, "_pos_x_ft", x_ft)
setattr(real_target, "_pos_y_ft", y_ft)
setattr(real_target, "_pos_z_ft", z_ft)
real_target._update_current_polar_coords()
# If the hub provides a last-known heading for this real target,
# copy it into the lightweight Target object so the PPI shows
# the correct heading arrow parsed from RIS payloads.
try:
if self.simulation_hub and hasattr(
self.simulation_hub, "get_real_heading"
):
hdg = self.simulation_hub.get_real_heading(tid)
if hdg is not None:
real_target.current_heading_deg = float(hdg) % 360
except Exception:
pass
# Correlation log: show raw->hub->used to help debugging pipeline
try:
if self.simulation_hub and hasattr(
self.simulation_hub, "get_raw_heading"
):
raw_h = self.simulation_hub.get_raw_heading(tid)
else:
raw_h = None
# Compute theta0/theta1 using same conventions as PPIDisplay
try:
az_deg = float(real_target.current_azimuth_deg)
r_nm = float(real_target.current_range_nm)
az_rad = math.radians(az_deg)
x_start = r_nm * math.sin(az_rad)
y_start = r_nm * math.cos(az_rad)
vector_len = (
self.ppi_widget.range_var.get() / 20.0
if hasattr(self, "ppi_widget")
and hasattr(self.ppi_widget, "range_var")
else 1.0
)
hdg_used = float(real_target.current_heading_deg)
hdg_rad = math.radians(hdg_used)
dx = vector_len * math.sin(hdg_rad)
dy = vector_len * math.cos(hdg_rad)
x_end = x_start + dx
y_end = y_start + dy
theta0_deg = -math.degrees(math.atan2(x_start, y_start))
theta1_deg = -math.degrees(math.atan2(x_end, y_end))
except Exception:
theta0_deg = None
theta1_deg = None
# self.logger.debug(
# "Heading pipeline: TID %s raw=%s hub=%s used=%s theta0=%.3f theta1=%.3f",
# tid,
# raw_h,
# getattr(self.simulation_hub, 'get_real_heading')(tid) if self.simulation_hub else None,
# real_target.current_heading_deg,
# theta0_deg if theta0_deg is not None else float('nan'),
# theta1_deg if theta1_deg is not None else float('nan'),
# )
except Exception:
pass
real_target.active = True
real_targets_for_ppi.append(real_target)
# Delegate to ppi_adapter.build_display_data for consistent behavior
try:
self.logger.debug(
"PPIDisplay will receive simulated=%d real=%d targets from hub",
len(simulated_targets_for_ppi),
len(real_targets_for_ppi),
return build_display_data(
self.simulation_hub,
scenario=getattr(self, "scenario", None),
engine=getattr(self, "simulation_engine", None),
ppi_widget=getattr(self, "ppi_widget", None),
logger=getattr(self, "logger", None),
)
except Exception:
pass
return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi}
try:
self.logger.exception("Failed to build display data via ppi_adapter")
except Exception:
pass
return {"simulated": [], "real": []}
def _open_analysis_window(self):
"""Opens the performance analysis window, ensuring only one instance exists."""

View File

@ -0,0 +1,103 @@
from typing import Dict, List, Optional
import math
from target_simulator.core.models import Target
def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=None, logger=None) -> Dict[str, List[Target]]:
"""Builds PPI display data from the simulation hub.
Returns a dict with keys 'simulated' and 'real' containing lightweight
Target objects suitable for passing to PPIDisplay.
"""
simulated_targets_for_ppi = []
real_targets_for_ppi = []
if not simulation_hub:
return {"simulated": [], "real": []}
target_ids = simulation_hub.get_all_target_ids()
for tid in target_ids:
history = simulation_hub.get_target_history(tid)
if not history:
continue
# --- Process Simulated Data ---
if history.get("simulated"):
last_sim_state = history["simulated"][-1]
_ts, x_ft, y_ft, z_ft = last_sim_state
sim_target = Target(target_id=tid, trajectory=[])
setattr(sim_target, "_pos_x_ft", x_ft)
setattr(sim_target, "_pos_y_ft", y_ft)
setattr(sim_target, "_pos_z_ft", z_ft)
sim_target._update_current_polar_coords()
# Try to preserve heading information for simulated targets.
try:
heading = None
if engine and getattr(engine, "scenario", None):
t = engine.scenario.get_target(tid)
if t:
heading = getattr(t, "current_heading_deg", None)
if heading is None and scenario:
t2 = scenario.get_target(tid)
if t2:
heading = getattr(t2, "current_heading_deg", None)
if heading is not None:
sim_target.current_heading_deg = float(heading)
except Exception:
pass
# Determine active flag based on the canonical Scenario/SimulationEngine
try:
active_flag = True
if engine and getattr(engine, "scenario", None):
t_engine = engine.scenario.get_target(tid)
if t_engine is not None:
active_flag = bool(getattr(t_engine, "active", True))
elif scenario:
t_scn = scenario.get_target(tid)
if t_scn is not None:
active_flag = bool(getattr(t_scn, "active", True))
except Exception:
active_flag = True
sim_target.active = active_flag
simulated_targets_for_ppi.append(sim_target)
# --- Process Real Data ---
if history.get("real"):
last_real_state = history["real"][-1]
_ts, x_ft, y_ft, z_ft = last_real_state
real_target = Target(target_id=tid, trajectory=[])
setattr(real_target, "_pos_x_ft", x_ft)
setattr(real_target, "_pos_y_ft", y_ft)
setattr(real_target, "_pos_z_ft", z_ft)
real_target._update_current_polar_coords()
# Copy last-known heading if hub provides it
try:
if simulation_hub and hasattr(simulation_hub, "get_real_heading"):
hdg = simulation_hub.get_real_heading(tid)
if hdg is not None:
real_target.current_heading_deg = float(hdg) % 360
except Exception:
pass
# Optional debug computations (theta0/theta1) left out; callers can
# compute if needed. Keep active True for real targets.
real_target.active = True
real_targets_for_ppi.append(real_target)
try:
if logger:
logger.debug(
"PPIDisplay will receive simulated=%d real=%d targets from hub",
len(simulated_targets_for_ppi),
len(real_targets_for_ppi),
)
except Exception:
pass
return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi}

View File

@ -0,0 +1,159 @@
import tkinter as tk
from tkinter import ttk
from typing import Optional
class StatusBar(ttk.Frame):
"""Status bar widget containing connection indicators and rate/status text.
Exposes small API so MainView can delegate status updates without managing
layout details.
"""
def __init__(self, parent):
super().__init__(parent, relief=tk.SUNKEN)
ttk.Label(self, text="Target:").pack(side=tk.LEFT, padx=(5, 2))
self.target_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0)
self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.target_status_canvas, "#e74c3c")
ttk.Label(self, text="LRU:").pack(side=tk.LEFT, padx=(5, 2))
self.lru_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0)
self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.lru_status_canvas, "#e74c3c")
self.status_var = tk.StringVar(value="Ready")
ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
# Small rate indicator showing incoming real-state rate and PPI update rate
try:
self.rate_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(4, 8))
except Exception:
self.rate_status_var = None
# Id for scheduled status clear; used by show_status_message
self._status_after_id: Optional[str] = None
def _draw_status_indicator(self, canvas, color):
try:
canvas.delete("all")
canvas.create_oval(2, 2, 14, 14, fill=color, outline="black")
except Exception:
pass
def set_target_connected(self, is_connected: bool):
color = "#2ecc40" if is_connected else "#e74c3c"
try:
self._draw_status_indicator(self.target_status_canvas, color)
except Exception:
pass
def set_lru_connected(self, is_connected: bool):
color = "#2ecc40" if is_connected else "#e74c3c"
try:
self._draw_status_indicator(self.lru_status_canvas, color)
except Exception:
pass
def show_status_message(self, text: str, timeout_ms: int = 3000):
try:
try:
if self._status_after_id is not None:
self.after_cancel(self._status_after_id)
except Exception:
pass
self.status_var.set(text)
def _clear():
try:
self.status_var.set("Ready")
except Exception:
pass
self._status_after_id = self.after(timeout_ms, _clear)
except Exception:
# Fallback: set the var and do not schedule clear
try:
self.status_var.set(text)
except Exception:
pass
class StatusBar(ttk.Frame):
"""Status bar widget containing connection indicators and rate/status text.
Exposes small API so MainView can delegate status updates without managing
layout details.
"""
def __init__(self, parent):
super().__init__(parent, relief=tk.SUNKEN)
ttk.Label(self, text="Target:").pack(side=tk.LEFT, padx=(5, 2))
self.target_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0)
self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.target_status_canvas, "#e74c3c")
ttk.Label(self, text="LRU:").pack(side=tk.LEFT, padx=(5, 2))
self.lru_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0)
self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.lru_status_canvas, "#e74c3c")
self.status_var = tk.StringVar(value="Ready")
ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
# Small rate indicator showing incoming real-state rate and PPI update rate
try:
self.rate_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(4, 8))
except Exception:
self.rate_status_var = None
# Id for scheduled status clear; used by show_status_message
self._status_after_id: Optional[str] = None
def _draw_status_indicator(self, canvas, color):
try:
canvas.delete("all")
canvas.create_oval(2, 2, 14, 14, fill=color, outline="black")
except Exception:
pass
def set_target_connected(self, is_connected: bool):
color = "#2ecc40" if is_connected else "#e74c3c"
try:
self._draw_status_indicator(self.target_status_canvas, color)
except Exception:
pass
def set_lru_connected(self, is_connected: bool):
color = "#2ecc40" if is_connected else "#e74c3c"
try:
self._draw_status_indicator(self.lru_status_canvas, color)
except Exception:
pass
def show_status_message(self, text: str, timeout_ms: int = 3000):
try:
try:
if self._status_after_id is not None:
self.after_cancel(self._status_after_id)
except Exception:
pass
self.status_var.set(text)
def _clear():
try:
self.status_var.set("Ready")
except Exception:
pass
self._status_after_id = self.after(timeout_ms, _clear)
except Exception:
# Fallback: set the var and do not schedule clear
try:
self.status_var.set(text)
except Exception:
pass

View File

@ -1,4 +1,12 @@
import time
import threading
from typing import Optional
from tkinter import messagebox
from target_simulator.core.simulation_engine import SimulationEngine
from target_simulator.analysis.simulation_archive import SimulationArchive
import time
import threading
from typing import Optional
from tkinter import messagebox
@ -47,7 +55,6 @@ class SimulationController:
except Exception:
use_json = False
# Helper: wait until hub reports no active real targets (bounded)
def _wait_for_clear(timeout_s: float = 3.0, poll_interval: float = 0.2) -> bool:
waited = 0.0
while waited < timeout_s:
@ -210,6 +217,144 @@ class SimulationController:
pass
return False
def _reset_radar_state_no_ui(self, main_view) -> bool:
"""Same logic as reset_radar_state but avoids any Tk UI calls (messagebox).
This variant is safe to call from a background thread; failures are
logged and a boolean is returned so the caller can decide how to
notify the user on the main thread.
"""
target_comm = getattr(self.communicator_manager, "target_communicator", None)
if not target_comm or not getattr(target_comm, "is_open", False):
try:
self.logger.error("Cannot reset radar state: communicator is not connected.")
except Exception:
pass
return False
try:
use_json = bool(getattr(target_comm, "_use_json_protocol", False))
except Exception:
use_json = False
def _wait_for_clear(timeout_s: float = 3.0, poll_interval: float = 0.2) -> bool:
waited = 0.0
while waited < timeout_s:
try:
if not self.simulation_hub.has_active_real_targets():
return True
except Exception:
try:
self.logger.debug("Error while querying simulation_hub during reset wait", exc_info=True)
except Exception:
pass
time.sleep(poll_interval)
waited += poll_interval
return False
# Legacy textual path
if not use_json:
cmd = "tgtset /-s\n"
try:
self.logger.info("Sending legacy reset command: %s", cmd.strip())
except Exception:
pass
try:
ok = target_comm.send_commands([cmd])
except Exception:
try:
self.logger.exception("Failed to send legacy reset command")
except Exception:
pass
ok = False
if not ok:
try:
self.logger.error("Failed to send reset command to the radar.")
except Exception:
pass
return False
cleared = _wait_for_clear()
return bool(cleared)
# JSON-capable communicator path
try:
json_reset = '{"CMD":"reset"}\n'
try:
self.logger.info("Sending JSON reset command: %s", json_reset.strip())
except Exception:
pass
try:
ok = target_comm.send_commands([json_reset])
except Exception:
try:
self.logger.exception("Failed to send JSON reset command")
except Exception:
pass
ok = False
if ok:
cleared = _wait_for_clear()
if cleared:
try:
self.logger.info("JSON reset acknowledged: server cleared active flags.")
except Exception:
pass
return True
except Exception:
try:
self.logger.exception("Unexpected error while attempting JSON reset")
except Exception:
pass
# 3) Build and send per-target JSON payloads that explicitly clear active flag
try:
try:
self.logger.info("Building per-target JSON reset payloads to clear active flags on all targets.")
except Exception:
pass
json_payloads = command_builder.build_json_reset_ids()
except Exception:
try:
self.logger.exception("Failed to build per-target JSON reset payloads")
except Exception:
pass
return False
all_sent_ok = True
for i, payload in enumerate(json_payloads):
p = payload if payload.endswith("\n") else payload + "\n"
try:
self.logger.info("Sending JSON reset payload part %d/%d", i + 1, len(json_payloads))
except Exception:
pass
try:
ok = target_comm.send_commands([p])
except Exception:
try:
self.logger.exception("Exception while sending JSON reset payload part %d", i + 1)
except Exception:
pass
ok = False
if not ok:
all_sent_ok = False
try:
self.logger.error("Failed to send JSON reset payload part %d", i + 1)
except Exception:
pass
break
if not all_sent_ok:
try:
self.logger.error("Failed to send one or more JSON reset payloads to the radar.")
except Exception:
pass
return False
cleared = _wait_for_clear()
return bool(cleared)
def start_simulation(self, main_view):
"""Start live simulation using data from main_view (keeps compatibility)."""
if main_view.is_simulation_running.get():
@ -268,81 +413,193 @@ class SimulationController:
except Exception:
pass
# Reset radar state first
if not self.reset_radar_state(main_view):
# Do UI updates (disable/notify) and run heavy ops in background
def _background_start():
# Reset radar state in background
ok = self._reset_radar_state_no_ui(main_view)
if not ok:
try:
self.logger.error("Aborting simulation start due to radar reset failure.")
except Exception:
pass
try:
main_view.after(0, lambda: messagebox.showerror("Reset Error", "Failed to reset radar before starting simulation."))
except Exception:
pass
try:
main_view.after(0, lambda: main_view._update_button_states())
except Exception:
pass
return
time.sleep(1)
# Send initial scenario
try:
self.logger.error("Aborting simulation start due to radar reset failure.")
self.logger.info("Sending initial scenario state before starting live updates...")
except Exception:
pass
return
try:
sent = target_comm.send_scenario(main_view.scenario)
except Exception:
sent = False
time.sleep(1)
if not sent:
try:
self.logger.error("Failed to send initial scenario state. Aborting live simulation start.")
except Exception:
pass
try:
main_view.after(0, lambda: messagebox.showerror("Send Error", "Failed to send the initial scenario configuration. Cannot start live simulation."))
except Exception:
pass
try:
main_view.after(0, lambda: main_view._update_button_states())
except Exception:
pass
return
# Initialize engine
try:
engine = SimulationEngine(communicator=target_comm, simulation_hub=self.simulation_hub)
engine.set_time_multiplier(main_view.time_multiplier)
engine.set_update_interval(update_interval)
engine.load_scenario(main_view.scenario)
self.simulation_engine = engine
except Exception:
try:
self.logger.exception("Failed to initialize SimulationEngine")
except Exception:
pass
try:
main_view.after(0, lambda: messagebox.showerror("Engine Error", "Failed to initialize simulation engine."))
except Exception:
pass
try:
main_view.after(0, lambda: main_view._update_button_states())
except Exception:
pass
return
# Compute durations, archive, and start engine thread
try:
durations = [getattr(t, "_total_duration_s", 0.0) for t in main_view.scenario.get_all_targets()]
total_sim_time = max(durations) if durations else 0.0
except Exception:
total_sim_time = 0.0
try:
current_archive = SimulationArchive(main_view.scenario)
engine.archive = current_archive
if target_comm and hasattr(target_comm, "router"):
router = target_comm.router()
if router:
router.set_archive(current_archive)
except Exception:
current_archive = None
try:
engine.start()
except Exception:
try:
self.logger.exception("Failed to start simulation engine thread")
except Exception:
pass
try:
main_view.after(0, lambda: messagebox.showerror("Start Error", "Failed to start simulation engine thread."))
except Exception:
pass
try:
main_view.after(0, lambda: main_view._update_button_states())
except Exception:
pass
return
# Finalize on main thread
def _on_started():
try:
main_view.simulation_engine = engine
self.current_archive = current_archive
main_view.current_archive = current_archive
main_view.total_sim_time = total_sim_time
main_view.sim_elapsed_time = 0.0
try:
main_view.sim_slider_var.set(0.0)
except Exception:
pass
try:
main_view._update_simulation_progress_display()
except Exception:
pass
try:
if hasattr(main_view, "ppi_widget") and main_view.ppi_widget:
main_view.ppi_widget.clear_previews()
except Exception:
pass
main_view.is_simulation_running.set(True)
try:
main_view._update_button_states()
except Exception:
pass
except Exception:
pass
try:
main_view.after(0, _on_started)
except Exception:
pass
# UI: show status and disable controls before background work
try:
self.logger.info("Sending initial scenario state before starting live updates...")
main_view.after(0, lambda: main_view.status_bar.show_status_message("Starting simulation...", timeout_ms=0))
except Exception:
pass
if not target_comm.send_scenario(main_view.scenario):
try:
self.logger.error("Failed to send initial scenario state. Aborting live simulation start.")
messagebox.showerror(
"Send Error",
"Failed to send the initial scenario configuration. Cannot start live simulation.",
)
except Exception:
pass
try:
main_view.after(0, lambda: main_view._update_button_states())
except Exception:
pass
t = threading.Thread(target=_background_start, daemon=True)
t.start()
def stop_simulation(self, main_view):
if not main_view.is_simulation_running.get() or not getattr(main_view, "simulation_engine", None):
return
try:
self.logger.info("Initial scenario state sent successfully.")
except Exception:
pass
if main_view.current_archive:
try:
main_view.current_archive.save()
except Exception:
pass
main_view.current_archive = None
try:
target_comm = getattr(self.communicator_manager, "target_communicator", None)
if target_comm and hasattr(target_comm, "router"):
router = target_comm.router()
if router:
router.set_archive(None)
except Exception:
pass
try:
main_view._refresh_analysis_list()
except Exception:
pass
try:
self.logger.info("Starting live simulation...")
except Exception:
pass
try:
main_view.scenario.reset_simulation()
except Exception:
pass
# Create engine and start
try:
engine = SimulationEngine(communicator=target_comm, simulation_hub=self.simulation_hub)
engine.set_time_multiplier(main_view.time_multiplier)
engine.set_update_interval(update_interval)
engine.load_scenario(main_view.scenario)
main_view.simulation_engine = engine
self.simulation_engine = engine
except Exception:
try:
self.logger.exception("Failed to initialize SimulationEngine")
main_view.simulation_engine.stop()
except Exception:
pass
return
# Compute total duration
try:
durations = [getattr(t, "_total_duration_s", 0.0) for t in main_view.scenario.get_all_targets()]
main_view.total_sim_time = max(durations) if durations else 0.0
except Exception:
main_view.total_sim_time = 0.0
# Reset slider and labels
main_view.sim_elapsed_time = 0.0
try:
main_view.sim_slider_var.set(0.0)
except Exception:
pass
try:
main_view._update_simulation_progress_display()
main_view.simulation_engine = None
self.simulation_engine = None
except Exception:
pass
# Archive wiring
main_view.is_simulation_running.set(False)
try:
main_view._update_button_states()
except Exception:
pass
self.current_archive = SimulationArchive(main_view.scenario)
self.simulation_engine.archive = self.current_archive
if target_comm and hasattr(target_comm, "router"):

View File

@ -0,0 +1,175 @@
import types
import time
import pytest
from target_simulator.simulation.simulation_controller import SimulationController
class DummyVar:
def __init__(self, value=None):
self._v = value
def get(self):
return self._v
def set(self, v):
self._v = v
class DummyComm:
def __init__(self, use_json=False, send_results=None):
self.is_open = True
self._use_json_protocol = use_json
# send_results can be a list or a callable
self._send_calls = []
self._send_results = send_results
def send_commands(self, cmds):
self._send_calls.append(cmds)
if callable(self._send_results):
return self._send_results(cmds)
if isinstance(self._send_results, list):
if len(self._send_results) >= len(self._send_calls):
return self._send_results[len(self._send_calls) - 1]
return self._send_results[-1]
return True
def send_scenario(self, scenario):
return True
def router(self):
return None
class DummyCommManager:
def __init__(self, target_comm=None):
self.target_communicator = target_comm
class DummyHub:
def __init__(self, active_sequence=None):
# active_sequence: iterable of booleans returned by has_active_real_targets
self._seq = list(active_sequence) if active_sequence is not None else []
def has_active_real_targets(self):
if self._seq:
return self._seq.pop(0)
return False
def reset(self):
pass
class DummyScenario:
def __init__(self):
class T:
_total_duration_s = 5.0
self.targets = [T()]
def get_all_targets(self):
return self.targets
def reset_simulation(self):
pass
class DummyMainView:
def __init__(self):
self.is_simulation_running = DummyVar(False)
self.update_time = DummyVar(1.0)
self.time_multiplier = 1.0
self.scenario = DummyScenario()
self.ppi_widget = types.SimpleNamespace(clear_trails=lambda: None, clear_previews=lambda: None)
self.sim_slider_var = DummyVar(0.0)
self.simulation_engine = None
self.current_archive = None
def _update_simulation_progress_display(self):
pass
def _update_button_states(self):
pass
def _refresh_analysis_list(self):
pass
def test_reset_returns_false_when_not_connected():
hub = DummyHub()
cm = DummyCommManager(None)
controller = SimulationController(cm, hub, config_manager=None, logger=None)
mv = DummyMainView()
assert controller.reset_radar_state(mv) is False
def test_legacy_reset_success(monkeypatch):
# Legacy communicator: _use_json_protocol False
comm = DummyComm(use_json=False, send_results=[True])
cm = DummyCommManager(comm)
hub = DummyHub(active_sequence=[False])
controller = SimulationController(cm, hub, config_manager=None, logger=None)
mv = DummyMainView()
assert controller.reset_radar_state(mv) is True
def test_json_reset_fallback_to_per_target(monkeypatch):
# JSON communicator that fails simple reset but succeeds per-target
results = [False, True] # first send (json reset) -> False, second (per-target) -> True
def send_results(cmds):
# pop result accordingly
return results.pop(0)
comm = DummyComm(use_json=True, send_results=send_results)
cm = DummyCommManager(comm)
# hub returns False (cleared) after per-target
hub = DummyHub(active_sequence=[True, False])
# Patch command_builder.build_json_reset_ids to return two payloads
import target_simulator.simulation.simulation_controller as scmod
monkeypatch.setattr(scmod.command_builder, "build_json_reset_ids", lambda: ["{\"id\":1}"])
controller = SimulationController(cm, hub, config_manager=None, logger=None)
mv = DummyMainView()
assert controller.reset_radar_state(mv) is True
def test_start_simulation_creates_engine_and_sets_running(monkeypatch):
# Patch SimulationEngine used in controller to a dummy engine
class DummyEngine:
def __init__(self, communicator=None, simulation_hub=None):
self.started = False
def set_time_multiplier(self, v):
pass
def set_update_interval(self, v):
pass
def load_scenario(self, s):
pass
def start(self):
self.started = True
def stop(self):
self.started = False
def is_running(self):
return self.started
monkeypatch.setattr("target_simulator.simulation.simulation_controller.SimulationEngine", DummyEngine)
comm = DummyComm(use_json=False, send_results=[True])
cm = DummyCommManager(comm)
hub = DummyHub()
controller = SimulationController(cm, hub, config_manager=None, logger=None)
mv = DummyMainView()
# start simulation
controller.start_simulation(mv)
# engine should be created and main_view.simulation_engine set
assert mv.simulation_engine is not None
assert mv.is_simulation_running.get() is True