# target_simulator/gui/main_view.py
"""
Main view of the application.

This module implements the Tkinter-based main window (``MainView``) which
coordinates the GUI, the simulation controller, communicators and the central
SimulationStateHub.

Responsibilities include:
- constructing and laying out the primary widgets (PPI, scenario editor,
  connection panel, controls),
- initializing communicators and the simulation engine,
- providing a periodic GUI refresh loop that pulls data from the
  ``SimulationStateHub`` and updates visual components,
- wiring user actions (start/stop simulation, load/save scenario,
  connect/disconnect) to controller logic.

The module exposes the ``MainView`` class which is the application's main
Tkinter ``Tk`` instance. The class is designed to be instantiated and run via
``python -m target_simulator`` which calls ``MainView.mainloop()``.

Note: the GUI mutates shared state (e.g., ``simulation_hub``) and therefore
must run on the main thread. Methods on this class generally have side
effects (updating widgets, storing settings, starting/stopping threads) and
do not return useful values.
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from queue import Queue, Empty
from typing import Optional, Dict, Any, List
import time
import math
import os
import json
import sys
from datetime import datetime

# Use absolute imports for robustness and clarity
from target_simulator.gui.ppi_display import PPIDisplay
from target_simulator.gui.ppi_adapter import build_display_data
from target_simulator.gui.connection_settings_window import ConnectionSettingsWindow
from target_simulator.gui.radar_config_window import RadarConfigWindow
from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame
from target_simulator.gui.target_list_frame import TargetListFrame
from target_simulator.gui.connection_panel import ConnectionPanel
from target_simulator.gui.status_bar import StatusBar
from target_simulator.gui.simulation_controls import SimulationControls
from target_simulator.core.communicator_interface import CommunicatorInterface
from target_simulator.core.serial_communicator import SerialCommunicator
from target_simulator.core.tftp_communicator import TFTPCommunicator
from target_simulator.core.simulation_engine import SimulationEngine
from target_simulator.core.models import Scenario, Target
from target_simulator.utils.logger import get_logger, shutdown_logging_system
from target_simulator.utils.config_manager import ConfigManager
from target_simulator.gui.sfp_debug_window import SfpDebugWindow
from target_simulator.gui.logger_panel import LoggerPanel
from target_simulator.core.sfp_communicator import SFPCommunicator
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.gui.analysis_window import AnalysisWindow
from target_simulator.core import command_builder
from target_simulator.analysis.simulation_archive import SimulationArchive
from target_simulator.communication.communicator_manager import CommunicatorManager
from target_simulator.simulation.simulation_controller import SimulationController

# --- Import Version Info FOR THE WRAPPER ITSELF ---
try:
    from target_simulator import _version as wrapper_version

    WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
except ImportError:
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"

# Period of the GUI refresh loop, in milliseconds.
GUI_REFRESH_RATE_MS = 40


class MainView(tk.Tk):
    """
    Main application window and controller.

    This class composes the primary UI and wires it to application logic:
    communicators, the SimulationController and SimulationEngine. It owns a
    :class:`SimulationStateHub` instance used application-wide.

    Key responsibilities:
    - build and layout widgets (PPI display, scenario controls, simulation
      controls),
    - initialize communicators via :class:`CommunicatorManager`,
    - start/stop simulations via :class:`SimulationController`,
    - periodically refresh GUI elements from the simulation hub.

    Threading/side-effects:
    - Instantiating MainView will start Tk's mainloop when ``mainloop()`` is
      called; GUI updates must run on the main thread.
    - Many methods update application state and widgets; they return ``None``.
    """

    def __init__(self):
        """Load settings, build the UI, load the initial scenario and
        schedule the periodic refresh/status callbacks."""
        super().__init__()
        self.logger = get_logger(__name__)
        self.config_manager = ConfigManager()
        self.current_archive: Optional[SimulationArchive] = None

        # --- Load Settings ---
        settings = self.config_manager.get_general_settings()
        self.scan_limit = settings.get("scan_limit", 60)
        self.max_range = settings.get("max_range", 100)
        self.connection_config = self.config_manager.get_connection_settings()
        target_sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
        self.prediction_offset_ms = target_sfp_cfg.get("prediction_offset_ms", 0.0)

        # --- Initialize the data hub and controllers ---
        self.simulation_hub = SimulationStateHub()
        self.communicator_manager = CommunicatorManager(
            simulation_hub=self.simulation_hub,
            logger=self.logger,
            defer_sfp_connection=True,
        )
        self.communicator_manager.set_config(self.connection_config)
        self.simulation_controller = SimulationController(
            communicator_manager=self.communicator_manager,
            simulation_hub=self.simulation_hub,
            config_manager=self.config_manager,
            logger=self.logger,
        )

        # --- Core Logic Handlers ---
        self.target_communicator: Optional[CommunicatorInterface] = None
        self.lru_communicator: Optional[CommunicatorInterface] = None
        self.scenario = Scenario()
        self.current_scenario_name: Optional[str] = None
        self.sfp_debug_window: Optional[SfpDebugWindow] = None
        self.analysis_window: Optional[AnalysisWindow] = None

        # --- Simulation Engine ---
        self.simulation_engine: Optional[SimulationEngine] = None
        self.is_simulation_running = tk.BooleanVar(value=False)
        self._start_in_progress_main = False  # Guard for start button
        self.time_multiplier = 1.0
        self.update_time = tk.DoubleVar(value=1.0)

        # Simulation progress tracking
        self.total_sim_time = 0.0
        self.sim_elapsed_time = 0.0
        self.sim_slider_var = tk.DoubleVar(value=0.0)
        self._slider_is_dragging = False

        # --- Window and UI Setup ---
        self.title("Radar Target Simulator")
        self.geometry(settings.get("geometry", "1200x900"))
        self.minsize(1024, 768)

        self._create_menubar()
        self._create_main_layout()
        self._create_statusbar()
        self._status_after_id = None

        # --- Post-UI Initialization ---
        self._initialize_communicators()
        self._load_scenarios_into_ui()

        # Determine initial scenario to load. Prefer last_selected_scenario
        # from settings if it exists and is valid; otherwise use the
        # combobox selection (which `update_scenario_list` set to the
        # first available scenario when no explicit selection was provided).
        last_scenario = settings.get("last_selected_scenario")
        scenario_to_load = None
        try:
            available = self.config_manager.get_scenario_names()
            if last_scenario and last_scenario in available:
                scenario_to_load = last_scenario
            else:
                # fallback to whatever the scenario_controls combobox currently
                # has selected (update_scenario_list should have set it)
                if hasattr(self, "scenario_controls") and getattr(
                    self.scenario_controls, "current_scenario", None
                ):
                    val = self.scenario_controls.current_scenario.get()
                    if val:
                        scenario_to_load = val
        except Exception as e:
            # Best effort: start without a scenario, but leave a trace.
            self.logger.debug(f"Could not determine initial scenario: {e}")
            scenario_to_load = None

        if scenario_to_load:
            self._on_load_scenario(scenario_to_load)

        self._update_window_title()
        self.protocol("WM_DELETE_WINDOW", self._on_closing)
        self.logger.info("MainView initialized successfully.")

        self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop)
        self.after(1000, self._update_rate_status)
        self.after(1000, self._update_latency_status)

    def _create_main_layout(self):
        """Construct the main window layout and compose child widgets.

        This method creates the primary panes, instantiates child widgets such
        as the `PPIDisplay`, `ScenarioControlsFrame`, `TargetListFrame` and
        `SimulationControls`, and places the log pane at the bottom.
        """
        v_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
        v_pane.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        self.h_pane = ttk.PanedWindow(v_pane, orient=tk.HORIZONTAL)
        v_pane.add(self.h_pane, weight=4)

        # --- Right Pane (connection panel + PPI) ---
        right_container = ttk.Frame(self.h_pane)
        self.h_pane.add(right_container, weight=2)

        self.connection_panel = ConnectionPanel(
            right_container, initial_config=self.connection_config
        )
        self.connection_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2))
        self.connection_panel.set_connect_handler(self._on_connect_button)
        self.connection_panel.set_open_settings_handler(self._open_settings)

        self.ppi_widget = PPIDisplay(
            right_container, max_range_nm=self.max_range, scan_limit_deg=self.scan_limit
        )
        self.ppi_widget.pack(
            side=tk.TOP, fill=tk.BOTH, expand=True, padx=2, pady=(2, 5)
        )

        # --- Left Pane ---
        left_pane_container = ttk.Frame(self.h_pane)
        self.h_pane.add(left_pane_container, weight=1)

        # Place the ScenarioControlsFrame above the notebook so it is always visible
        # regardless of which tab is selected.
        self.scenario_controls = ScenarioControlsFrame(
            left_pane_container,
            main_view=self,
            load_scenario_command=self._on_load_scenario,
            save_as_command=self._on_save_scenario_as,
            delete_command=self._on_delete_scenario,
            new_scenario_command=self._on_new_scenario,
        )
        self.scenario_controls.pack(fill=tk.X, expand=False, padx=5, pady=(5, 5))

        left_notebook = ttk.Notebook(left_pane_container)
        left_notebook.pack(fill=tk.BOTH, expand=True)

        # --- TAB 1: Editing scenario (keeps scenario-related tools in the main
        # controls area above the notebook; the tab itself is used for editing)
        scenario_tab = ttk.Frame(left_notebook)
        left_notebook.add(scenario_tab, text="Editing scenario")

        self.target_list = TargetListFrame(
            scenario_tab, targets_changed_callback=self._on_targets_changed
        )
        self.target_list.pack(fill=tk.BOTH, expand=True, padx=5)

        # --- TAB 2: SIMULATION ---
        simulation_tab = ttk.Frame(left_notebook)
        left_notebook.add(simulation_tab, text="Simulation")

        self.simulation_controls = SimulationControls(simulation_tab, self)
        self.simulation_controls.pack(fill=tk.X, padx=5, pady=10, anchor="n")

        # Make controls directly accessible for backward compatibility
        self.start_button = self.simulation_controls.start_button
        self.stop_button = self.simulation_controls.stop_button
        self.reset_radar_button = self.simulation_controls.reset_radar_button

        # --- TAB 3: LRU SIMULATION (unchanged) ---
        lru_tab = ttk.Frame(left_notebook)
        left_notebook.add(lru_tab, text="LRU Simulation")
        # ... widgets for LRU ...

        # --- TAB 4: Analysis ---
        analysis_tab = ttk.Frame(left_notebook)
        left_notebook.add(analysis_tab, text="Analysis")
        self._create_analysis_tab_widgets(analysis_tab)

        # --- Bottom Pane (Logs) ---
        log_frame_container = ttk.LabelFrame(v_pane, text="Logs")
        v_pane.add(log_frame_container, weight=1)

        self.log_text_widget = scrolledtext.ScrolledText(
            log_frame_container, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9)
        )
        self.log_text_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    def _create_analysis_tab_widgets(self, parent):
        """Create widgets for the Analysis tab (treeview + controls).

        Args:
            parent (tk.Widget): the parent container for the analysis tab.
        """
        # Place the tree inside a container so we can attach scrollbars neatly
        tree_container = ttk.Frame(parent)
        tree_container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Vertical scrollbar
        vscroll = ttk.Scrollbar(tree_container, orient=tk.VERTICAL)

        self.analysis_tree = ttk.Treeview(
            tree_container,
            columns=("datetime", "scenario", "duration"),
            show="headings",
            yscrollcommand=vscroll.set,
        )
        # Configure vertical scrollbar to control the tree
        vscroll.config(command=self.analysis_tree.yview)

        self.analysis_tree.heading("datetime", text="Date/Time")
        self.analysis_tree.heading("scenario", text="Scenario Name")
        self.analysis_tree.heading("duration", text="Duration (s)")
        self.analysis_tree.column("datetime", width=150)
        self.analysis_tree.column("scenario", width=200)
        self.analysis_tree.column("duration", width=80, anchor=tk.E)

        # Bind double-click (left button) to open analysis. The event
        # sequence must be non-empty: Tk rejects an empty sequence.
        self.analysis_tree.bind("<Double-1>", lambda e: self._on_analyze_run())

        # Use grid inside the container to place tree + scrollbars
        tree_container.grid_rowconfigure(0, weight=1)
        tree_container.grid_columnconfigure(0, weight=1)
        self.analysis_tree.grid(row=0, column=0, sticky="nsew")
        vscroll.grid(row=0, column=1, sticky="ns")

        btn_frame = ttk.Frame(parent)
        btn_frame.pack(fill=tk.X, padx=5, pady=5)

        ttk.Button(
            btn_frame, text="Refresh List", command=self._refresh_analysis_list
        ).pack(side=tk.LEFT)
        ttk.Button(
            btn_frame, text="Open Archive Folder", command=self._open_archive_folder
        ).pack(side=tk.LEFT, padx=(6, 0))
        ttk.Button(
            btn_frame, text="Analyze Selected", command=self._on_analyze_run
        ).pack(side=tk.RIGHT)

        # Populate the list shortly after the widgets exist.
        self.after(100, self._refresh_analysis_list)

    def _create_menubar(self):
        """Create and attach the top-level menubar with Settings and Debug menus."""
        menubar = tk.Menu(self)
        self.config(menu=menubar)

        settings_menu = tk.Menu(menubar, tearoff=0)
        menubar.add_cascade(label="Settings", menu=settings_menu)
        settings_menu.add_command(label="Connection...", command=self._open_settings)
        settings_menu.add_command(
            label="Radar Config...", command=self._open_radar_config
        )

        debug_menu = tk.Menu(menubar, tearoff=0)
        menubar.add_cascade(label="Debug", menu=debug_menu)
        debug_menu.add_command(
            label="SFP Packet Inspector...", command=self._open_sfp_debug_window
        )
        debug_menu.add_command(
            label="Logger Levels...", command=self._open_logger_panel
        )

    def _create_statusbar(self):
        """Create and place the application's status bar widget and expose vars."""
        self.status_bar = StatusBar(self)
        self.status_bar.place(relx=0.0, rely=1.0, anchor="sw", relwidth=1.0, height=24)
        self.status_var = self.status_bar.status_var
        self.rate_status_var = self.status_bar.rate_status_var
        self.latency_status_var = self.status_bar.latency_status_var
        self._status_after_id = None

    def show_status_message(self, text: str, timeout_ms: Optional[int] = 3000):
        """Show a transient status message in the status bar.

        Args:
            text (str): Text to display.
            timeout_ms (Optional[int]): How long (ms) the message remains
                visible.
        """
        if hasattr(self, "status_bar") and self.status_bar:
            self.status_bar.show_status_message(text, timeout_ms=timeout_ms)

    def clear_status_message(self):
        """Clear the current status message and set a default 'Ready' state."""
        if hasattr(self, "status_bar") and self.status_bar:
            self.status_bar.show_status_message("Ready", timeout_ms=0)

    def _update_window_title(self):
        """Update the main window title to reflect current scenario and version."""
        base_title = f"Radar Target Simulator- {WRAPPER_APP_VERSION_STRING}"
        if self.current_scenario_name:
            self.title(f"{base_title} - {self.current_scenario_name}")
        else:
            self.title(base_title)

    def _on_connection_state_change(self, is_connected: bool):
        """Handle connection state changes propagated from the CommunicatorManager.

        Updates the connection panel, status bar and any open debug windows.
        """
        self.logger.info(
            f"MainView received connection state change: Connected={is_connected}"
        )
        if hasattr(self, "status_bar"):
            self.status_bar.set_target_connected(is_connected)
        if hasattr(self, "connection_panel"):
            self.connection_panel.update_toggle_state(is_connected)
        if self.sfp_debug_window and self.sfp_debug_window.winfo_exists():
            if hasattr(self.sfp_debug_window, "update_toggle_state"):
                self.sfp_debug_window.update_toggle_state(is_connected)

    def _initialize_communicators(self):
        """Initialize communicators via the CommunicatorManager and wire callbacks.

        Returns nothing; updates `self.target_communicator` and
        `self.lru_communicator`.
        """
        t_comm, t_connected, l_comm, l_connected = (
            self.communicator_manager.initialize_communicators()
        )
        self.target_communicator = t_comm
        self.lru_communicator = l_comm

        if hasattr(self, "status_bar"):
            self.status_bar.set_target_connected(t_connected)
            self.status_bar.set_lru_connected(l_connected)

        self.communicator_manager.add_connection_state_callback(
            self._on_connection_state_change
        )

    def update_connection_settings(self, new_config: Dict[str, Any]):
        """Apply and persist new connection settings supplied by the settings dialog.

        Args:
            new_config (Dict[str, Any]): New configuration dictionary for
                target/lru.
        """
        self.logger.info(f"Updating connection settings: {new_config}")
        self.connection_config = new_config
        self.config_manager.save_connection_settings(new_config)

        target_sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
        self.prediction_offset_ms = target_sfp_cfg.get("prediction_offset_ms", 0.0)

        if hasattr(self, "connection_panel"):
            self.connection_panel.update_summary(self.connection_config)

        self.communicator_manager.set_config(new_config)
        self._initialize_communicators()

    def _open_settings(self):
        """Open the modal ConnectionSettingsWindow to edit connection config."""
        self.logger.info("Opening connection settings window.")
        ConnectionSettingsWindow(self, self.config_manager, self.connection_config)

    def _on_connect_button(self):
        """Toggle connection: attempt connect or issue disconnect request."""
        self.logger.info("Connection toggle requested by user.")
        if self.target_communicator and self.target_communicator.is_open:
            self.communicator_manager.disconnect_target()
        else:
            if not self.communicator_manager.connect_target():
                messagebox.showerror(
                    "Connection Failed",
                    "Could not connect. Check settings and logs.",
                )

    def _reset_radar_state(self):
        """Delegate a radar-state reset to the SimulationController."""
        self.simulation_controller.reset_radar_state(self)

    def _on_start_simulation(self):
        """Request start of the currently loaded scenario via SimulationController."""
        if self._start_in_progress_main:
            self.logger.info("Start already in progress; ignoring duplicate request.")
            return

        # Ensure the scenario currently selected in the ScenarioControlsFrame
        # is loaded before starting the simulation. This makes the UI's
        # visible combobox the single source-of-truth for the selected scenario.
        try:
            if hasattr(self, "scenario_controls") and getattr(
                self.scenario_controls, "current_scenario", None
            ):
                sel = self.scenario_controls.current_scenario.get()
                if sel and sel != self.current_scenario_name:
                    self._on_load_scenario(sel)
        except Exception as e:
            # Non-fatal: proceed with existing scenario if anything goes wrong
            self.logger.warning(
                f"Could not sync selected scenario before start: {e}"
            )

        self._start_in_progress_main = True
        try:
            self.simulation_controller.start_simulation(self)
        except Exception:
            # Do not leave the guard stuck, otherwise every later Start
            # request would be ignored as "already in progress".
            self._start_in_progress_main = False
            raise

    def _on_stop_simulation(self):
        """Request the simulation controller to stop the running simulation."""
        self.simulation_controller.stop_simulation(self)

    def _on_simulation_finished(self):
        """Delegate end-of-simulation handling to the SimulationController."""
        self.simulation_controller.on_simulation_finished(self)

    def _on_reset_simulation(self):
        """Reset the simulation and related UI to the scenario's initial state."""
        self.logger.info("Resetting scenario to initial state.")
        self.simulation_controls.hide_notice()
        if self.is_simulation_running.get():
            self._on_stop_simulation()

        self.simulation_hub.reset()
        self.ppi_widget.clear_trails()
        self.scenario.reset_simulation()
        self._update_all_views()
        self.show_status_message("Scenario reset to initial state.", timeout_ms=3000)

    def _update_button_states(self):
        """Enable/disable UI buttons based on simulation and start-in-progress state."""
        is_running = self.is_simulation_running.get()
        start_in_progress = self._start_in_progress_main
        busy = is_running or start_in_progress

        start_state = tk.DISABLED if busy else tk.NORMAL
        stop_state = tk.NORMAL if is_running else tk.DISABLED
        # Comboboxes use "readonly" (selectable, not editable) when enabled.
        controls_state = "readonly" if not busy else tk.DISABLED
        button_controls_state = tk.NORMAL if not busy else tk.DISABLED

        self.start_button.config(state=start_state)
        self.stop_button.config(state=stop_state)
        self.reset_radar_button.config(state=button_controls_state)

        self.scenario_controls.new_button.config(state=button_controls_state)
        self.scenario_controls.save_button.config(state=button_controls_state)
        self.scenario_controls.save_as_button.config(state=button_controls_state)
        self.scenario_controls.delete_button.config(state=button_controls_state)
        self.scenario_controls.scenario_combobox.config(state=controls_state)

        self.target_list.add_button.config(state=button_controls_state)
        self.target_list.remove_button.config(state=button_controls_state)
        self.target_list.edit_button.config(state=button_controls_state)
        self.target_list.tree.config(selectmode="browse" if not busy else "none")

    def _update_simulation_progress_display(self):
        """Update the labels that show elapsed and total simulation time."""
        try:
            elapsed = self.sim_elapsed_time
            total = self.total_sim_time
            self.simulation_controls.sim_elapsed_label.config(text=f"{elapsed:.1f}s")
            self.simulation_controls.sim_total_label.config(text=f"{total:.1f}s")
        except Exception:
            # Deliberately best-effort: this runs from the refresh loop and
            # a label update must never break it.
            pass

    def _update_rate_status(self):
        """Refresh the rate read-out in the status bar and reschedule itself
        every second."""
        try:
            real_rate = (
                self.simulation_hub.get_real_rate(1.0) if self.simulation_hub else 0.0
            )
            packet_rate = (
                self.simulation_hub.get_packet_rate(1.0) if self.simulation_hub else 0.0
            )
            ppi_rate = (
                self.ppi_widget.get_real_update_rate(1.0)
                if hasattr(self, "ppi_widget")
                else 0.0
            )

            if self.rate_status_var:
                if packet_rate > 0.0:
                    self.rate_status_var.set(
                        f"pkt in: {packet_rate:.1f} pkt/s | ev in: {real_rate:.1f} ev/s | ppi upd: {ppi_rate:.1f} upd/s"
                    )
                else:
                    self.rate_status_var.set(
                        f"real in: {real_rate:.1f} ev/s | ppi upd: {ppi_rate:.1f} upd/s"
                    )
        except Exception as e:
            self.logger.debug(f"Error updating rate status: {e}")
        finally:
            self.after(1000, self._update_rate_status)

    def _on_seek(self):
        """Seek to a new time within the current simulation when slider is moved."""
        if not self.simulation_engine or not self.simulation_engine.scenario:
            return

        # The slider value is used as a fraction of the total duration; the
        # result is clamped to [0, total_sim_time].
        frac = float(self.simulation_controls.sim_slider_var.get())
        new_time = max(0.0, min(self.total_sim_time, frac * self.total_sim_time))
        self.simulation_engine.set_simulation_time(new_time)
        self.sim_elapsed_time = new_time
        self._update_simulation_progress_display()

    def _on_targets_changed(self, targets: List[Target]):
        """Callback invoked when the target list is modified in the UI.

        Updates the in-memory scenario targets and persists the scenario if it
        has a name.
        """
        self.scenario.targets = {t.target_id: t for t in targets}
        self.ppi_widget.update_simulated_targets(targets)

        if self.current_scenario_name:
            self.config_manager.save_scenario(
                self.current_scenario_name, self.scenario.to_dict()
            )

    def _update_all_views(self, targets_to_display: Optional[List[Target]] = None):
        """Refresh all UI views (title, target list, PPI display) with current data."""
        self._update_window_title()
        if targets_to_display is None:
            targets_to_display = self.scenario.get_all_targets()

        self.target_list.update_target_list(targets_to_display)
        self.ppi_widget.update_simulated_targets(targets_to_display)

    def _load_scenarios_into_ui(self):
        """Push the stored scenario names into the scenario selection widgets."""
        scenario_names = self.config_manager.get_scenario_names()
        self.scenario_controls.update_scenario_list(
            scenario_names, self.current_scenario_name
        )
        if hasattr(self, "simulation_controls") and hasattr(
            self.simulation_controls, "sim_scenario_combobox"
        ):
            self.simulation_controls.sim_scenario_combobox["values"] = scenario_names

    def _on_load_scenario(self, scenario_name: str):
        """Load the named scenario from the config manager and refresh the UI.

        A running simulation is stopped first. If no scenario data is found
        for ``scenario_name`` a warning is logged and nothing else changes.
        """
        if self.is_simulation_running.get():
            self._on_stop_simulation()

        self.logger.info(f"Loading scenario: {scenario_name}")
        scenario_data = self.config_manager.get_scenario(scenario_name)
        if scenario_data:
            self.simulation_hub.clear_simulated_data()
            self.ppi_widget.clear_trails()
            self.ppi_widget.update_simulated_targets([])

            self.scenario = Scenario.from_dict(scenario_data)
            self.current_scenario_name = scenario_name

            # Ensure the always-visible ScenarioControlsFrame combobox reflects
            # the scenario that was just loaded (keeps title and combobox in sync)
            try:
                if (
                    hasattr(self, "scenario_controls")
                    and getattr(self.scenario_controls, "current_scenario", None)
                    is not None
                ):
                    self.scenario_controls.current_scenario.set(scenario_name)
                    # Also update the combobox widget value explicitly
                    try:
                        self.scenario_controls.scenario_combobox.set(scenario_name)
                    except Exception as e:
                        self.logger.debug(
                            f"Could not set scenario combobox value: {e}"
                        )
            except Exception as e:
                # Best effort: a selector out of sync must not block loading.
                self.logger.debug(f"Could not sync scenario selector: {e}")

            self.target_list.update_target_list(self.scenario.get_all_targets())
            self._update_all_views()

            if hasattr(self, "simulation_controls") and hasattr(
                self.simulation_controls, "sim_scenario_combobox"
            ):
                self.simulation_controls.sim_scenario_combobox.set(scenario_name)

            self.ppi_widget.draw_scenario_preview(self.scenario)
        else:
            self.logger.warning(
                f"Attempted to load a non-existent scenario: {scenario_name}"
            )

    def _on_save_scenario(self, scenario_name: str):
        """Persist the current targets under ``scenario_name`` and make it the
        current scenario."""
        self.logger.info(f"Saving scenario: {scenario_name}")
        self.scenario.name = scenario_name
        self.scenario.targets = {t.target_id: t for t in self.target_list.get_targets()}
        self.config_manager.save_scenario(scenario_name, self.scenario.to_dict())
        self.current_scenario_name = scenario_name
        self._load_scenarios_into_ui()
        self._update_window_title()
        messagebox.showinfo(
            "Success", f"Scenario '{scenario_name}' saved.", parent=self
        )

    def _on_save_scenario_as(self, scenario_name: str):
        """Save the current scenario under a new name (same as a plain save)."""
        self._on_save_scenario(scenario_name)

    def _on_new_scenario(self, scenario_name: str):
        """Create an empty in-memory scenario called ``scenario_name``.

        If a stored scenario with that name already exists, the user is
        informed and that scenario is loaded instead.
        """
        if scenario_name in self.config_manager.get_scenario_names():
            messagebox.showinfo(
                "Duplicate", f"Scenario '{scenario_name}' already exists.", parent=self
            )
            self._on_load_scenario(scenario_name)
            return

        self.logger.info(f"Creating new scenario: {scenario_name}")
        self.scenario = Scenario(name=scenario_name)
        self.current_scenario_name = scenario_name
        self._update_all_views()

        # The new scenario is not stored yet, so append its name to the list
        # shown by the selection widgets.
        names = list(self.config_manager.get_scenario_names()) + [scenario_name]
        self.scenario_controls.update_scenario_list(
            names, select_scenario=scenario_name
        )
        if hasattr(self, "simulation_controls") and hasattr(
            self.simulation_controls, "sim_scenario_combobox"
        ):
            self.simulation_controls.sim_scenario_combobox["values"] = names
            self.simulation_controls.sim_scenario_combobox.set(scenario_name)

    def _on_delete_scenario(self, scenario_name: str):
        """Delete a stored scenario; if it was the current one, fall back to
        an empty scenario."""
        self.logger.info(f"Deleting scenario: {scenario_name}")
        self.config_manager.delete_scenario(scenario_name)
        if self.current_scenario_name == scenario_name:
            self.current_scenario_name = None
            self.scenario = Scenario()
            self._update_all_views()
        self._load_scenarios_into_ui()

    def _on_send_lru_status(self):
        """Placeholder: currently does nothing."""
        pass

    def _open_radar_config(self):
        """Open the radar configuration dialog and apply changed values to the PPI."""
        dialog = RadarConfigWindow(
            self, current_scan_limit=self.scan_limit, current_max_range=self.max_range
        )
        if dialog.scan_limit is not None and dialog.max_range is not None:
            if (
                self.scan_limit != dialog.scan_limit
                or self.max_range != dialog.max_range
            ):
                self.scan_limit = dialog.scan_limit
                self.max_range = dialog.max_range
                self.ppi_widget.reconfigure_radar(
                    max_range_nm=self.max_range, scan_limit_deg=self.scan_limit
                )

    def _on_closing(self):
        """Handles the window closing event."""
        self.logger.info("Application shutting down.")

        # Stop the simulation if it's running
        if self.is_simulation_running.get():
            self._on_stop_simulation()

        # Save window geometry and last scenario
        settings_to_save = {
            "scan_limit": self.scan_limit,
            "max_range": self.max_range,
            "geometry": self.winfo_geometry(),
            "last_selected_scenario": self.current_scenario_name,
        }
        self.config_manager.save_general_settings(settings_to_save)

        # Gracefully shut down all communicators using the manager
        if self.communicator_manager:
            self.communicator_manager.shutdown()

        # Stop other background tasks
        if hasattr(self, "status_bar") and self.status_bar:
            self.status_bar.stop_resource_monitor()

        shutdown_logging_system()
        self.destroy()

    def _open_sfp_debug_window(self):
        """Open the SFP debug window, or raise the existing one."""
        if self.sfp_debug_window and self.sfp_debug_window.winfo_exists():
            self.sfp_debug_window.lift()
            return
        self.sfp_debug_window = SfpDebugWindow(self)

    def _open_logger_panel(self):
        """Open the logger levels panel."""
        LoggerPanel(self)

    def _build_display_data_from_hub(self) -> Dict[str, List[Target]]:
        """Build the target lists to display by delegating to
        ``build_display_data``."""
        return build_display_data(
            self.simulation_hub,
            scenario=self.scenario,
            engine=self.simulation_engine,
            ppi_widget=self.ppi_widget,
            logger=self.logger,
        )

    def _gui_refresh_loop(self):
        """Periodic GUI refresh: update the PPI, ownship and target displays
        and the progress slider, then reschedule itself."""
        sim_is_running_now = (
            self.simulation_engine is not None and self.simulation_engine.is_running()
        )

        # The UI still believes a simulation is running but the engine has
        # stopped: run the end-of-simulation handling.
        if self.is_simulation_running.get() and not sim_is_running_now:
            self._on_simulation_finished()

        # Update PPI with the latest data from the hub
        display_data = self._build_display_data_from_hub()
        self.ppi_widget.update_simulated_targets(display_data.get("simulated", []))
        self.ppi_widget.update_real_targets(display_data.get("real", []))

        if self.simulation_hub:
            # Update antenna sweep line
            if self.ppi_widget.animate_antenna_var.get():
                az_deg, _ = self.simulation_hub.get_antenna_azimuth()
                try:
                    # Debug: record the value read from the hub and hub identity
                    self.logger.debug(
                        "MainView: read antenna az from hub -> az=%s hub_id=%s",
                        az_deg,
                        id(self.simulation_hub),
                    )
                except Exception:
                    # Logging must never break the refresh loop.
                    pass

                if az_deg is not None:
                    self.ppi_widget.render_antenna_line(az_deg)
                else:
                    self.ppi_widget.render_antenna_line(None)  # Hide if no data
            else:
                self.ppi_widget.render_antenna_line(None)  # Hide if animation is off

            # Update ownship state for both PPI orientation and status display
            ownship_state = self.simulation_hub.get_ownship_state()
            if ownship_state:
                ownship_heading = ownship_state.get("heading_deg", 0.0)
                self.ppi_widget.update_ownship_state(ownship_heading)
                self.simulation_controls.update_ownship_display(ownship_state)
            else:
                # Ensure display is cleared if no ownship data is present
                ownship_state = {}
                self.simulation_controls.update_ownship_display({})

            # Update the new active targets table, providing ownship state for context
            self.simulation_controls.update_targets_table(
                display_data.get("simulated", []), ownship_state
            )

        if sim_is_running_now:
            if self.simulation_engine and self.simulation_engine.scenario:
                times = [
                    getattr(t, "_sim_time_s", 0.0)
                    for t in self.simulation_engine.scenario.get_all_targets()
                ]
                self.sim_elapsed_time = max(times) if times else 0.0

                # Do not fight the user while they are dragging the slider.
                if self.total_sim_time > 0 and not self._slider_is_dragging:
                    progress = min(1.0, self.sim_elapsed_time / self.total_sim_time)
                    self.simulation_controls.sim_slider_var.set(progress)

            self._update_simulation_progress_display()

        if hasattr(self, "ppi_widget") and self.ppi_widget.canvas:
            self.ppi_widget.canvas.draw_idle()

        self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop)

    def _refresh_analysis_list(self):
        """Refreshes the list of archived simulation runs in the analysis tab."""
        self.analysis_tree.delete(*self.analysis_tree.get_children())

        archive_folder = SimulationArchive.ARCHIVE_FOLDER
        if not os.path.exists(archive_folder):
            return

        runs = []
        for filename in os.listdir(archive_folder):
            # Skip performance files, only list main simulation archives
            if filename.endswith((".perf.csv", ".perf.json")):
                continue

            if filename.endswith(".json"):
                filepath = os.path.join(archive_folder, filename)
                try:
                    with open(filepath, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    metadata = data.get("metadata", {})

                    # Extract timestamp from filename for sorting
                    try:
                        dt_str = filename.split("_")[0]
                        time_str = filename.split("_")[1]
                        datetime_obj = datetime.strptime(
                            f"{dt_str}_{time_str}", "%Y%m%d_%H%M%S"
                        )
                    except (IndexError, ValueError):
                        # Fallback to file modification time if filename parsing fails
                        datetime_obj = datetime.fromtimestamp(
                            os.path.getmtime(filepath)
                        )

                    run_info = {
                        "datetime_obj": datetime_obj,
                        "scenario": metadata.get("scenario_name", "N/A"),
                        "duration": f"{metadata.get('duration_seconds', 0):.1f}",
                        "filepath": filepath,
                    }
                    runs.append(run_info)
                except Exception as e:
                    self.logger.warning(
                        f"Could not read or parse archive {filename}: {e}"
                    )

        # Sort runs by datetime descending (most recent first)
        for run in sorted(runs, key=lambda r: r["datetime_obj"], reverse=True):
            self.analysis_tree.insert(
                "",
                tk.END,
                values=(
                    run["datetime_obj"].strftime("%Y-%m-%d %H:%M:%S"),
                    run["scenario"],
                    run["duration"],
                ),
                iid=run["filepath"],  # Use filepath as a unique identifier for the row
            )

    def _open_archive_folder(self):
        """Open the archive folder in the platform's file manager, creating it
        first if needed."""
        archive_folder = SimulationArchive.ARCHIVE_FOLDER
        try:
            os.makedirs(archive_folder, exist_ok=True)
            folder_path = os.path.abspath(archive_folder)
            if sys.platform == "win32":
                os.startfile(folder_path)
            else:
                import subprocess

                opener = "open" if sys.platform == "darwin" else "xdg-open"
                subprocess.run([opener, folder_path])
        except Exception as e:
            self.logger.exception(f"Failed to open archive folder: {e}")
            messagebox.showerror(
                "Error", "Could not open archive folder automatically."
            )

    def _on_analyze_run(self):
        """Open an AnalysisWindow for the run currently focused in the tree."""
        selected_item = self.analysis_tree.focus()
        if not selected_item:
            messagebox.showinfo(
                "No Selection", "Please select a simulation run to analyze."
            )
            return

        # Row ids are the archive file paths (see _refresh_analysis_list).
        archive_filepath = selected_item
        AnalysisWindow(self, archive_filepath=archive_filepath)

    def _update_latency_status(self):
        """Periodically updates the latency display in the status bar."""
        try:
            latency_s = 0.0
            if self.target_communicator and hasattr(self.target_communicator, "router"):
                router = self.target_communicator.router()
                if router and hasattr(router, "get_estimated_latency_s"):
                    latency_s = router.get_estimated_latency_s()

            # Update the status bar display
            if hasattr(self, "latency_status_var") and self.latency_status_var:
                if latency_s > 0:
                    latency_ms = latency_s * 1000
                    self.latency_status_var.set(f"Latency: {latency_ms:.1f} ms")
                else:
                    self.latency_status_var.set("")  # Clear if no latency

            # Update the simulation engine's prediction horizon if it's running
            if self.simulation_engine and self.simulation_engine.is_running():
                # Total horizon = automatic latency + manual offset
                total_horizon_s = latency_s + (self.prediction_offset_ms / 1000.0)
                self.simulation_engine.set_prediction_horizon(total_horizon_s)
        except Exception as e:
            self.logger.debug(f"Error updating latency status: {e}")
        finally:
            # Schedule the next update
            self.after(1000, self._update_latency_status)