From 76178e988839c622692aeb45ada2d17d4ae3fb62 Mon Sep 17 00:00:00 2001 From: VALLONGOL Date: Mon, 3 Nov 2025 14:34:57 +0100 Subject: [PATCH] modificata la fase di start della simulazione per non bloccare l'interfaccia. --- target_simulator/gui/main_view.py | 202 ++-------- target_simulator/gui/ppi_adapter.py | 103 +++++ target_simulator/gui/status_bar.py | 159 ++++++++ .../simulation/simulation_controller.py | 371 +++++++++++++++--- .../simulation/test_simulation_controller.py | 175 +++++++++ 5 files changed, 776 insertions(+), 234 deletions(-) create mode 100644 target_simulator/gui/ppi_adapter.py create mode 100644 target_simulator/gui/status_bar.py create mode 100644 tests/simulation/test_simulation_controller.py diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index a7e378a..6b48047 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -15,11 +15,13 @@ from datetime import datetime # Use absolute imports for robustness and clarity from target_simulator.gui.ppi_display import PPIDisplay +from target_simulator.gui.ppi_adapter import build_display_data from target_simulator.gui.connection_settings_window import ConnectionSettingsWindow from target_simulator.gui.radar_config_window import RadarConfigWindow from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame from target_simulator.gui.target_list_frame import TargetListFrame from target_simulator.gui.connection_panel import ConnectionPanel +from target_simulator.gui.status_bar import StatusBar from target_simulator.core.communicator_interface import CommunicatorInterface from target_simulator.core.serial_communicator import SerialCommunicator @@ -598,33 +600,17 @@ class MainView(tk.Tk): ) def _create_statusbar(self): - status_bar = ttk.Frame(self, relief=tk.SUNKEN) - status_bar.pack(side=tk.BOTTOM, fill=tk.X) - ttk.Label(status_bar, text="Target:").pack(side=tk.LEFT, padx=(5, 2)) - self.target_status_canvas = tk.Canvas( - status_bar, width=16, height=16, highlightthickness=0 - ) - self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10)) - self._draw_status_indicator(self.target_status_canvas, "#e74c3c") - ttk.Label(status_bar, text="LRU:").pack(side=tk.LEFT, padx=(5, 2)) - self.lru_status_canvas = tk.Canvas( - status_bar, width=16, height=16, highlightthickness=0 - ) - self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10)) - self._draw_status_indicator(self.lru_status_canvas, "#e74c3c") - self.status_var = tk.StringVar(value="Ready") - ttk.Label(status_bar, textvariable=self.status_var, anchor=tk.W).pack( - side=tk.LEFT, fill=tk.X, expand=True, padx=5 - ) - # Small rate indicator showing incoming real-state rate and PPI update rate - try: - self.rate_status_var = tk.StringVar(value="") - ttk.Label(status_bar, textvariable=self.rate_status_var, anchor=tk.E).pack( - side=tk.RIGHT, padx=(4, 8) - ) - except Exception: - # Do not allow UI failures to stop initialization - self.rate_status_var = None + # Use the extracted StatusBar widget. Expose the same attributes + # MainView previously provided so callers elsewhere continue to work. + self.status_bar = StatusBar(self) + self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) + + self.target_status_canvas = self.status_bar.target_status_canvas + self.lru_status_canvas = self.status_bar.lru_status_canvas + self.status_var = self.status_bar.status_var + self.rate_status_var = getattr(self.status_bar, "rate_status_var", None) + # Id used by show_status_message scheduling + self._status_after_id = None def show_status_message(self, text: str, timeout_ms: int = 3000): """Show a transient status message in the main status bar. @@ -1575,159 +1561,21 @@ class MainView(tk.Tk): self._update_all_views() def _build_display_data_from_hub(self) -> Dict[str, List[Target]]: - """ - Builds the data structure for the PPIDisplay by fetching the latest - simulated and real states from the SimulationStateHub. - """ - simulated_targets_for_ppi = [] - real_targets_for_ppi = [] - - if not self.simulation_hub: - return {"simulated": [], "real": []} - - target_ids = self.simulation_hub.get_all_target_ids() - - for tid in target_ids: - history = self.simulation_hub.get_target_history(tid) - if not history: - continue - - # --- Process Simulated Data --- - if history["simulated"]: - last_sim_state = history["simulated"][-1] - _ts, x_ft, y_ft, z_ft = last_sim_state # Hub now stores feet directly - - sim_target = Target(target_id=tid, trajectory=[]) # Lightweight object - # Manually set internal cartesian coords and update polar - setattr(sim_target, "_pos_x_ft", x_ft) - setattr(sim_target, "_pos_y_ft", y_ft) - setattr(sim_target, "_pos_z_ft", z_ft) - sim_target._update_current_polar_coords() - # Try to preserve heading information for simulated targets. - # The hub stores only positions; if a Scenario (or the running - # SimulationEngine) has the canonical Target instance, copy its - # computed current_heading_deg so the plotted heading vector - # matches the simulation's internal heading. - try: - heading = None - if ( - hasattr(self, "simulation_engine") - and self.simulation_engine - and getattr(self.simulation_engine, "scenario", None) - ): - t = self.simulation_engine.scenario.get_target(tid) - if t: - heading = getattr(t, "current_heading_deg", None) - if heading is None and getattr(self, "scenario", None): - t2 = self.scenario.get_target(tid) - if t2: - heading = getattr(t2, "current_heading_deg", None) - if heading is not None: - sim_target.current_heading_deg = float(heading) - except Exception: - pass - - # Determine active flag based on the canonical Scenario/SimulationEngine - try: - active_flag = True - if ( - hasattr(self, "simulation_engine") - and self.simulation_engine - and getattr(self.simulation_engine, "scenario", None) - ): - t_engine = self.simulation_engine.scenario.get_target(tid) - if t_engine is not None: - active_flag = bool(getattr(t_engine, "active", True)) - elif getattr(self, "scenario", None): - t_scn = self.scenario.get_target(tid) - if t_scn is not None: - active_flag = bool(getattr(t_scn, "active", True)) - except Exception: - active_flag = True - sim_target.active = active_flag - simulated_targets_for_ppi.append(sim_target) - - # --- Process Real Data --- - if history["real"]: - last_real_state = history["real"][-1] - _ts, x_ft, y_ft, z_ft = last_real_state # Hub now stores feet directly - - real_target = Target(target_id=tid, trajectory=[]) # Lightweight object - setattr(real_target, "_pos_x_ft", x_ft) - setattr(real_target, "_pos_y_ft", y_ft) - setattr(real_target, "_pos_z_ft", z_ft) - real_target._update_current_polar_coords() - # If the hub provides a last-known heading for this real target, - # copy it into the lightweight Target object so the PPI shows - # the correct heading arrow parsed from RIS payloads. - try: - if self.simulation_hub and hasattr( - self.simulation_hub, "get_real_heading" - ): - hdg = self.simulation_hub.get_real_heading(tid) - if hdg is not None: - real_target.current_heading_deg = float(hdg) % 360 - except Exception: - pass - - # Correlation log: show raw->hub->used to help debugging pipeline - try: - if self.simulation_hub and hasattr( - self.simulation_hub, "get_raw_heading" - ): - raw_h = self.simulation_hub.get_raw_heading(tid) - else: - raw_h = None - # Compute theta0/theta1 using same conventions as PPIDisplay - try: - az_deg = float(real_target.current_azimuth_deg) - r_nm = float(real_target.current_range_nm) - az_rad = math.radians(az_deg) - x_start = r_nm * math.sin(az_rad) - y_start = r_nm * math.cos(az_rad) - vector_len = ( - self.ppi_widget.range_var.get() / 20.0 - if hasattr(self, "ppi_widget") - and hasattr(self.ppi_widget, "range_var") - else 1.0 - ) - hdg_used = float(real_target.current_heading_deg) - hdg_rad = math.radians(hdg_used) - dx = vector_len * math.sin(hdg_rad) - dy = vector_len * math.cos(hdg_rad) - x_end = x_start + dx - y_end = y_start + dy - theta0_deg = -math.degrees(math.atan2(x_start, y_start)) - theta1_deg = -math.degrees(math.atan2(x_end, y_end)) - except Exception: - theta0_deg = None - theta1_deg = None - - # self.logger.debug( - # "Heading pipeline: TID %s raw=%s hub=%s used=%s theta0=%.3f theta1=%.3f", - # tid, - # raw_h, - # getattr(self.simulation_hub, 'get_real_heading')(tid) if self.simulation_hub else None, - # real_target.current_heading_deg, - # theta0_deg if theta0_deg is not None else float('nan'), - # theta1_deg if theta1_deg is not None else float('nan'), - # ) - except Exception: - pass - - real_target.active = True - real_targets_for_ppi.append(real_target) - + # Delegate to ppi_adapter.build_display_data for consistent behavior try: - self.logger.debug( - "PPIDisplay will receive simulated=%d real=%d targets from hub", - len(simulated_targets_for_ppi), - len(real_targets_for_ppi), + return build_display_data( + self.simulation_hub, + scenario=getattr(self, "scenario", None), + engine=getattr(self, "simulation_engine", None), + ppi_widget=getattr(self, "ppi_widget", None), + logger=getattr(self, "logger", None), ) except Exception: - pass - - return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi} + try: + self.logger.exception("Failed to build display data via ppi_adapter") + except Exception: + pass + return {"simulated": [], "real": []} def _open_analysis_window(self): """Opens the performance analysis window, ensuring only one instance exists.""" diff --git a/target_simulator/gui/ppi_adapter.py b/target_simulator/gui/ppi_adapter.py new file mode 100644 index 0000000..c1465be --- /dev/null +++ b/target_simulator/gui/ppi_adapter.py @@ -0,0 +1,103 @@ +from typing import Dict, List, Optional +import math +from target_simulator.core.models import Target + + +def build_display_data(simulation_hub, scenario=None, engine=None, ppi_widget=None, logger=None) -> Dict[str, List[Target]]: + """Builds PPI display data from the simulation hub. + + Returns a dict with keys 'simulated' and 'real' containing lightweight + Target objects suitable for passing to PPIDisplay. + """ + simulated_targets_for_ppi = [] + real_targets_for_ppi = [] + + if not simulation_hub: + return {"simulated": [], "real": []} + + target_ids = simulation_hub.get_all_target_ids() + + for tid in target_ids: + history = simulation_hub.get_target_history(tid) + if not history: + continue + + # --- Process Simulated Data --- + if history.get("simulated"): + last_sim_state = history["simulated"][-1] + _ts, x_ft, y_ft, z_ft = last_sim_state + + sim_target = Target(target_id=tid, trajectory=[]) + setattr(sim_target, "_pos_x_ft", x_ft) + setattr(sim_target, "_pos_y_ft", y_ft) + setattr(sim_target, "_pos_z_ft", z_ft) + sim_target._update_current_polar_coords() + + # Try to preserve heading information for simulated targets. + try: + heading = None + if engine and getattr(engine, "scenario", None): + t = engine.scenario.get_target(tid) + if t: + heading = getattr(t, "current_heading_deg", None) + if heading is None and scenario: + t2 = scenario.get_target(tid) + if t2: + heading = getattr(t2, "current_heading_deg", None) + if heading is not None: + sim_target.current_heading_deg = float(heading) + except Exception: + pass + + # Determine active flag based on the canonical Scenario/SimulationEngine + try: + active_flag = True + if engine and getattr(engine, "scenario", None): + t_engine = engine.scenario.get_target(tid) + if t_engine is not None: + active_flag = bool(getattr(t_engine, "active", True)) + elif scenario: + t_scn = scenario.get_target(tid) + if t_scn is not None: + active_flag = bool(getattr(t_scn, "active", True)) + except Exception: + active_flag = True + sim_target.active = active_flag + simulated_targets_for_ppi.append(sim_target) + + # --- Process Real Data --- + if history.get("real"): + last_real_state = history["real"][-1] + _ts, x_ft, y_ft, z_ft = last_real_state + + real_target = Target(target_id=tid, trajectory=[]) + setattr(real_target, "_pos_x_ft", x_ft) + setattr(real_target, "_pos_y_ft", y_ft) + setattr(real_target, "_pos_z_ft", z_ft) + real_target._update_current_polar_coords() + + # Copy last-known heading if hub provides it + try: + if simulation_hub and hasattr(simulation_hub, "get_real_heading"): + hdg = simulation_hub.get_real_heading(tid) + if hdg is not None: + real_target.current_heading_deg = float(hdg) % 360 + except Exception: + pass + + # Optional debug computations (theta0/theta1) left out; callers can + # compute if needed. Keep active True for real targets. + real_target.active = True + real_targets_for_ppi.append(real_target) + + try: + if logger: + logger.debug( + "PPIDisplay will receive simulated=%d real=%d targets from hub", + len(simulated_targets_for_ppi), + len(real_targets_for_ppi), + ) + except Exception: + pass + + return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi} diff --git a/target_simulator/gui/status_bar.py b/target_simulator/gui/status_bar.py new file mode 100644 index 0000000..9c3bc65 --- /dev/null +++ b/target_simulator/gui/status_bar.py @@ -0,0 +1,159 @@ +import tkinter as tk +from tkinter import ttk +from typing import Optional + + +class StatusBar(ttk.Frame): + """Status bar widget containing connection indicators and rate/status text. + + Exposes small API so MainView can delegate status updates without managing + layout details. + """ + + def __init__(self, parent): + super().__init__(parent, relief=tk.SUNKEN) + + ttk.Label(self, text="Target:").pack(side=tk.LEFT, padx=(5, 2)) + self.target_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0) + self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10)) + self._draw_status_indicator(self.target_status_canvas, "#e74c3c") + + ttk.Label(self, text="LRU:").pack(side=tk.LEFT, padx=(5, 2)) + self.lru_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0) + self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10)) + self._draw_status_indicator(self.lru_status_canvas, "#e74c3c") + + self.status_var = tk.StringVar(value="Ready") + ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5) + + # Small rate indicator showing incoming real-state rate and PPI update rate + try: + self.rate_status_var = tk.StringVar(value="") + ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(4, 8)) + except Exception: + self.rate_status_var = None + + # Id for scheduled status clear; used by show_status_message + self._status_after_id: Optional[str] = None + + def _draw_status_indicator(self, canvas, color): + try: + canvas.delete("all") + canvas.create_oval(2, 2, 14, 14, fill=color, outline="black") + except Exception: + pass + + def set_target_connected(self, is_connected: bool): + color = "#2ecc40" if is_connected else "#e74c3c" + try: + self._draw_status_indicator(self.target_status_canvas, color) + except Exception: + pass + + def set_lru_connected(self, is_connected: bool): + color = "#2ecc40" if is_connected else "#e74c3c" + try: + self._draw_status_indicator(self.lru_status_canvas, color) + except Exception: + pass + + def show_status_message(self, text: str, timeout_ms: int = 3000): + try: + try: + if self._status_after_id is not None: + self.after_cancel(self._status_after_id) + except Exception: + pass + self.status_var.set(text) + + def _clear(): + try: + self.status_var.set("Ready") + except Exception: + pass + + self._status_after_id = self.after(timeout_ms, _clear) + except Exception: + # Fallback: set the var and do not schedule clear + try: + self.status_var.set(text) + except Exception: + pass + + +class StatusBar(ttk.Frame): + """Status bar widget containing connection indicators and rate/status text. + + Exposes small API so MainView can delegate status updates without managing + layout details. + """ + + def __init__(self, parent): + super().__init__(parent, relief=tk.SUNKEN) + + ttk.Label(self, text="Target:").pack(side=tk.LEFT, padx=(5, 2)) + self.target_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0) + self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10)) + self._draw_status_indicator(self.target_status_canvas, "#e74c3c") + + ttk.Label(self, text="LRU:").pack(side=tk.LEFT, padx=(5, 2)) + self.lru_status_canvas = tk.Canvas(self, width=16, height=16, highlightthickness=0) + self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10)) + self._draw_status_indicator(self.lru_status_canvas, "#e74c3c") + + self.status_var = tk.StringVar(value="Ready") + ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5) + + # Small rate indicator showing incoming real-state rate and PPI update rate + try: + self.rate_status_var = tk.StringVar(value="") + ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(side=tk.RIGHT, padx=(4, 8)) + except Exception: + self.rate_status_var = None + + # Id for scheduled status clear; used by show_status_message + self._status_after_id: Optional[str] = None + + def _draw_status_indicator(self, canvas, color): + try: + canvas.delete("all") + canvas.create_oval(2, 2, 14, 14, fill=color, outline="black") + except Exception: + pass + + def set_target_connected(self, is_connected: bool): + color = "#2ecc40" if is_connected else "#e74c3c" + try: + self._draw_status_indicator(self.target_status_canvas, color) + except Exception: + pass + + def set_lru_connected(self, is_connected: bool): + color = "#2ecc40" if is_connected else "#e74c3c" + try: + self._draw_status_indicator(self.lru_status_canvas, color) + except Exception: + pass + + def show_status_message(self, text: str, timeout_ms: int = 3000): + try: + try: + if self._status_after_id is not None: + self.after_cancel(self._status_after_id) + except Exception: + pass + self.status_var.set(text) + + def _clear(): + try: + self.status_var.set("Ready") + except Exception: + pass + + self._status_after_id = self.after(timeout_ms, _clear) + except Exception: + # Fallback: set the var and do not schedule clear + try: + self.status_var.set(text) + except Exception: + pass diff --git a/target_simulator/simulation/simulation_controller.py b/target_simulator/simulation/simulation_controller.py index a2d86be..19b17fa 100644 --- a/target_simulator/simulation/simulation_controller.py +++ b/target_simulator/simulation/simulation_controller.py @@ -1,4 +1,12 @@ import time +import threading +from typing import Optional +from tkinter import messagebox + +from target_simulator.core.simulation_engine import SimulationEngine +from target_simulator.analysis.simulation_archive import SimulationArchive +import time +import threading from typing import Optional from tkinter import messagebox @@ -47,7 +55,6 @@ class SimulationController: except Exception: use_json = False - # Helper: wait until hub reports no active real targets (bounded) def _wait_for_clear(timeout_s: float = 3.0, poll_interval: float = 0.2) -> bool: waited = 0.0 while waited < timeout_s: @@ -210,6 +217,144 @@ class SimulationController: pass return False + def _reset_radar_state_no_ui(self, main_view) -> bool: + """Same logic as reset_radar_state but avoids any Tk UI calls (messagebox). + + This variant is safe to call from a background thread; failures are + logged and a boolean is returned so the caller can decide how to + notify the user on the main thread. + """ + target_comm = getattr(self.communicator_manager, "target_communicator", None) + if not target_comm or not getattr(target_comm, "is_open", False): + try: + self.logger.error("Cannot reset radar state: communicator is not connected.") + except Exception: + pass + return False + + try: + use_json = bool(getattr(target_comm, "_use_json_protocol", False)) + except Exception: + use_json = False + + def _wait_for_clear(timeout_s: float = 3.0, poll_interval: float = 0.2) -> bool: + waited = 0.0 + while waited < timeout_s: + try: + if not self.simulation_hub.has_active_real_targets(): + return True + except Exception: + try: + self.logger.debug("Error while querying simulation_hub during reset wait", exc_info=True) + except Exception: + pass + time.sleep(poll_interval) + waited += poll_interval + return False + + # Legacy textual path + if not use_json: + cmd = "tgtset /-s\n" + try: + self.logger.info("Sending legacy reset command: %s", cmd.strip()) + except Exception: + pass + try: + ok = target_comm.send_commands([cmd]) + except Exception: + try: + self.logger.exception("Failed to send legacy reset command") + except Exception: + pass + ok = False + + if not ok: + try: + self.logger.error("Failed to send reset command to the radar.") + except Exception: + pass + return False + + cleared = _wait_for_clear() + return bool(cleared) + + # JSON-capable communicator path + try: + json_reset = '{"CMD":"reset"}\n' + try: + self.logger.info("Sending JSON reset command: %s", json_reset.strip()) + except Exception: + pass + try: + ok = target_comm.send_commands([json_reset]) + except Exception: + try: + self.logger.exception("Failed to send JSON reset command") + except Exception: + pass + ok = False + + if ok: + cleared = _wait_for_clear() + if cleared: + try: + self.logger.info("JSON reset acknowledged: server cleared active flags.") + except Exception: + pass + return True + except Exception: + try: + self.logger.exception("Unexpected error while attempting JSON reset") + except Exception: + pass + + # 3) Build and send per-target JSON payloads that explicitly clear active flag + try: + try: + self.logger.info("Building per-target JSON reset payloads to clear active flags on all targets.") + except Exception: + pass + json_payloads = command_builder.build_json_reset_ids() + except Exception: + try: + self.logger.exception("Failed to build per-target JSON reset payloads") + except Exception: + pass + return False + + all_sent_ok = True + for i, payload in enumerate(json_payloads): + p = payload if payload.endswith("\n") else payload + "\n" + try: + self.logger.info("Sending JSON reset payload part %d/%d", i + 1, len(json_payloads)) + except Exception: + pass + try: + ok = target_comm.send_commands([p]) + except Exception: + try: + self.logger.exception("Exception while sending JSON reset payload part %d", i + 1) + except Exception: + pass + ok = False + if not ok: + all_sent_ok = False + try: + self.logger.error("Failed to send JSON reset payload part %d", i + 1) + except Exception: + pass + break + + if not all_sent_ok: + try: + self.logger.error("Failed to send one or more JSON reset payloads to the radar.") + except Exception: + pass + return False + + cleared = _wait_for_clear() + return bool(cleared) + def start_simulation(self, main_view): """Start live simulation using data from main_view (keeps compatibility).""" if main_view.is_simulation_running.get(): @@ -268,81 +413,193 @@ class SimulationController: except Exception: pass - # Reset radar state first - if not self.reset_radar_state(main_view): + # Do UI updates (disable/notify) and run heavy ops in background + def _background_start(): + # Reset radar state in background + ok = self._reset_radar_state_no_ui(main_view) + if not ok: + try: + self.logger.error("Aborting simulation start due to radar reset failure.") + except Exception: + pass + try: + main_view.after(0, lambda: messagebox.showerror("Reset Error", "Failed to reset radar before starting simulation.")) + except Exception: + pass + try: + main_view.after(0, lambda: main_view._update_button_states()) + except Exception: + pass + return + + time.sleep(1) + + # Send initial scenario try: - self.logger.error("Aborting simulation start due to radar reset failure.") + self.logger.info("Sending initial scenario state before starting live updates...") except Exception: pass - return + try: + sent = target_comm.send_scenario(main_view.scenario) + except Exception: + sent = False - time.sleep(1) + if not sent: + try: + self.logger.error("Failed to send initial scenario state. Aborting live simulation start.") + except Exception: + pass + try: + main_view.after(0, lambda: messagebox.showerror("Send Error", "Failed to send the initial scenario configuration. Cannot start live simulation.")) + except Exception: + pass + try: + main_view.after(0, lambda: main_view._update_button_states()) + except Exception: + pass + return + # Initialize engine + try: + engine = SimulationEngine(communicator=target_comm, simulation_hub=self.simulation_hub) + engine.set_time_multiplier(main_view.time_multiplier) + engine.set_update_interval(update_interval) + engine.load_scenario(main_view.scenario) + self.simulation_engine = engine + except Exception: + try: + self.logger.exception("Failed to initialize SimulationEngine") + except Exception: + pass + try: + main_view.after(0, lambda: messagebox.showerror("Engine Error", "Failed to initialize simulation engine.")) + except Exception: + pass + try: + main_view.after(0, lambda: main_view._update_button_states()) + except Exception: + pass + return + + # Compute durations, archive, and start engine thread + try: + durations = [getattr(t, "_total_duration_s", 0.0) for t in main_view.scenario.get_all_targets()] + total_sim_time = max(durations) if durations else 0.0 + except Exception: + total_sim_time = 0.0 + + try: + current_archive = SimulationArchive(main_view.scenario) + engine.archive = current_archive + if target_comm and hasattr(target_comm, "router"): + router = target_comm.router() + if router: + router.set_archive(current_archive) + except Exception: + current_archive = None + + try: + engine.start() + except Exception: + try: + self.logger.exception("Failed to start simulation engine thread") + except Exception: + pass + try: + main_view.after(0, lambda: messagebox.showerror("Start Error", "Failed to start simulation engine thread.")) + except Exception: + pass + try: + main_view.after(0, lambda: main_view._update_button_states()) + except Exception: + pass + return + + # Finalize on main thread + def _on_started(): + try: + main_view.simulation_engine = engine + self.current_archive = current_archive + main_view.current_archive = current_archive + main_view.total_sim_time = total_sim_time + main_view.sim_elapsed_time = 0.0 + try: + main_view.sim_slider_var.set(0.0) + except Exception: + pass + try: + main_view._update_simulation_progress_display() + except Exception: + pass + try: + if hasattr(main_view, "ppi_widget") and main_view.ppi_widget: + main_view.ppi_widget.clear_previews() + except Exception: + pass + main_view.is_simulation_running.set(True) + try: + main_view._update_button_states() + except Exception: + pass + except Exception: + pass + + try: + main_view.after(0, _on_started) + except Exception: + pass + + # UI: show status and disable controls before background work try: - self.logger.info("Sending initial scenario state before starting live updates...") + main_view.after(0, lambda: main_view.status_bar.show_status_message("Starting simulation...", timeout_ms=0)) except Exception: pass - if not target_comm.send_scenario(main_view.scenario): - try: - self.logger.error("Failed to send initial scenario state. Aborting live simulation start.") - messagebox.showerror( - "Send Error", - "Failed to send the initial scenario configuration. Cannot start live simulation.", - ) - except Exception: - pass + try: + main_view.after(0, lambda: main_view._update_button_states()) + except Exception: + pass + + t = threading.Thread(target=_background_start, daemon=True) + t.start() + + def stop_simulation(self, main_view): + if not main_view.is_simulation_running.get() or not getattr(main_view, "simulation_engine", None): return try: - self.logger.info("Initial scenario state sent successfully.") - except Exception: - pass + if main_view.current_archive: + try: + main_view.current_archive.save() + except Exception: + pass + main_view.current_archive = None + try: + target_comm = getattr(self.communicator_manager, "target_communicator", None) + if target_comm and hasattr(target_comm, "router"): + router = target_comm.router() + if router: + router.set_archive(None) + except Exception: + pass + try: + main_view._refresh_analysis_list() + except Exception: + pass - try: - self.logger.info("Starting live simulation...") - except Exception: - pass - - try: - main_view.scenario.reset_simulation() - except Exception: - pass - - # Create engine and start - try: - engine = SimulationEngine(communicator=target_comm, simulation_hub=self.simulation_hub) - engine.set_time_multiplier(main_view.time_multiplier) - engine.set_update_interval(update_interval) - engine.load_scenario(main_view.scenario) - main_view.simulation_engine = engine - self.simulation_engine = engine - except Exception: try: - self.logger.exception("Failed to initialize SimulationEngine") + main_view.simulation_engine.stop() except Exception: pass - return - - # Compute total duration - try: - durations = [getattr(t, "_total_duration_s", 0.0) for t in main_view.scenario.get_all_targets()] - main_view.total_sim_time = max(durations) if durations else 0.0 - except Exception: - main_view.total_sim_time = 0.0 - - # Reset slider and labels - main_view.sim_elapsed_time = 0.0 - try: - main_view.sim_slider_var.set(0.0) - except Exception: - pass - try: - main_view._update_simulation_progress_display() + main_view.simulation_engine = None + self.simulation_engine = None except Exception: pass - # Archive wiring + main_view.is_simulation_running.set(False) try: + main_view._update_button_states() + except Exception: + pass self.current_archive = SimulationArchive(main_view.scenario) self.simulation_engine.archive = self.current_archive if target_comm and hasattr(target_comm, "router"): diff --git a/tests/simulation/test_simulation_controller.py b/tests/simulation/test_simulation_controller.py new file mode 100644 index 0000000..f8c6af1 --- /dev/null +++ b/tests/simulation/test_simulation_controller.py @@ -0,0 +1,175 @@ +import types +import time + +import pytest + +from target_simulator.simulation.simulation_controller import SimulationController + + +class DummyVar: + def __init__(self, value=None): + self._v = value + + def get(self): + return self._v + + def set(self, v): + self._v = v + + +class DummyComm: + def __init__(self, use_json=False, send_results=None): + self.is_open = True + self._use_json_protocol = use_json + # send_results can be a list or a callable + self._send_calls = [] + self._send_results = send_results + + def send_commands(self, cmds): + self._send_calls.append(cmds) + if callable(self._send_results): + return self._send_results(cmds) + if isinstance(self._send_results, list): + if len(self._send_results) >= len(self._send_calls): + return self._send_results[len(self._send_calls) - 1] + return self._send_results[-1] + return True + + def send_scenario(self, scenario): + return True + + def router(self): + return None + + +class DummyCommManager: + def __init__(self, target_comm=None): + self.target_communicator = target_comm + + +class DummyHub: + def __init__(self, active_sequence=None): + # active_sequence: iterable of booleans returned by has_active_real_targets + self._seq = list(active_sequence) if active_sequence is not None else [] + + def has_active_real_targets(self): + if self._seq: + return self._seq.pop(0) + return False + + def reset(self): + pass + + +class DummyScenario: + def __init__(self): + class T: + _total_duration_s = 5.0 + + self.targets = [T()] + + def get_all_targets(self): + return self.targets + + def reset_simulation(self): + pass + + +class DummyMainView: + def __init__(self): + self.is_simulation_running = DummyVar(False) + self.update_time = DummyVar(1.0) + self.time_multiplier = 1.0 + self.scenario = DummyScenario() + self.ppi_widget = types.SimpleNamespace(clear_trails=lambda: None, clear_previews=lambda: None) + self.sim_slider_var = DummyVar(0.0) + self.simulation_engine = None + self.current_archive = None + + def _update_simulation_progress_display(self): + pass + + def _update_button_states(self): + pass + + def _refresh_analysis_list(self): + pass + + +def test_reset_returns_false_when_not_connected(): + hub = DummyHub() + cm = DummyCommManager(None) + controller = SimulationController(cm, hub, config_manager=None, logger=None) + mv = DummyMainView() + assert controller.reset_radar_state(mv) is False + + +def test_legacy_reset_success(monkeypatch): + # Legacy communicator: _use_json_protocol False + comm = DummyComm(use_json=False, send_results=[True]) + cm = DummyCommManager(comm) + hub = DummyHub(active_sequence=[False]) + controller = SimulationController(cm, hub, config_manager=None, logger=None) + mv = DummyMainView() + assert controller.reset_radar_state(mv) is True + + +def test_json_reset_fallback_to_per_target(monkeypatch): + # JSON communicator that fails simple reset but succeeds per-target + results = [False, True] # first send (json reset) -> False, second (per-target) -> True + + def send_results(cmds): + # pop result accordingly + return results.pop(0) + + comm = DummyComm(use_json=True, send_results=send_results) + cm = DummyCommManager(comm) + # hub returns False (cleared) after per-target + hub = DummyHub(active_sequence=[True, False]) + + # Patch command_builder.build_json_reset_ids to return two payloads + import target_simulator.simulation.simulation_controller as scmod + + monkeypatch.setattr(scmod.command_builder, "build_json_reset_ids", lambda: ["{\"id\":1}"]) + + controller = SimulationController(cm, hub, config_manager=None, logger=None) + mv = DummyMainView() + assert controller.reset_radar_state(mv) is True + + +def test_start_simulation_creates_engine_and_sets_running(monkeypatch): + # Patch SimulationEngine used in controller to a dummy engine + class DummyEngine: + def __init__(self, communicator=None, simulation_hub=None): + self.started = False + + def set_time_multiplier(self, v): + pass + + def set_update_interval(self, v): + pass + + def load_scenario(self, s): + pass + + def start(self): + self.started = True + + def stop(self): + self.started = False + + def is_running(self): + return self.started + + monkeypatch.setattr("target_simulator.simulation.simulation_controller.SimulationEngine", DummyEngine) + + comm = DummyComm(use_json=False, send_results=[True]) + cm = DummyCommManager(comm) + hub = DummyHub() + controller = SimulationController(cm, hub, config_manager=None, logger=None) + mv = DummyMainView() + # start simulation + controller.start_simulation(mv) + # engine should be created and main_view.simulation_engine set + assert mv.simulation_engine is not None + assert mv.is_simulation_running.get() is True