# target_simulator/gui/main_view.py """ Main view of the application, containing the primary window and widgets. """ import tkinter as tk from tkinter import ttk, scrolledtext, messagebox from queue import Queue, Empty from typing import Optional, Dict, Any, List import time import math import os import json from datetime import datetime # Use absolute imports for robustness and clarity from target_simulator.gui.ppi_display import PPIDisplay from target_simulator.gui.ppi_adapter import build_display_data from target_simulator.gui.connection_settings_window import ConnectionSettingsWindow from target_simulator.gui.radar_config_window import RadarConfigWindow from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame from target_simulator.gui.target_list_frame import TargetListFrame from target_simulator.gui.connection_panel import ConnectionPanel from target_simulator.gui.status_bar import StatusBar from target_simulator.core.communicator_interface import CommunicatorInterface from target_simulator.core.serial_communicator import SerialCommunicator from target_simulator.core.tftp_communicator import TFTPCommunicator from target_simulator.core.simulation_engine import SimulationEngine from target_simulator.core.models import Scenario, Target from target_simulator.utils.logger import get_logger, shutdown_logging_system from target_simulator.utils.config_manager import ConfigManager from target_simulator.gui.sfp_debug_window import SfpDebugWindow from target_simulator.gui.logger_panel import LoggerPanel from target_simulator.core.sfp_communicator import SFPCommunicator from target_simulator.analysis.simulation_state_hub import SimulationStateHub from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.gui.analysis_window import AnalysisWindow from target_simulator.core import command_builder from target_simulator.analysis.simulation_archive import SimulationArchive from target_simulator.communication.communicator_manager import CommunicatorManager from target_simulator.simulation.simulation_controller import SimulationController # --- Import Version Info FOR THE WRAPPER ITSELF --- try: # Use absolute import based on package name from target_simulator import _version as wrapper_version WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})" WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}" except ImportError: # This might happen if you run the wrapper directly from source # without generating its _version.py first (if you use that approach for the wrapper itself) WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)" WRAPPER_BUILD_INFO = "Wrapper build time unknown" # --- End Import Version Info --- # --- Constants for Version Generation --- DEFAULT_VERSION = "0.0.0+unknown" DEFAULT_COMMIT = "Unknown" DEFAULT_BRANCH = "Unknown" # --- End Constants --- GUI_QUEUE_POLL_INTERVAL_MS = 100 GUI_REFRESH_RATE_MS = 40 class MainView(tk.Tk): """The main application window.""" def __init__(self): super().__init__() self.logger = get_logger(__name__) self.config_manager = ConfigManager() self.current_archive: Optional[SimulationArchive] = None # --- Load Settings --- settings = self.config_manager.get_general_settings() self.scan_limit = settings.get("scan_limit", 60) self.max_range = settings.get("max_range", 100) self.connection_config = self.config_manager.get_connection_settings() # Defer establishing SFP receive connection until simulation start self.defer_sfp_connection = True # --- Initialize the data hub and analyzer --- self.simulation_hub = SimulationStateHub() self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub) # Communicator manager handles communicator lifecycle self.communicator_manager = CommunicatorManager( simulation_hub=self.simulation_hub, logger=self.logger, defer_sfp_connection=True ) # Apply loaded connection settings to the manager try: self.communicator_manager.set_config(self.connection_config) except Exception: pass # Simulation controller encapsulates start/stop/reset orchestration try: self.simulation_controller = SimulationController( communicator_manager=self.communicator_manager, simulation_hub=self.simulation_hub, config_manager=self.config_manager, logger=self.logger, ) except Exception: self.simulation_controller = None # --- Core Logic Handlers --- self.target_communicator: Optional[CommunicatorInterface] = None self.lru_communicator: Optional[CommunicatorInterface] = None self.scenario = Scenario() self.current_scenario_name: Optional[str] = None self.sfp_debug_window: Optional[SfpDebugWindow] = None self.analysis_window: Optional[AnalysisWindow] = None # --- Simulation Engine --- self.simulation_engine: Optional[SimulationEngine] = None self.is_simulation_running = tk.BooleanVar(value=False) self.time_multiplier = 1.0 self.update_time = tk.DoubleVar(value=1.0) # Simulation progress tracking self.total_sim_time = 0.0 self.sim_elapsed_time = 0.0 self.sim_slider_var = tk.DoubleVar(value=0.0) self._slider_is_dragging = False # --- Window and UI Setup --- self.title(f"Radar Target Simulator") self.geometry(settings.get("geometry", "1200x900")) self.minsize(1024, 768) self._create_menubar() self._create_main_layout() self._create_statusbar() # Id for scheduled status clear; used by show_status_message self._status_after_id = None # --- Post-UI Initialization --- self._initialize_communicators() self._load_scenarios_into_ui() last_scenario = settings.get("last_selected_scenario") if last_scenario and last_scenario in self.config_manager.get_scenario_names(): self._on_load_scenario(last_scenario) self._update_window_title() self.protocol("WM_DELETE_WINDOW", self._on_closing) self.logger.info("MainView initialized successfully.") # Start the new rendering loop self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop) # Schedule periodic rate status updates (shows events/sec for real inputs and PPI updates) try: # Start after one second to allow initial state to settle self.after(1000, self._update_rate_status) except Exception: self.logger.exception("Failed to schedule rate status updater") def _create_main_layout(self): v_pane = ttk.PanedWindow(self, orient=tk.VERTICAL) v_pane.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) self.h_pane = ttk.PanedWindow(v_pane, orient=tk.HORIZONTAL) v_pane.add(self.h_pane, weight=4) # --- Right Pane (connection panel + PPI) --- right_container = ttk.Frame(self.h_pane) self.h_pane.add(right_container, weight=2) # Connection panel sits above the PPI on the right side and shows # current connection parameters and a centralized Connect/Disconnect button. try: self.connection_panel = ConnectionPanel(right_container, initial_config=self.connection_config) self.connection_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) # Wire handlers back to MainView try: self.connection_panel.set_connect_handler(self._on_connect_button) self.connection_panel.set_open_settings_handler(self._open_settings) except Exception: pass except Exception: # Fallback to inline panel if the new component fails for any reason conn_panel = ttk.LabelFrame(right_container, text="Connection") conn_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) self.conn_type_var = tk.StringVar( value=self.connection_config.get("target", {}).get("type", "-") ) self.conn_info_var = tk.StringVar( value=self._format_connection_summary( self.connection_config.get("target", {}) ) ) ttk.Label(conn_panel, text="Type:").pack(side=tk.LEFT, padx=(6, 2)) ttk.Label(conn_panel, textvariable=self.conn_type_var, width=10).pack(side=tk.LEFT) ttk.Label(conn_panel, textvariable=self.conn_info_var).pack(side=tk.LEFT, padx=(8, 4)) self.connect_button = ttk.Button(conn_panel, text="Connect", command=self._on_connect_button) self.connect_button.pack(side=tk.RIGHT, padx=(4, 6)) self.conn_settings_button = ttk.Button(conn_panel, text="Settings...", command=self._open_settings) self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0)) # Now the PPI widget below the connection panel self.ppi_widget = PPIDisplay( right_container, max_range_nm=self.max_range, scan_limit_deg=self.scan_limit ) self.ppi_widget.pack( side=tk.TOP, fill=tk.BOTH, expand=True, padx=2, pady=(2, 5) ) # Non-blocking simulation notice area placed directly under the PPI. # This shows informational messages like 'Simulation Finished' without # popping up a modal dialog. The user can dismiss the notice. try: self.simulation_notice_var = tk.StringVar(value="") notice_frame = ttk.Frame(right_container) notice_frame.pack(side=tk.TOP, fill=tk.X, padx=2, pady=(0, 5)) # Use a plain tk.Label so we can set a background color reliably # across themes to make the notice visible without being modal. self.simulation_notice_label = tk.Label( notice_frame, textvariable=self.simulation_notice_var, bg="#fff3cd", fg="#6a4b00", anchor="w", relief=tk.SOLID, bd=1, padx=6, pady=2, ) self.simulation_notice_label.pack(side=tk.LEFT, fill=tk.X, expand=True) self._dismiss_simulation_notice_btn = ttk.Button( notice_frame, text="Dismiss", command=lambda: self.simulation_notice_var.set(""), ) self._dismiss_simulation_notice_btn.pack(side=tk.RIGHT, padx=(6, 0)) except Exception: # Keep UI robust: if any of this fails, continue without the notice. self.simulation_notice_var = None # Reflect initial connection state (likely disconnected) try: if hasattr(self.ppi_widget, "update_connect_state"): self.ppi_widget.update_connect_state(False) except Exception: pass # --- Left Pane --- left_pane_container = ttk.Frame(self.h_pane) self.h_pane.add(left_pane_container, weight=1) left_notebook = ttk.Notebook(left_pane_container) left_notebook.pack(fill=tk.BOTH, expand=True) # --- TAB 1: SCENARIO CONFIG --- scenario_tab = ttk.Frame(left_notebook) left_notebook.add(scenario_tab, text="Scenario Config") self.scenario_controls = ScenarioControlsFrame( scenario_tab, main_view=self, load_scenario_command=self._on_load_scenario, save_as_command=self._on_save_scenario_as, delete_command=self._on_delete_scenario, new_scenario_command=self._on_new_scenario, ) self.scenario_controls.pack(fill=tk.X, expand=False, padx=5, pady=(5, 5)) self.target_list = TargetListFrame( scenario_tab, targets_changed_callback=self._on_targets_changed ) self.target_list.pack(fill=tk.BOTH, expand=True, padx=5) # --- TAB 2: SIMULATION --- simulation_tab = ttk.Frame(left_notebook) left_notebook.add(simulation_tab, text="Simulation") sim_scenario_frame = ttk.LabelFrame(simulation_tab, text="Scenario Control") sim_scenario_frame.pack(fill=tk.X, padx=5, pady=5, anchor="n") ttk.Label(sim_scenario_frame, text="Scenario:").pack( side=tk.LEFT, padx=(5, 5), pady=5 ) self.sim_scenario_combobox = ttk.Combobox( sim_scenario_frame, textvariable=self.scenario_controls.current_scenario, # Share the variable state="readonly", ) self.sim_scenario_combobox.pack(side=tk.LEFT, expand=True, fill=tk.X, pady=5) self.sim_scenario_combobox.bind( "<>", lambda event: self._on_load_scenario(self.sim_scenario_combobox.get()), ) # Extracted simulation controls into SimulationControls component try: from target_simulator.gui.simulation_controls import SimulationControls self.simulation_controls = SimulationControls(simulation_tab, self) self.simulation_controls.pack(fill=tk.X, padx=5, pady=10, anchor="n") # Preserve attribute names used throughout MainView for backward compatibility self.start_button = self.simulation_controls.start_button self.stop_button = self.simulation_controls.stop_button self.multiplier_combo = self.simulation_controls.multiplier_combo self.time_multiplier_var = self.simulation_controls.time_multiplier_var self.update_time_entry = self.simulation_controls.update_time_entry self.update_time = self.simulation_controls.update_time self.reset_button = self.simulation_controls.reset_button self.reset_radar_button = self.simulation_controls.reset_radar_button self.sim_slider = self.simulation_controls.sim_slider self.sim_elapsed_label = self.simulation_controls.sim_elapsed_label self.sim_total_label = self.simulation_controls.sim_total_label except Exception: # If the extracted component fails, fall back to original inline layout engine_frame = ttk.LabelFrame(simulation_tab, text="Live Simulation Engine") engine_frame.pack(fill=tk.X, padx=5, pady=10, anchor="n") # Use grid within engine_frame for a tidy multi-row layout that # doesn't force the window to expand horizontally and keeps the PPI # area visible. Configure columns so the middle spacer expands. for i in range(10): engine_frame.grid_columnconfigure(i, weight=0) # Give the spacer column (3) and the main left column (0) flexible weight engine_frame.grid_columnconfigure(0, weight=0) engine_frame.grid_columnconfigure(3, weight=1) self.start_button = ttk.Button( engine_frame, text="Start Live", command=self._on_start_simulation ) self.start_button.grid(row=0, column=0, sticky="w", padx=5, pady=5) self.stop_button = ttk.Button( engine_frame, text="Stop Live", command=self._on_stop_simulation, state=tk.DISABLED, ) self.stop_button.grid(row=0, column=1, sticky="w", padx=5, pady=5) spacer = ttk.Frame(engine_frame) spacer.grid(row=0, column=3, sticky="ew") # Display fixed speed indicator (1x). The real system runs at # 1x and the speed should not be changed at runtime, so show a # read-only label instead of an editable combobox. ttk.Label(engine_frame, text="Speed:").grid( row=0, column=4, sticky="e", padx=(10, 2), pady=5 ) self.time_multiplier_var = tk.StringVar(value="1x") # Keep attribute name `multiplier_combo` for backward # compatibility with other code that references it, but expose # a non-interactive label instead of a Combobox. self.multiplier_combo = ttk.Label(engine_frame, text="1x") self.multiplier_combo.grid(row=0, column=5, sticky="w", padx=(0, 5), pady=5) ttk.Label(engine_frame, text="Update Time (s):").grid( row=0, column=6, sticky="e", padx=(10, 2), pady=5 ) self.update_time_entry = ttk.Entry( engine_frame, textvariable=self.update_time, width=5 ) self.update_time_entry.grid(row=0, column=7, sticky="w", padx=(0, 5), pady=5) self.reset_button = ttk.Button( engine_frame, text="Reset State", command=self._on_reset_simulation ) self.reset_button.grid(row=0, column=8, sticky="e", padx=5, pady=5) self.reset_radar_button = ttk.Button( engine_frame, text="Reset Radar", command=self._reset_radar_state ) self.reset_radar_button.grid(row=0, column=9, sticky="e", padx=5, pady=5) progress_frame = ttk.Frame(engine_frame) progress_frame.grid(row=1, column=0, columnspan=10, sticky="ew", padx=5, pady=(6, 2)) self.sim_slider = ttk.Scale( progress_frame, orient=tk.HORIZONTAL, variable=self.sim_slider_var, from_=0.0, to=1.0, command=lambda v: None, ) progress_frame.grid_columnconfigure(0, weight=1) progress_frame.grid_columnconfigure(1, weight=0) self.sim_slider.grid(row=0, column=0, sticky="ew", padx=(4, 8)) try: self.sim_slider.bind( "", lambda e: setattr(self, "_slider_is_dragging", True) ) self.sim_slider.bind( "", lambda e: ( setattr(self, "_slider_is_dragging", False), self._on_seek(), ), ) except Exception: pass labels_frame = ttk.Frame(progress_frame) labels_frame.grid(row=0, column=1, sticky="e", padx=(4, 4)) self.sim_elapsed_label = ttk.Label(labels_frame, text="0.0s", width=8, anchor=tk.E) self.sim_elapsed_label.grid(row=0, column=0) slash_label = ttk.Label(labels_frame, text="/") slash_label.grid(row=0, column=1, padx=(2, 2)) self.sim_total_label = ttk.Label(labels_frame, text="0.0s", width=8, anchor=tk.W) self.sim_total_label.grid(row=0, column=2) # --- TAB 3: LRU SIMULATION --- lru_tab = ttk.Frame(left_notebook) left_notebook.add(lru_tab, text="LRU Simulation") cooling_frame = ttk.LabelFrame(lru_tab, text="Cooling Unit Status") cooling_frame.pack(fill=tk.X, padx=5, pady=5, anchor="n") ttk.Label(cooling_frame, text="Status:").pack(side=tk.LEFT, padx=5, pady=5) self.cooling_status_var = tk.StringVar(value="OK") cooling_combo = ttk.Combobox( cooling_frame, textvariable=self.cooling_status_var, values=["OK", "OVERHEATING", "FAULT"], state="readonly", ) cooling_combo.pack(side=tk.LEFT, expand=True, fill=tk.X, padx=5, pady=5) power_frame = ttk.LabelFrame(lru_tab, text="Power Supply Unit Status") power_frame.pack(fill=tk.X, padx=5, pady=5, anchor="n") ttk.Label(power_frame, text="Status:").pack(side=tk.LEFT, padx=5, pady=5) self.power_status_var = tk.StringVar(value="OK") power_combo = ttk.Combobox( power_frame, textvariable=self.power_status_var, values=["OK", "LOW_VOLTAGE", "FAULT"], state="readonly", ) power_combo.pack(side=tk.LEFT, expand=True, fill=tk.X, padx=5, pady=5) lru_action_frame = ttk.Frame(lru_tab) lru_action_frame.pack(fill=tk.X, padx=5, pady=10, anchor="n") send_lru_button = ttk.Button( lru_action_frame, text="Send LRU Status", command=self._on_send_lru_status ) send_lru_button.pack(side=tk.RIGHT) # --- TAB 4: Analysis --- analysis_tab = ttk.Frame(left_notebook) left_notebook.add(analysis_tab, text="Analysis") self._create_analysis_tab_widgets(analysis_tab) # Nuovo metodo # --- Bottom Pane (Logs) --- log_frame_container = ttk.LabelFrame(v_pane, text="Logs") v_pane.add(log_frame_container, weight=1) self.log_text_widget = scrolledtext.ScrolledText( log_frame_container, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9) ) self.log_text_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Ensure the log pane starts smaller so the rate/status bar remains visible # on lower-resolution screens. We compute the window geometry after # idle tasks have run (so sizes are realistic) and then move the sash # so the Logs pane has a small fixed minimum height (~80px) or ~18%. try: def _shrink_log_pane_once(event=None): # Run only once if getattr(self, "_log_pane_shrunk", False): return try: # Ensure geometry/layout is updated try: self.update_idletasks() except Exception: pass # Prefer the paned window's current height if available total_h = v_pane.winfo_height() or self.winfo_height() or 800 # If geometry isn't yet realized (very small), try again shortly # instead of forcing a potentially incorrect sash position that # could collapse the top pane. A small threshold avoids acting # on transient values produced during window manager setup. if total_h < 200: try: # Schedule a retry and do not mark as shrunk yet self.after(300, _shrink_log_pane_once) except Exception: pass return # Determine desired log pane height (min 60px, or ~18% of window) desired_log_h = max(60, int(total_h * 0.18)) # Limit logs to at most half the window to avoid swallowing the UI desired_log_h = min(desired_log_h, int(total_h * 0.5)) # Compute the desired top pane height so bottom gets desired_log_h desired_top_h = int(total_h - desired_log_h) # Ensure the top pane has a reasonable minimum height so it # doesn't collapse (use 120px as a safe minimum). desired_top_h = max(120, desired_top_h) # But also ensure we don't push the top larger than possible desired_top_h = min(desired_top_h, max(40, total_h - 40)) # Apply sash position (index 0 for the only sash in vertical pane) try: v_pane.sashpos(0, desired_top_h) # Only mark successful shrink once sashpos applied without # raising an exception. Some platforms may not support # sashpos until fully realized, so if sashpos raises we # leave the flag unset and allow the Configure event to # retry. setattr(self, "_log_pane_shrunk", True) except Exception: # Some platforms may not support sashpos until fully realized; # ignore and rely on later Configure event to attempt again. try: # Retry shortly instead of marking as done. self.after(300, _shrink_log_pane_once) except Exception: pass return except Exception: pass # Try shortly after init (gives Tk time to compute geometry). Use a # slightly longer delay to allow theme/layout to stabilise (some # nested widgets can delay final geometry on certain platforms). self.after(500, _shrink_log_pane_once) # Also bind to a single Configure event in case geometry wasn't ready def _on_config_once(ev): _shrink_log_pane_once() try: v_pane.unbind("", onconf_id) except Exception: pass onconf_id = v_pane.bind("", _on_config_once) except Exception: pass def _create_analysis_tab_widgets(self, parent): self.analysis_tree = ttk.Treeview( parent, columns=("datetime", "scenario", "duration"), show="headings" ) self.analysis_tree.heading("datetime", text="Date/Time") self.analysis_tree.heading("scenario", text="Scenario Name") self.analysis_tree.heading("duration", text="Duration (s)") self.analysis_tree.column("datetime", width=150) self.analysis_tree.column("scenario", width=200) self.analysis_tree.column("duration", width=80, anchor=tk.E) self.analysis_tree.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) btn_frame = ttk.Frame(parent) btn_frame.pack(fill=tk.X, padx=5, pady=5) ttk.Button( btn_frame, text="Refresh List", command=self._refresh_analysis_list ).pack(side=tk.LEFT) # Button to open the archive folder in the system file explorer ttk.Button( btn_frame, text="Open Archive Folder", command=self._open_archive_folder ).pack(side=tk.LEFT, padx=(6, 0)) ttk.Button( btn_frame, text="Analyze Selected", command=self._on_analyze_run ).pack(side=tk.RIGHT) # Popola la lista all'avvio self.after(100, self._refresh_analysis_list) def _create_menubar(self): menubar = tk.Menu(self) self.config(menu=menubar) settings_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Settings", menu=settings_menu) settings_menu.add_command(label="Connection...", command=self._open_settings) settings_menu.add_command( label="Radar Config...", command=self._open_radar_config ) debug_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Debug", menu=debug_menu) debug_menu.add_command( label="SFP Packet Inspector...", command=self._open_sfp_debug_window ) debug_menu.add_command( label="Logger Levels...", command=self._open_logger_panel ) def _create_statusbar(self): # Use the extracted StatusBar widget. Expose the same attributes # MainView previously provided so callers elsewhere continue to work. self.status_bar = StatusBar(self) # Try placing the status bar pinned to the bottom with a fixed height # so it remains visible even when other widgets request large minimum # sizes. Fall back to pack if place is not supported on some platforms # or if an error occurs. try: # The StatusBar already configures its height (24px by default). # Use place anchored to south-west so the bar stays at the bottom # and spans the full width of the window. self.status_bar.place(relx=0.0, rely=1.0, anchor="sw", relwidth=1.0, height=24) except Exception: self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) self.target_status_canvas = self.status_bar.target_status_canvas self.lru_status_canvas = self.status_bar.lru_status_canvas self.status_var = self.status_bar.status_var self.rate_status_var = getattr(self.status_bar, "rate_status_var", None) # Id used by show_status_message scheduling self._status_after_id = None def show_status_message(self, text: str, timeout_ms: int | None = 3000): """Show a status message in the main status bar. If timeout_ms is None the message is persistent until explicitly cleared or replaced. Otherwise the message will be cleared back to "Ready" after timeout_ms milliseconds. """ try: # Cancel previous scheduled clear if any try: if self._status_after_id is not None: self.after_cancel(self._status_after_id) self._status_after_id = None except Exception: pass # Set message self.status_var.set(text) # Schedule clear back to Ready if timeout provided if timeout_ms is not None: def _clear(): try: self.status_var.set("Ready") except Exception: pass try: self._status_after_id = self.after(timeout_ms, _clear) except Exception: # If scheduling fails, log and leave message as-is try: self.logger.exception("Failed to schedule status clear") except Exception: pass except Exception: # As a fallback, log the status try: self.logger.info(text) except Exception: pass def clear_status_message(self): """Clear status to the default 'Ready' and cancel any pending clears.""" try: try: if self._status_after_id is not None: self.after_cancel(self._status_after_id) except Exception: pass try: self._status_after_id = None except Exception: pass try: self.status_var.set("Ready") except Exception: pass except Exception: try: self.logger.exception("Failed to clear status message") except Exception: pass def _draw_status_indicator(self, canvas, color): canvas.delete("all") canvas.create_oval(2, 2, 14, 14, fill=color, outline="black") def _update_window_title(self): """Updates the window title based on the current scenario.""" base_title = (f"Radar Target Simulator- {WRAPPER_APP_VERSION_STRING}") if self.current_scenario_name: self.title(f"{base_title} - {self.current_scenario_name}") else: self.title(base_title) def _format_connection_summary(self, cfg: dict) -> str: """Return a short human-readable summary of the target connection config. This is used by the connection panel to show relevant info without opening the full settings dialog. """ try: t = cfg.get("type") if not t: return "-" if t == "sfp": sfp = cfg.get("sfp", {}) ip = sfp.get("ip") or sfp.get("host") or "?" # Support both single port and multiple ports (list/tuple) remote = sfp.get("port") or sfp.get("remote_port") if isinstance(remote, (list, tuple)): remote_str = ",".join(str(int(p)) for p in remote) else: try: remote_str = str(int(remote)) if remote is not None else "?" except Exception: remote_str = str(remote) local = sfp.get("local_port") if local is not None: try: local_str = str(int(local)) except Exception: local_str = str(local) return f"{ip} (remote:{remote_str} local:{local_str})" return f"{ip} (remote:{remote_str})" if t == "serial": s = cfg.get("serial", {}) port = s.get("port") or s.get("device") or "?" baud = s.get("baudrate") or s.get("baud") or "?" return f"{port} @{baud}" if t == "tftp": tftp = cfg.get("tftp", {}) host = tftp.get("host") or tftp.get("server") or "?" return f"{host}" return "-" except Exception: return "-" def _update_communicator_status(self, comm_name: str, is_connected: bool): canvas = ( self.target_status_canvas if comm_name == "Target" else self.lru_status_canvas ) color = "#2ecc40" if is_connected else "#e74c3c" self._draw_status_indicator(canvas, color) def _on_connection_state_change(self, is_connected: bool): """Callback for communicator connection state changes.""" self.logger.info( f"MainView received connection state change: Connected={is_connected}" ) self._update_communicator_status("Target", is_connected) # Update the PPI's internal state (visual only) and the debug window try: if hasattr(self.ppi_widget, "update_connect_state"): self.ppi_widget.update_connect_state(is_connected) except Exception: pass # Update centralized connect button text and status indicator try: # Prefer the new ConnectionPanel if present if hasattr(self, "connection_panel") and self.connection_panel: try: self.connection_panel.update_toggle_state(is_connected) except Exception: pass elif hasattr(self, "connect_button") and self.connect_button: self.connect_button.config( text="Disconnect" if is_connected else "Connect" ) except Exception: pass # Also update the debug window if it's open if self.sfp_debug_window and self.sfp_debug_window.winfo_exists(): if hasattr(self.sfp_debug_window, "update_toggle_state"): self.sfp_debug_window.update_toggle_state(is_connected) def _initialize_communicators(self): # Delegate communicator lifecycle to CommunicatorManager try: # Ensure manager knows the latest config try: self.communicator_manager.set_config(self.connection_config) except Exception: pass t_comm, t_connected, l_comm, l_connected = self.communicator_manager.initialize_communicators() self.target_communicator = t_comm self.lru_communicator = l_comm self._update_communicator_status("Target", bool(t_connected)) self._update_communicator_status("LRU", bool(l_connected)) # Ensure the manager notifies through MainView callback when state changes try: self.communicator_manager.add_connection_state_callback(self._on_connection_state_change) except Exception: pass except Exception: # Fallback to original inline initialization in case of problems try: if self.target_communicator and self.target_communicator.is_open: self.target_communicator.disconnect() if self.lru_communicator and self.lru_communicator.is_open: self.lru_communicator.disconnect() target_cfg = self.connection_config.get("target", {}) lru_cfg = self.connection_config.get("lru", {}) self.target_communicator, target_connected = self._setup_communicator(target_cfg, "Target") self._update_communicator_status("Target", target_connected) self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU") self._update_communicator_status("LRU", lru_connected) except Exception: self.logger.exception("Fallback communicator initialization failed") def _setup_communicator( self, config: dict, name: str ) -> tuple[Optional[CommunicatorInterface], bool]: comm_type = config.get("type") self.logger.info(f"Initializing {name} communicator of type: {comm_type}") communicator = None config_data = None if comm_type == "serial": communicator = SerialCommunicator() config_data = config.get("serial", {}) elif comm_type == "tftp": communicator = TFTPCommunicator() config_data = config.get("tftp", {}) elif comm_type == "sfp": # --- MODIFICATION: Do not pass update_queue --- communicator = SFPCommunicator(simulation_hub=self.simulation_hub) communicator.add_connection_state_callback(self._on_connection_state_change) config_data = config.get("sfp", {}) if self.defer_sfp_connection: # Return the communicator object but indicate it's not yet connected return communicator, False if communicator and config_data: if communicator.connect(config_data): return communicator, True self.logger.warning(f"Failed to initialize or connect {name} communicator.") return None, False def update_connection_settings(self, new_config: Dict[str, Any]): self.logger.info(f"Updating connection settings: {new_config}") self.connection_config = new_config self.config_manager.save_connection_settings(new_config) # Refresh connection summary in the connection panel try: if hasattr(self, "conn_type_var"): self.conn_type_var.set( self.connection_config.get("target", {}).get("type", "-") ) if hasattr(self, "conn_info_var"): self.conn_info_var.set( self._format_connection_summary( self.connection_config.get("target", {}) ) ) except Exception: pass # Update the connection panel UI if present try: if hasattr(self, "connection_panel") and self.connection_panel: try: self.connection_panel.update_summary(self.connection_config) except Exception: pass except Exception: pass # Update the communicator manager and reinitialize try: self.communicator_manager.set_config(new_config) self._initialize_communicators() except Exception: # Fallback: call the previous initialization path try: self._initialize_communicators() except Exception: pass def _open_settings(self): self.logger.info("Opening connection settings window.") ConnectionSettingsWindow(self, self.config_manager, self.connection_config) def _on_connect_button(self): self.logger.info("Connection toggle requested by user via PPI button.") try: # If communicator exists and is open, disconnect. if self.target_communicator and self.target_communicator.is_open: self.logger.info("Requesting disconnect.") self.target_communicator.disconnect() return # Otherwise, attempt to connect. self.logger.info("Requesting connect.") # Ensure we have a communicator instance. if not self.target_communicator: self.logger.info( "No target communicator instance. Initializing communicators." ) self._initialize_communicators() # If it's still null after init, we can't proceed. if not self.target_communicator: self.logger.error("Failed to create target communicator on demand.") messagebox.showerror("Error", "Could not create communicator.") return # Now, connect using the existing or new instance. cfg = self.connection_config.get("target", {}) sfp_cfg = cfg.get("sfp") if cfg.get("type") == "sfp" and sfp_cfg: if not self.target_communicator.connect(sfp_cfg): self.logger.error("Failed to connect target communicator.") messagebox.showerror( "Connection Failed", "Could not connect to target. Check settings and logs.", ) else: self.logger.warning( "Connection attempt without valid SFP config. Running full re-initialization." ) self._initialize_communicators() except Exception: self.logger.exception("Unhandled exception in _on_connect_button") def _reset_radar_state(self) -> bool: # Delegates to SimulationController if available try: if hasattr(self, "simulation_controller") and self.simulation_controller: return self.simulation_controller.reset_radar_state(self) except Exception: # If controller fails, fall back to previous inline behavior try: self.logger.exception("SimulationController reset failed; falling back to inline reset.") except Exception: pass # Fallback: call controller.reset_radar_state via attribute to reuse logic try: if hasattr(self, "simulation_controller") and self.simulation_controller: return self.simulation_controller.reset_radar_state(self) except Exception: pass # If no controller available, return False conservatively try: messagebox.showerror("Reset Error", "Unable to perform radar reset (controller unavailable).") except Exception: pass return False def _on_start_simulation(self): # Prevent duplicate start attempts: use a simple re-entrancy guard. if getattr(self, "_start_in_progress_main", False): try: self.logger.info("Start already in progress; ignoring duplicate request.") except Exception: pass return self._start_in_progress_main = True try: # Show a persistent 'starting' message while the controller/engine prepares try: self.show_status_message("Starting simulation...", timeout_ms=None) except Exception: pass # Delegate to SimulationController if available try: if hasattr(self, "simulation_controller") and self.simulation_controller: return self.simulation_controller.start_simulation(self) except Exception: try: self.logger.exception("SimulationController start failed; falling back to inline start.") except Exception: pass # If controller is not present or failed, attempt no-op fallback try: messagebox.showerror("Start Error", "Unable to start simulation (controller unavailable).") # If start failed, clear the starting message so the status bar isn't stuck try: self.clear_status_message() except Exception: pass except Exception: pass finally: # Clear the guard so future explicit retries are allowed. The # actual button state will be managed by _update_button_states # based on `is_simulation_running`. try: self._start_in_progress_main = False except Exception: pass def _on_stop_simulation(self): try: if hasattr(self, "simulation_controller") and self.simulation_controller: return self.simulation_controller.stop_simulation(self) except Exception: try: self.logger.exception("SimulationController stop failed; falling back to inline stop.") except Exception: pass try: messagebox.showerror("Stop Error", "Unable to stop simulation (controller unavailable).") except Exception: pass def _on_simulation_finished(self): try: if hasattr(self, "simulation_controller") and self.simulation_controller: try: result = self.simulation_controller.on_simulation_finished(self) except Exception: result = None else: result = None except Exception: try: self.logger.exception("SimulationController on_finished failed; falling back to inline finished handler.") except Exception: pass result = None # Ensure UI reflects finished state regardless of controller handling try: # Mark simulation as not running try: self.is_simulation_running.set(False) except Exception: pass # Show a short transient message informing the user try: self.show_status_message("Simulation finished", timeout_ms=5000) except Exception: pass except Exception: try: self.logger.error("Unable to handle simulation finished (controller unavailable).") except Exception: pass return result def _on_reset_simulation(self): self.logger.info("Resetting scenario to initial state.") # Show a brief 'starting' message during the reset phase so users # see that a reset/action is in progress (matches requested UX). try: self.show_status_message("Starting simulation...", timeout_ms=1500) except Exception: pass if self.is_simulation_running.get(): self._on_stop_simulation() self.scenario.reset_simulation() self._update_all_views() # After reset complete, clear or set Ready so the status bar does not # remain stuck on 'Starting simulation...' try: self.show_status_message("Ready", timeout_ms=1500) except Exception: try: self.clear_status_message() except Exception: pass def _process_gui_queue(self): """ Processes a batch of updates from the GUI queue to keep the UI responsive without getting stuck in an infinite loop. """ MAX_UPDATES_PER_CYCLE = 100 # Process up to 100 messages per call try: for _ in range(MAX_UPDATES_PER_CYCLE): try: update = self.gui_update_queue.get_nowait() if update == "SIMULATION_FINISHED": self.logger.info("Simulation finished signal received.") self._on_simulation_finished() try: self.sim_elapsed_time = self.total_sim_time self.sim_slider_var.set( 1.0 if self.total_sim_time > 0 else 0.0 ) except Exception: pass self._update_simulation_progress_display() elif isinstance(update, list): if len(update) == 0: # Hub refresh notification (real data arrived). display_data = self._build_display_data_from_hub() self.ppi_widget.update_real_targets( display_data.get("real", []) ) try: if ( hasattr(self, "simulation_hub") and self.simulation_hub is not None and hasattr( self.ppi_widget, "update_antenna_azimuth" ) ): if hasattr( self.simulation_hub, "get_antenna_azimuth" ): az_deg, az_ts = ( self.simulation_hub.get_antenna_azimuth() ) else: az_deg, az_ts = ( self.simulation_hub.get_platform_azimuth() ) if az_deg is not None: self.ppi_widget.update_antenna_azimuth( az_deg, timestamp=az_ts ) except Exception: self.logger.debug( "Failed to propagate antenna azimuth to PPI", exc_info=True, ) else: # This is an update with simulated targets from the engine. simulated_targets: List[Target] = update self.target_list.update_target_list(simulated_targets) self.ppi_widget.update_simulated_targets(simulated_targets) # Update simulation progress bar try: if ( self.simulation_engine and self.simulation_engine.scenario ): times = [ getattr(t, "_sim_time_s", 0.0) for t in self.simulation_engine.scenario.get_all_targets() ] self.sim_elapsed_time = max(times) if times else 0.0 if self.total_sim_time > 0 and not getattr( self, "_slider_is_dragging", False ): progress_frac = min( 1.0, max( 0.0, self.sim_elapsed_time / self.total_sim_time, ), ) self.sim_slider_var.set(progress_frac) self._update_simulation_progress_display() except Exception: self.logger.debug( "Progress UI update failed", exc_info=True ) except Empty: # Queue is empty, we can stop processing for this cycle. break finally: # Always reschedule the next poll. try: self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_gui_queue) except Exception: # This can happen on shutdown, just ignore. pass def _update_button_states(self): is_running = self.is_simulation_running.get() # Determine if analysis data exists. Ensure boolean type. has_data_to_analyze = ( bool(self.simulation_hub.get_all_target_ids()) if self.simulation_hub else False ) # Enable Analysis only when simulation is NOT running and there is data # to analyze (i.e., after a completed run or after receiving real data). analysis_state = ( tk.NORMAL if (not is_running and has_data_to_analyze) else tk.DISABLED ) # If a start is currently in progress (either via the controller # path or via the SimulationControls immediate handler), keep the # Start button disabled to avoid duplicate starts even though # `is_simulation_running` may not yet be True. start_in_progress_flag = False try: if getattr(self, "_start_in_progress_main", False): start_in_progress_flag = True except Exception: start_in_progress_flag = start_in_progress_flag try: sc = getattr(self, "simulation_controls", None) if sc and getattr(sc, "_start_in_progress", False): start_in_progress_flag = True except Exception: pass state = tk.DISABLED if (is_running or start_in_progress_flag) else tk.NORMAL self.reset_radar_button.config(state=state) self.start_button.config(state=tk.DISABLED if is_running else tk.NORMAL) self.stop_button.config(state=tk.NORMAL if is_running else tk.DISABLED) # Analysis tab has its own controls; nothing to update here. # multiplier_combo may be a non-interactive label after the # speed-combobox removal. Attempt to set state if supported, # otherwise ignore errors so UI updates remain robust. try: if hasattr(self, "multiplier_combo") and self.multiplier_combo is not None: try: self.multiplier_combo.config( state="readonly" if not is_running else tk.DISABLED ) except Exception: # Some widget types (e.g., ttk.Label) don't accept # a 'state' option; ignore in that case. pass except Exception: pass self.scenario_controls.new_button.config(state=state) self.scenario_controls.save_button.config(state=state) self.scenario_controls.save_as_button.config(state=state) self.scenario_controls.delete_button.config(state=state) self.scenario_controls.scenario_combobox.config( state="readonly" if not is_running else tk.DISABLED ) self.target_list.add_button.config(state=state) self.target_list.remove_button.config(state=state) self.target_list.edit_button.config(state=state) self.target_list.tree.config(selectmode="browse" if not is_running else "none") def _on_time_multiplier_changed(self, event=None): """Handles changes to the time multiplier selection.""" try: multiplier_str = self.time_multiplier_var.get().replace("x", "") self.time_multiplier = float(multiplier_str) if self.simulation_engine and self.simulation_engine.is_running(): self.simulation_engine.set_time_multiplier(self.time_multiplier) except ValueError: self.logger.error( f"Invalid time multiplier value: {self.time_multiplier_var.get()}" ) self.time_multiplier = 1.0 def _update_simulation_progress_display(self): """Updates the elapsed/total time label from internal state.""" try: elapsed = self.sim_elapsed_time total = self.total_sim_time # Update separate labels for elapsed and total time try: self.sim_elapsed_label.config(text=f"{elapsed:.1f}s") self.sim_total_label.config(text=f"{total:.1f}s") except Exception: # Fallback for older layouts if hasattr(self, "sim_time_label"): self.sim_time_label.config(text=f"{elapsed:.1f}s / {total:.1f}s") except Exception: pass def _update_rate_status(self): """ Periodically update the small status label with recent rates: - real input events/sec (from SimulationStateHub) - PPI real-update calls/sec (from PPIDisplay) """ try: real_rate = 0.0 packet_rate = 0.0 ppi_rate = 0.0 try: if self.simulation_hub and hasattr( self.simulation_hub, "get_real_rate" ): real_rate = float(self.simulation_hub.get_real_rate(1.0)) # Prefer packet rate if available (packets/sec vs events/sec) if self.simulation_hub and hasattr( self.simulation_hub, "get_packet_rate" ): packet_rate = float(self.simulation_hub.get_packet_rate(1.0)) except Exception: real_rate = 0.0 packet_rate = 0.0 try: if ( hasattr(self, "ppi_widget") and self.ppi_widget and hasattr(self.ppi_widget, "get_real_update_rate") ): ppi_rate = float(self.ppi_widget.get_real_update_rate(1.0)) except Exception: ppi_rate = 0.0 if getattr(self, "rate_status_var", None) is not None: try: # Show both packet-level rate and per-target event rate so # users can distinguish network throughput (packets/sec) # from per-target updates (events/sec). if packet_rate > 0.0: self.rate_status_var.set( f"pkt in: {packet_rate:.1f} pkt/s | ev in: {real_rate:.1f} ev/s | ppi upd: {ppi_rate:.1f} upd/s" ) else: self.rate_status_var.set( f"real in: {real_rate:.1f} ev/s | ppi upd: {ppi_rate:.1f} upd/s" ) except Exception: pass except Exception: # Swallow any unexpected error — status bar shouldn't raise try: self.logger.exception("Error while updating rate status") except Exception: pass finally: # Reschedule try: self.after(1000, self._update_rate_status) except Exception: pass def _on_seek(self): """Called when the user releases the progress slider to seek.""" try: if not self.simulation_engine or not self.simulation_engine.scenario: return frac = float(self.sim_slider_var.get()) # Compute the new time and clamp new_time = max(0.0, min(self.total_sim_time, frac * self.total_sim_time)) # Ask engine to seek to this new time try: self.simulation_engine.set_simulation_time(new_time) # Immediately update internal elapsed time and label self.sim_elapsed_time = new_time self._update_simulation_progress_display() except Exception: self.logger.exception("Failed to seek simulation time.") except Exception: self.logger.exception("Error in _on_seek handler.") def _on_targets_changed(self, targets: List[Target]): """Callback executed when the target list is modified by the user.""" # 1. Update the internal scenario object self.scenario.targets = {t.target_id: t for t in targets} # 2. Update the PPI display with the latest target list # Pass an explicit dict so PPIDisplay treats this as a simulated-only # update and does not accidentally clear real (server) targets. self.ppi_widget.update_simulated_targets(targets) # 3. Automatically save the changes to the current scenario file if self.current_scenario_name: self.logger.info( f"Targets changed for scenario '{self.current_scenario_name}'. Saving changes." ) self.config_manager.save_scenario( self.current_scenario_name, self.scenario.to_dict() ) else: self.logger.warning( "Targets changed, but no scenario is currently loaded. Changes are not saved." ) def _update_all_views(self, targets_to_display: Optional[List[Target]] = None): self._update_window_title() if targets_to_display is None: targets_to_display = self.scenario.get_all_targets() self.target_list.update_target_list(targets_to_display) # Use an explicit dict to indicate these are simulated scenario targets. self.ppi_widget.update_simulated_targets(targets_to_display) def _load_scenarios_into_ui(self): scenario_names = self.config_manager.get_scenario_names() self.scenario_controls.update_scenario_list( scenario_names, self.current_scenario_name ) self.sim_scenario_combobox["values"] = scenario_names def _on_load_scenario(self, scenario_name: str): if self.is_simulation_running.get(): self._on_stop_simulation() self.logger.info(f"Loading scenario: {scenario_name}") scenario_data = self.config_manager.get_scenario(scenario_name) if scenario_data: try: # Clear any previously simulated-only data so the new scenario's # simulated targets are shown cleanly. We intentionally preserve # 'real' target data if present (do not call hub.reset()). try: if hasattr(self, "simulation_hub") and self.simulation_hub: self.simulation_hub.clear_simulated_data() except Exception: pass # Also clear PPI trails and simulated target visual state before # loading the new scenario to avoid leaving stale markers. try: if hasattr(self, "ppi_widget") and self.ppi_widget: self.ppi_widget.clear_trails() self.ppi_widget.update_simulated_targets([]) except Exception: pass self.scenario = Scenario.from_dict(scenario_data) self.current_scenario_name = scenario_name # Update target list UI with loaded scenario's targets self.target_list.update_target_list(self.scenario.get_all_targets()) self._update_all_views() self.sim_scenario_combobox.set(scenario_name) # If not connected to a real target, show a trajectory preview # for the loaded scenario so the user can see initial positions # and planned paths before starting the simulation. try: connected = bool( self.target_communicator and getattr(self.target_communicator, "is_open", False) ) except Exception: connected = False if not connected and hasattr(self, "ppi_widget") and self.ppi_widget: try: # Draw per-target dashed trajectories as preview self.ppi_widget.draw_scenario_preview(self.scenario) except Exception: pass except Exception as e: self.logger.error( f"Failed to parse scenario data for '{scenario_name}': {e}", exc_info=True, ) messagebox.showerror( "Load Error", f"Could not load scenario '{scenario_name}'.\n{e}" ) else: self.logger.warning( f"Attempted to load a non-existent scenario: {scenario_name}" ) def _on_save_scenario(self, scenario_name: str): self.logger.info(f"Saving scenario: {scenario_name}") self.scenario.targets = {t.target_id: t for t in self.target_list.get_targets()} self.config_manager.save_scenario(scenario_name, self.scenario.to_dict()) self.current_scenario_name = scenario_name self._load_scenarios_into_ui() self._update_window_title() messagebox.showinfo( "Success", f"Scenario '{scenario_name}' saved successfully.", parent=self ) def _on_save_scenario_as(self, scenario_name: str): self.scenario.targets = {t.target_id: t for t in self.target_list.get_targets()} self._on_save_scenario(scenario_name) def _on_new_scenario(self, scenario_name: str): scenario_names = self.config_manager.get_scenario_names() if scenario_name in scenario_names: messagebox.showinfo( "Duplicate Scenario", f"Scenario '{scenario_name}' already exists. Loading it instead.", parent=self, ) self._on_load_scenario(scenario_name) return self.logger.info(f"Creating new scenario: {scenario_name}") self.scenario = Scenario(name=scenario_name) self.current_scenario_name = scenario_name # Do NOT persist the new empty scenario automatically. Persisting should # happen explicitly when the user presses Save. This avoids accidental # overwrites of the on-disk `scenarios.json` when a user creates a # placeholder and then quits or when tests manipulate in-memory state. # # Update UI so the new scenario appears selected in the combobox even # though it's not yet saved to disk: build a temporary list of names # that includes the new name and update the controls. try: # Update main views (clears target list display) self._update_all_views() # Build combobox values including the new (unsaved) scenario name names = list(self.config_manager.get_scenario_names()) if scenario_name not in names: names = names + [scenario_name] # Update scenario list UI and select the new scenario try: self.scenario_controls.update_scenario_list( names, select_scenario=scenario_name ) except Exception: # Fallback for older UI: directly set combobox values try: self.sim_scenario_combobox["values"] = names except Exception: pass try: self.scenario_controls.current_scenario.set(scenario_name) self.sim_scenario_combobox.set(scenario_name) except Exception: pass except Exception: # Never allow UI update failures to raise here self.logger.exception("Failed to create new unsaved scenario in UI") def _on_delete_scenario(self, scenario_name: str): self.logger.info(f"Deleting scenario: {scenario_name}") self.config_manager.delete_scenario(scenario_name) if self.current_scenario_name == scenario_name: self.current_scenario_name = None self.scenario = Scenario() self._update_all_views() self._load_scenarios_into_ui() def _on_send_lru_status(self): # Implementation from your code pass def _on_send_initial_state(self): """Sends the full scenario using tgtinit for initial setup.""" if self.target_communicator and self.target_communicator.is_open: if self.scenario and self.scenario.get_all_targets(): # reset_simulation() assicura che i dati inviati siano quelli iniziali self.scenario.reset_simulation() self.target_communicator.send_scenario(self.scenario) messagebox.showinfo( "Scenario Sent", "Initial state of the scenario sent to the radar.", parent=self, ) else: messagebox.showinfo( "Empty Scenario", "Cannot send an empty scenario.", parent=self ) else: messagebox.showerror( "Not Connected", "Target communicator is not connected. Please check settings.", parent=self, ) def _on_reset_targets(self): # Implementation from your code pass def _open_radar_config(self): self.logger.info("Opening radar config window.") dialog = RadarConfigWindow( self, current_scan_limit=self.scan_limit, current_max_range=self.max_range ) # wait_window è già gestito all'interno di RadarConfigWindow, # quindi il codice prosegue solo dopo la sua chiusura. if dialog.scan_limit is not None and dialog.max_range is not None: # Check if values have actually changed to avoid unnecessary redraws if ( self.scan_limit != dialog.scan_limit or self.max_range != dialog.max_range ): self.logger.info("Radar configuration changed. Applying new settings.") self.scan_limit = dialog.scan_limit self.max_range = dialog.max_range # --- LOGICA MODIFICATA --- # Non distruggere il widget, ma riconfiguralo. self.ppi_widget.reconfigure_radar( max_range_nm=self.max_range, scan_limit_deg=self.scan_limit ) self.logger.info(f"Scan limit set to: ±{self.scan_limit} degrees") self.logger.info(f"Max range set to: {self.max_range} NM") # Non è necessario chiamare _update_all_views() perché # reconfigure_radar forza già un ridisegno completo. else: self.logger.info( "Radar configuration confirmed, but no changes were made." ) def _on_closing(self): self.logger.info("Application shutting down.") if self.is_simulation_running.get(): self._on_stop_simulation() # Merge current runtime general settings with any existing saved # settings so we don't clobber unrelated keys (e.g., logger_panel). try: existing = self.config_manager.get_general_settings() or {} except Exception: existing = {} settings_to_save = dict(existing) settings_to_save.update( { "scan_limit": self.scan_limit, "max_range": self.max_range, "geometry": self.winfo_geometry(), "last_selected_scenario": self.current_scenario_name, } ) # Save merged general settings and connection settings separately try: self.config_manager.save_general_settings(settings_to_save) except Exception: # Fallback: try to write the minimal dict if merge/save fails try: self.config_manager.save_general_settings( { "scan_limit": self.scan_limit, "max_range": self.max_range, "geometry": self.winfo_geometry(), "last_selected_scenario": self.current_scenario_name, } ) except Exception: pass try: self.config_manager.save_connection_settings(self.connection_config) except Exception: pass if self.target_communicator: if hasattr(self.target_communicator, "remove_connection_state_callback"): self.target_communicator.remove_connection_state_callback( self._on_connection_state_change ) if self.target_communicator.is_open: self.target_communicator.disconnect() if self.lru_communicator and self.lru_communicator.is_open: self.lru_communicator.disconnect() # Stop resource monitor thread if present try: if hasattr(self, "status_bar") and self.status_bar: try: if hasattr(self.status_bar, "stop_resource_monitor"): self.status_bar.stop_resource_monitor() except Exception: pass except Exception: pass shutdown_logging_system() self.destroy() def _open_sfp_debug_window(self): """Opens the SFP debug window, ensuring only one instance exists.""" if self.sfp_debug_window and self.sfp_debug_window.winfo_exists(): self.sfp_debug_window.lift() self.sfp_debug_window.focus_force() self.logger.info("SFP Packet Inspector window is already open.") return self.logger.info("Opening SFP Packet Inspector window...") self.sfp_debug_window = SfpDebugWindow(self) def _open_logger_panel(self): """Open the LoggerPanel to inspect/change logger levels at runtime.""" try: # Create transient dialog attached to main window LoggerPanel(self) except Exception: # Avoid crashing the UI if the panel fails to open self.logger.exception("Failed to open LoggerPanel") def _on_reset_simulation(self): self.logger.info("Resetting scenario to initial state.") if self.is_simulation_running.get(): self._on_stop_simulation() self.logger.info("Resetting simulation data hub and PPI trails.") self.simulation_hub.reset() self.ppi_widget.clear_trails() self.scenario.reset_simulation() self._update_all_views() def _build_display_data_from_hub(self) -> Dict[str, List[Target]]: # Delegate to ppi_adapter.build_display_data for consistent behavior try: return build_display_data( self.simulation_hub, scenario=getattr(self, "scenario", None), engine=getattr(self, "simulation_engine", None), ppi_widget=getattr(self, "ppi_widget", None), logger=getattr(self, "logger", None), ) except Exception: try: self.logger.exception("Failed to build display data via ppi_adapter") except Exception: pass return {"simulated": [], "real": []} def _open_analysis_window(self): """Opens the performance analysis window, ensuring only one instance exists.""" if self.analysis_window and self.analysis_window.winfo_exists(): self.analysis_window.lift() self.analysis_window.focus_force() self.logger.info("Analysis window is already open.") return self.logger.info("Opening performance analysis window...") self.analysis_window = AnalysisWindow( self, analyzer=self.performance_analyzer, hub=self.simulation_hub ) def _gui_refresh_loop(self): """ Main GUI refresh loop. Runs at a fixed rate, pulls the latest data from the hub, and updates the PPI display. """ # Check if the simulation has finished sim_was_running = self.is_simulation_running.get() sim_is_running_now = ( self.simulation_engine is not None and self.simulation_engine.is_running() ) # Transition: started -> set running status if (not sim_was_running) and sim_is_running_now: try: # Mark internal flag and show persistent running message try: self.is_simulation_running.set(True) except Exception: pass try: self.show_status_message("Simulation running", timeout_ms=None) except Exception: pass except Exception: pass # Transition: stopped if sim_was_running and not sim_is_running_now: self._on_simulation_finished() # Update PPI with the latest data from the hub display_data = self._build_display_data_from_hub() self.ppi_widget.update_simulated_targets(display_data.get("simulated", [])) self.ppi_widget.update_real_targets(display_data.get("real", [])) # Update antenna azimuth try: if self.simulation_hub and hasattr( self.simulation_hub, "get_antenna_azimuth" ): az_deg, az_ts = self.simulation_hub.get_antenna_azimuth() if az_deg is not None: self.ppi_widget.update_antenna_azimuth(az_deg, timestamp=az_ts) except Exception: pass # Update progress bar if the simulation is running if sim_is_running_now: try: if self.simulation_engine and self.simulation_engine.scenario: times = [ getattr(t, "_sim_time_s", 0.0) for t in self.simulation_engine.scenario.get_all_targets() ] self.sim_elapsed_time = max(times) if times else 0.0 if self.total_sim_time > 0 and not self._slider_is_dragging: progress = min(1.0, self.sim_elapsed_time / self.total_sim_time) self.sim_slider_var.set(progress) self._update_simulation_progress_display() except Exception: self.logger.debug("Progress UI update failed", exc_info=True) # Reschedule the next refresh cycle self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop) def _refresh_analysis_list(self): self.analysis_tree.delete(*self.analysis_tree.get_children()) archive_folder = SimulationArchive.ARCHIVE_FOLDER if not os.path.exists(archive_folder): return runs = [] for filename in os.listdir(archive_folder): if filename.endswith(".json"): filepath = os.path.join(archive_folder, filename) try: # Leggiamo solo i metadati per non caricare tutto il file with open(filepath, "r", encoding="utf-8") as f: data = json.load(f) metadata = data.get("metadata", {}) # Usiamo il timestamp del nome del file per l'ordinamento dt_str = filename.split("_")[0] run_info = { "datetime": datetime.strptime(dt_str, "%Y%m%d").strftime( "%Y-%m-%d" ) + " " + filename.split("_")[1].replace(".json", ""), "scenario": metadata.get("scenario_name", "N/A"), "duration": f"{metadata.get('duration_seconds', 0):.1f}", "filepath": filepath, } runs.append(run_info) except Exception as e: self.logger.warning( f"Impossibile leggere l'archivio {filename}: {e}" ) # Ordina dal più recente al più vecchio for run in sorted(runs, key=lambda r: r["datetime"], reverse=True): self.analysis_tree.insert( "", tk.END, values=(run["datetime"], run["scenario"], run["duration"]), iid=run["filepath"], ) def _open_archive_folder(self): """Open the simulation archive folder in the system file explorer.""" archive_folder = SimulationArchive.ARCHIVE_FOLDER try: # Ensure the folder exists os.makedirs(archive_folder, exist_ok=True) # On Windows, os.startfile opens the folder in Explorer try: os.startfile(archive_folder) return except Exception: # Fallback to using explorer.exe via subprocess import subprocess try: subprocess.run(["explorer", os.path.abspath(archive_folder)]) return except Exception: pass except Exception as e: self.logger.exception("Failed to open archive folder: %s", e) # If we get here, show an error to the user try: messagebox.showerror( "Error", f"Could not open archive folder: {archive_folder}\nSee logs for details.", ) except Exception: # If even showing a messagebox fails, log and continue try: self.logger.error("Could not open archive folder: %s", archive_folder) except Exception: pass def _on_analyze_run(self): selected_item = self.analysis_tree.focus() if not selected_item: messagebox.showinfo( "Nessuna Selezione", "Seleziona una simulazione da analizzare." ) return archive_filepath = selected_item # L'IID è il filepath # Apri la finestra di analisi passando il percorso del file # (dovremo modificare AnalysisWindow per accettarlo) AnalysisWindow(self, archive_filepath=archive_filepath)