1007 lines
43 KiB
Python
1007 lines
43 KiB
Python
# target_simulator/gui/main_view.py
|
|
|
|
"""
|
|
Main view of the application.
|
|
|
|
This module implements the Tkinter-based main window (``MainView``) which
|
|
coordinates the GUI, the simulation controller, communicators and the
|
|
central SimulationStateHub. Responsibilities include:
|
|
|
|
- constructing and laying out the primary widgets (PPI, scenario editor,
|
|
connection panel, controls),
|
|
- initializing communicators and the simulation engine,
|
|
- providing a periodic GUI refresh loop that pulls data from the
|
|
``SimulationStateHub`` and updates visual components,
|
|
- wiring user actions (start/stop simulation, load/save scenario,
|
|
connect/disconnect) to controller logic.
|
|
|
|
The module exposes the ``MainView`` class which is the application's main
|
|
Tkinter ``Tk`` instance. The class is designed to be instantiated and run
|
|
via ``python -m target_simulator`` which calls ``MainView.mainloop()``.
|
|
|
|
Note: the GUI mutates shared state (e.g., ``simulation_hub``) and therefore
|
|
must run on the main thread. Methods on this class generally have side
|
|
effects (updating widgets, storing settings, starting/stopping threads) and
|
|
do not return useful values.
|
|
"""
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext, messagebox
|
|
from queue import Queue, Empty
|
|
from typing import Optional, Dict, Any, List
|
|
import time
|
|
import math
|
|
import os
|
|
import json
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
# Use absolute imports for robustness and clarity
|
|
from target_simulator.gui.ppi_display import PPIDisplay
|
|
from target_simulator.gui.ppi_adapter import build_display_data
|
|
from target_simulator.gui.connection_settings_window import ConnectionSettingsWindow
|
|
from target_simulator.gui.radar_config_window import RadarConfigWindow
|
|
from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame
|
|
from target_simulator.gui.target_list_frame import TargetListFrame
|
|
from target_simulator.gui.connection_panel import ConnectionPanel
|
|
from target_simulator.gui.status_bar import StatusBar
|
|
from target_simulator.gui.simulation_controls import SimulationControls
|
|
|
|
from target_simulator.core.communicator_interface import CommunicatorInterface
|
|
from target_simulator.core.serial_communicator import SerialCommunicator
|
|
from target_simulator.core.tftp_communicator import TFTPCommunicator
|
|
from target_simulator.core.simulation_engine import SimulationEngine
|
|
from target_simulator.core.models import Scenario, Target
|
|
|
|
from target_simulator.utils.logger import get_logger, shutdown_logging_system
|
|
from target_simulator.utils.config_manager import ConfigManager
|
|
from target_simulator.gui.sfp_debug_window import SfpDebugWindow
|
|
from target_simulator.gui.logger_panel import LoggerPanel
|
|
from target_simulator.core.sfp_communicator import SFPCommunicator
|
|
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
|
|
from target_simulator.gui.analysis_window import AnalysisWindow
|
|
from target_simulator.core import command_builder
|
|
from target_simulator.analysis.simulation_archive import SimulationArchive
|
|
from target_simulator.communication.communicator_manager import CommunicatorManager
|
|
from target_simulator.simulation.simulation_controller import SimulationController
|
|
from target_simulator.gui.external_profiler_window import ExternalProfilerWindow
|
|
|
|
from target_simulator.gui.sync_tool_window import SyncToolWindow
|
|
|
|
# --- Version info for the application wrapper itself ---
# The ``_version`` module may be missing, or may lack one of the expected
# attributes (``__version__``, ``GIT_BRANCH``, ``GIT_COMMIT_HASH``) or carry a
# non-sliceable commit hash. In every such case fall back to a placeholder
# instead of failing while this module is being imported.
try:
    from target_simulator import _version as wrapper_version

    WRAPPER_APP_VERSION_STRING = (
        f"{wrapper_version.__version__} "
        f"({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    )
except (ImportError, AttributeError, TypeError):
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"

# Default GUI refresh interval in milliseconds (100 ms = 10 refreshes/s;
# 40 ms would give 25 refreshes/s). Can be overridden from the settings.
GUI_REFRESH_RATE_MS = 100
|
|
|
|
|
|
class MainView(tk.Tk):
|
|
"""
|
|
Main application window and controller.
|
|
|
|
This class composes the primary UI and wires it to application logic:
|
|
communicators, the SimulationController and SimulationEngine. It owns
|
|
a :class:`SimulationStateHub` instance used application-wide.
|
|
|
|
Key responsibilities:
|
|
- build and layout widgets (PPI display, scenario controls, simulation controls),
|
|
- initialize communicators via :class:`CommunicatorManager`,
|
|
- start/stop simulations via :class:`SimulationController`,
|
|
- periodically refresh GUI elements from the simulation hub.
|
|
|
|
Threading/side-effects:
|
|
- Constructing MainView does not start the event loop; Tk's mainloop runs
only once ``mainloop()`` is called. GUI updates must run on the main thread.
|
|
- Many methods update application state and widgets; they return ``None``.
|
|
"""
|
|
|
|
def __init__(self):
    """Build the main window, its controllers and the periodic GUI tasks.

    Reads general and connection settings through ``ConfigManager``,
    creates the ``SimulationStateHub``, ``CommunicatorManager`` and
    ``SimulationController``, builds the widgets, initializes the
    communicators, loads an initial scenario (if one can be determined)
    and schedules the refresh, rate-status and latency-status callbacks
    with ``after``.
    """
    super().__init__()
    self.logger = get_logger(__name__)
    self.config_manager = ConfigManager()

    self.current_archive: Optional[SimulationArchive] = None

    # --- Load Settings ---
    settings = self.config_manager.get_general_settings()
    self.scan_limit = settings.get("scan_limit", 60)
    self.max_range = settings.get("max_range", 100)
    self.connection_config = self.config_manager.get_connection_settings()

    target_sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
    self.prediction_offset_ms = target_sfp_cfg.get("prediction_offset_ms", 0.0)

    # --- Initialize the data hub and controllers ---
    self.simulation_hub = SimulationStateHub()

    self.communicator_manager = CommunicatorManager(
        simulation_hub=self.simulation_hub,
        logger=self.logger,
        defer_sfp_connection=True,
    )
    self.communicator_manager.set_config(self.connection_config)

    self.simulation_controller = SimulationController(
        communicator_manager=self.communicator_manager,
        simulation_hub=self.simulation_hub,
        config_manager=self.config_manager,
        logger=self.logger,
    )

    # --- Core Logic Handlers ---
    self.target_communicator: Optional[CommunicatorInterface] = None
    self.lru_communicator: Optional[CommunicatorInterface] = None
    self.scenario = Scenario()
    self.current_scenario_name: Optional[str] = None
    self.sfp_debug_window: Optional[SfpDebugWindow] = None
    self.analysis_window: Optional[AnalysisWindow] = None

    # --- Simulation Engine ---
    self.simulation_engine: Optional[SimulationEngine] = None
    self.is_simulation_running = tk.BooleanVar(value=False)
    self._start_in_progress_main = False  # Guard for start button
    self.time_multiplier = 1.0
    self.update_time = tk.DoubleVar(value=1.0)

    # Simulation progress tracking
    self.total_sim_time = 0.0
    self.sim_elapsed_time = 0.0
    self.sim_slider_var = tk.DoubleVar(value=0.0)
    self._slider_is_dragging = False

    # --- Window and UI Setup ---
    self.title("Radar Target Simulator")
    self.geometry(settings.get("geometry", "1200x900"))
    self.minsize(1024, 768)

    self._create_menubar()
    self._create_main_layout()
    self._create_statusbar()
    self._status_after_id = None

    # --- Post-UI Initialization ---
    self._initialize_communicators()
    self._load_scenarios_into_ui()

    # Determine the initial scenario to load. Prefer the value of
    # ``last_selected_scenario`` from the settings when it names a known
    # scenario; otherwise fall back to whatever the scenario combobox
    # currently has selected.
    last_scenario = settings.get("last_selected_scenario")
    scenario_to_load = None
    try:
        available = self.config_manager.get_scenario_names()
        if last_scenario and last_scenario in available:
            scenario_to_load = last_scenario
        elif hasattr(self, "scenario_controls") and getattr(
            self.scenario_controls, "current_scenario", None
        ):
            val = self.scenario_controls.current_scenario.get()
            if val:
                scenario_to_load = val
    except Exception:
        # Best effort: start without a scenario, but leave a trace of why.
        self.logger.warning(
            "Could not determine the initial scenario; none will be loaded.",
            exc_info=True,
        )
        scenario_to_load = None

    if scenario_to_load:
        self._on_load_scenario(scenario_to_load)

    self._update_window_title()
    self.protocol("WM_DELETE_WINDOW", self._on_closing)
    self.logger.info("MainView initialized successfully.")

    # Allow overriding the GUI refresh interval from the settings. Both the
    # `GUI_REFRESH_RATE_MS` and `gui_refresh_rate_ms` keys are supported.
    configured = settings.get(
        "GUI_REFRESH_RATE_MS", settings.get("gui_refresh_rate_ms", None)
    )
    try:
        refresh_ms = (
            int(configured) if configured is not None else GUI_REFRESH_RATE_MS
        )
    except (TypeError, ValueError, OverflowError):
        self.logger.warning(
            "Invalid GUI refresh rate %r in settings; using %d ms.",
            configured,
            GUI_REFRESH_RATE_MS,
        )
        refresh_ms = GUI_REFRESH_RATE_MS
    if refresh_ms <= 0:
        # A zero or negative delay is not a usable refresh interval.
        self.logger.warning(
            "Non-positive GUI refresh rate %r in settings; using %d ms.",
            configured,
            GUI_REFRESH_RATE_MS,
        )
        refresh_ms = GUI_REFRESH_RATE_MS
    self.gui_refresh_rate_ms = refresh_ms

    self.after(self.gui_refresh_rate_ms, self._gui_refresh_loop)
    self.after(1000, self._update_rate_status)
    self.after(1000, self._update_latency_status)
|
|
|
|
def _create_main_layout(self):
    """Construct the main window layout and compose child widgets.

    This method creates the primary panes, instantiates child widgets such
    as the `PPIDisplay`, `ScenarioControlsFrame`, `TargetListFrame` and
    `SimulationControls`, and places the log pane at the bottom.

    Widgets stored on ``self`` for later use: ``h_pane``,
    ``connection_panel``, ``ppi_widget``, ``scenario_controls``,
    ``target_list``, ``simulation_controls``, ``start_button``,
    ``stop_button``, ``reset_radar_button`` and ``log_text_widget``.
    """
    # Outer vertical split: main area on top, log pane below.
    v_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
    v_pane.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    # Horizontal split holding the two main containers; weight=4 against the
    # log pane's weight=1 gives it the larger share of vertical resize space.
    self.h_pane = ttk.PanedWindow(v_pane, orient=tk.HORIZONTAL)
    v_pane.add(self.h_pane, weight=4)

    # --- Right Pane (connection panel + PPI) ---
    # NOTE(review): this container is added to h_pane before the "left"
    # one, so it is the first pane of the horizontal split despite its
    # name - confirm the intended on-screen order.
    right_container = ttk.Frame(self.h_pane)
    self.h_pane.add(right_container, weight=2)

    self.connection_panel = ConnectionPanel(
        right_container, initial_config=self.connection_config
    )
    self.connection_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2))
    # Route the panel's connect and settings actions to this window.
    self.connection_panel.set_connect_handler(self._on_connect_button)
    self.connection_panel.set_open_settings_handler(self._open_settings)

    self.ppi_widget = PPIDisplay(
        right_container, max_range_nm=self.max_range, scan_limit_deg=self.scan_limit
    )
    self.ppi_widget.pack(
        side=tk.TOP, fill=tk.BOTH, expand=True, padx=2, pady=(2, 5)
    )

    # --- Left Pane ---
    left_pane_container = ttk.Frame(self.h_pane)
    self.h_pane.add(left_pane_container, weight=1)

    # Place the ScenarioControlsFrame above the notebook so it is always visible
    # regardless of which tab is selected.
    self.scenario_controls = ScenarioControlsFrame(
        left_pane_container,
        main_view=self,
        load_scenario_command=self._on_load_scenario,
        save_as_command=self._on_save_scenario_as,
        delete_command=self._on_delete_scenario,
        new_scenario_command=self._on_new_scenario,
    )
    self.scenario_controls.pack(fill=tk.X, expand=False, padx=5, pady=(5, 5))

    left_notebook = ttk.Notebook(left_pane_container)
    left_notebook.pack(fill=tk.BOTH, expand=True)

    # --- TAB 1: Editing scenario (keeps scenario-related tools in the main
    # controls area above the notebook; the tab itself is used for editing)
    scenario_tab = ttk.Frame(left_notebook)
    left_notebook.add(scenario_tab, text="Editing scenario")

    self.target_list = TargetListFrame(
        scenario_tab, targets_changed_callback=self._on_targets_changed
    )
    self.target_list.pack(fill=tk.BOTH, expand=True, padx=5)

    # --- TAB 2: SIMULATION ---
    simulation_tab = ttk.Frame(left_notebook)
    left_notebook.add(simulation_tab, text="Simulation")

    self.simulation_controls = SimulationControls(simulation_tab, self)
    self.simulation_controls.pack(fill=tk.X, padx=5, pady=10, anchor="n")

    # Make controls directly accessible for backward compatibility
    self.start_button = self.simulation_controls.start_button
    self.stop_button = self.simulation_controls.stop_button
    self.reset_radar_button = self.simulation_controls.reset_radar_button

    # --- TAB 3: LRU SIMULATION ---
    # Placeholder tab: no widgets are created for it in this method.
    lru_tab = ttk.Frame(left_notebook)
    left_notebook.add(lru_tab, text="LRU Simulation")
    # ... widgets for LRU ...

    # --- TAB 4: Analysis ---
    analysis_tab = ttk.Frame(left_notebook)
    left_notebook.add(analysis_tab, text="Analysis")
    self._create_analysis_tab_widgets(analysis_tab)

    # --- Bottom Pane (Logs) ---
    log_frame_container = ttk.LabelFrame(v_pane, text="Logs")
    v_pane.add(log_frame_container, weight=1)
    # Created in the DISABLED state, so the text is not user-editable.
    self.log_text_widget = scrolledtext.ScrolledText(
        log_frame_container, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9)
    )
    self.log_text_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
|
|
|
|
def _create_analysis_tab_widgets(self, parent):
    """Populate the Analysis tab with the run list and its action buttons.

    Builds a three-column ``Treeview`` (stored as ``self.analysis_tree``)
    with a vertical scrollbar, a row of buttons below it, and schedules a
    first refresh of the list shortly after construction.

    Args:
        parent (tk.Widget): the parent container for the analysis tab.
    """
    # (column id, heading text, column options)
    column_specs = (
        ("datetime", "Date/Time", {"width": 150}),
        ("scenario", "Scenario Name", {"width": 200}),
        ("duration", "Duration (s)", {"width": 80, "anchor": tk.E}),
    )

    # The tree and its scrollbar share a container so they can be gridded
    # side by side.
    tree_container = ttk.Frame(parent)
    tree_container.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    scrollbar = ttk.Scrollbar(tree_container, orient=tk.VERTICAL)
    tree = ttk.Treeview(
        tree_container,
        columns=tuple(spec[0] for spec in column_specs),
        show="headings",
        yscrollcommand=scrollbar.set,
    )
    self.analysis_tree = tree
    scrollbar.config(command=tree.yview)

    for column_id, heading_text, column_options in column_specs:
        tree.heading(column_id, text=heading_text)
        tree.column(column_id, **column_options)

    # Double-clicking a row triggers the same action as "Analyze Selected".
    tree.bind("<Double-Button-1>", lambda _event: self._on_analyze_run())

    tree_container.grid_rowconfigure(0, weight=1)
    tree_container.grid_columnconfigure(0, weight=1)
    tree.grid(row=0, column=0, sticky="nsew")
    scrollbar.grid(row=0, column=1, sticky="ns")

    button_row = ttk.Frame(parent)
    button_row.pack(fill=tk.X, padx=5, pady=5)

    refresh_button = ttk.Button(
        button_row, text="Refresh List", command=self._refresh_analysis_list
    )
    refresh_button.pack(side=tk.LEFT)

    folder_button = ttk.Button(
        button_row, text="Open Archive Folder", command=self._open_archive_folder
    )
    folder_button.pack(side=tk.LEFT, padx=(6, 0))

    analyze_button = ttk.Button(
        button_row, text="Analyze Selected", command=self._on_analyze_run
    )
    analyze_button.pack(side=tk.RIGHT)

    self.after(100, self._refresh_analysis_list)
|
|
|
|
def _create_menubar(self):
    """Build the menubar with its "Settings" and "Debug" cascades and attach it."""
    menubar = tk.Menu(self)
    self.config(menu=menubar)

    # Each cascade is described as (label, entries); an entry is either a
    # (label, handler) pair or None for a separator.
    cascades = (
        (
            "Settings",
            (
                ("Connection...", self._open_settings),
                ("Radar Config...", self._open_radar_config),
            ),
        ),
        (
            "Debug",
            (
                ("SFP Packet Inspector...", self._open_sfp_debug_window),
                ("Logger Levels...", self._open_logger_panel),
                None,
                ("Sync Tool...", self._open_sync_tool),
                ("External Profiler...", self._open_external_profiler),
            ),
        ),
    )

    for cascade_label, entries in cascades:
        submenu = tk.Menu(menubar, tearoff=0)
        menubar.add_cascade(label=cascade_label, menu=submenu)
        for entry in entries:
            if entry is None:
                submenu.add_separator()
                continue
            entry_label, handler = entry
            submenu.add_command(label=entry_label, command=handler)
|
|
|
|
def _create_statusbar(self):
    """Create the status bar, pin it to the bottom edge and expose its variables.

    The bar's ``status_var``, ``rate_status_var`` and ``latency_status_var``
    are re-exported as attributes of this window.
    """
    self.status_bar = StatusBar(self)
    bar = self.status_bar

    # Anchor the bar's south-west corner to the window's bottom-left corner
    # and stretch it across the full width at a fixed height.
    bar.place(relx=0.0, rely=1.0, anchor="sw", relwidth=1.0, height=24)

    self.status_var = bar.status_var
    self.rate_status_var = bar.rate_status_var
    self.latency_status_var = bar.latency_status_var
    self._status_after_id = None
|
|
|
|
def show_status_message(self, text: str, timeout_ms: Optional[int] = 3000):
    """Forward a transient message to the status bar, if one exists.

    Does nothing when the window has no (truthy) ``status_bar`` attribute.

    Args:
        text (str): Text to display.
        timeout_ms (Optional[int]): Timeout value passed through to the
            status bar's ``show_status_message``.
    """
    bar = getattr(self, "status_bar", None)
    if not bar:
        return
    bar.show_status_message(text, timeout_ms=timeout_ms)
|
|
|
|
def clear_status_message(self):
    """Replace the current status text with "Ready" (passed with ``timeout_ms=0``).

    Does nothing when the window has no (truthy) ``status_bar`` attribute.
    """
    bar = getattr(self, "status_bar", None)
    if not bar:
        return
    bar.show_status_message("Ready", timeout_ms=0)
|
|
|
|
def _update_window_title(self):
    """Set the window title from the version string and the current scenario.

    The scenario name is appended only when ``current_scenario_name`` is
    set to a non-empty value.
    """
    window_title = f"Radar Target Simulator- {WRAPPER_APP_VERSION_STRING}"
    scenario_name = self.current_scenario_name
    if scenario_name:
        window_title = f"{window_title} - {scenario_name}"
    self.title(window_title)
|
|
|
|
def _on_connection_state_change(self, is_connected: bool):
    """React to a connection state change reported by the CommunicatorManager.

    Logs the new state, then pushes it to the status bar, the connection
    panel and - when it is open - the SFP debug window.

    Args:
        is_connected (bool): The new connection state.
    """
    self.logger.info(
        f"MainView received connection state change: Connected={is_connected}"
    )

    # Widgets may not exist yet if this fires during construction, so each
    # one is updated only when the attribute is present.
    widget_updates = (
        ("status_bar", "set_target_connected"),
        ("connection_panel", "update_toggle_state"),
    )
    for attribute_name, method_name in widget_updates:
        if hasattr(self, attribute_name):
            getattr(getattr(self, attribute_name), method_name)(is_connected)

    debug_window = self.sfp_debug_window
    if not debug_window or not debug_window.winfo_exists():
        return
    if hasattr(debug_window, "update_toggle_state"):
        debug_window.update_toggle_state(is_connected)
|
|
|
|
def _initialize_communicators(self):
    """Ask the CommunicatorManager to initialize communicators and wire callbacks.

    Stores the returned communicators in ``self.target_communicator`` and
    ``self.lru_communicator``, mirrors both connection flags on the status
    bar (when it exists) and registers ``_on_connection_state_change`` as a
    connection-state callback. Returns nothing.
    """
    manager = self.communicator_manager
    target_comm, target_connected, lru_comm, lru_connected = (
        manager.initialize_communicators()
    )
    self.target_communicator, self.lru_communicator = target_comm, lru_comm

    if hasattr(self, "status_bar"):
        status_bar = self.status_bar
        status_bar.set_target_connected(target_connected)
        status_bar.set_lru_connected(lru_connected)

    # NOTE(review): this registration runs on every call (also after a
    # settings update); whether the manager de-duplicates callbacks is not
    # visible here - confirm.
    manager.add_connection_state_callback(self._on_connection_state_change)
|
|
|
|
def update_connection_settings(self, new_config: Dict[str, Any]):
    """Adopt, persist and apply a new connection configuration.

    The configuration is saved through the ConfigManager, the SFP
    prediction offset is re-read from it, the connection panel summary is
    refreshed (when the panel exists) and the communicators are
    re-initialized with the new settings.

    Args:
        new_config (Dict[str, Any]): New configuration dictionary for target/lru.
    """
    self.logger.info(f"Updating connection settings: {new_config}")

    self.connection_config = new_config
    self.config_manager.save_connection_settings(new_config)

    # The offset lives under target -> sfp; a missing section means 0.0.
    sfp_section = self.connection_config.get("target", {}).get("sfp", {})
    self.prediction_offset_ms = sfp_section.get("prediction_offset_ms", 0.0)

    if hasattr(self, "connection_panel"):
        self.connection_panel.update_summary(self.connection_config)

    self.communicator_manager.set_config(new_config)
    self._initialize_communicators()
|
|
|
|
def _open_settings(self):
    """Log the request and open a ConnectionSettingsWindow for the current config."""
    self.logger.info("Opening connection settings window.")
    config_manager = self.config_manager
    current_config = self.connection_config
    ConnectionSettingsWindow(self, config_manager, current_config)
|
|
|
|
def _on_connect_button(self):
    """Toggle the target connection.

    Disconnects when a target communicator exists and is open; otherwise
    attempts to connect and shows an error dialog if that attempt fails.
    """
    self.logger.info("Connection toggle requested by user.")

    communicator = self.target_communicator
    if communicator and communicator.is_open:
        self.communicator_manager.disconnect_target()
        return

    if self.communicator_manager.connect_target():
        return

    messagebox.showerror(
        "Connection Failed",
        "Could not connect. Check settings and logs.",
    )
|
|
|
|
def _reset_radar_state(self):
    """Delegate the radar state reset to the SimulationController."""
    controller = self.simulation_controller
    controller.reset_radar_state(self)
|
|
|
|
def _on_start_simulation(self):
    """Request start of the currently selected scenario via SimulationController.

    Duplicate requests are ignored while a start is already in progress.
    Before starting, the scenario selected in the ScenarioControlsFrame is
    loaded if it differs from the one currently loaded, so the visible
    combobox is the single source of truth for the selection.

    Raises:
        Exception: whatever ``SimulationController.start_simulation``
            raises is propagated, after the start-in-progress guard has
            been cleared so that the user can retry.
    """
    if self._start_in_progress_main:
        self.logger.info("Start already in progress; ignoring duplicate request.")
        return

    try:
        controls = getattr(self, "scenario_controls", None)
        selection_var = getattr(controls, "current_scenario", None)
        if selection_var:
            selected = selection_var.get()
            if selected and selected != self.current_scenario_name:
                self._on_load_scenario(selected)
    except Exception:
        # Non-fatal: proceed with the already loaded scenario, but record
        # why the selection could not be applied.
        self.logger.warning(
            "Could not load the selected scenario before start; "
            "continuing with the currently loaded scenario.",
            exc_info=True,
        )

    self._start_in_progress_main = True
    try:
        self.simulation_controller.start_simulation(self)
    except Exception:
        # Without this reset a failed start would leave the guard set and
        # every later start request would be ignored as a duplicate.
        self._start_in_progress_main = False
        raise
|
|
|
|
def _on_stop_simulation(self):
    """Delegate the stop request to the SimulationController."""
    controller = self.simulation_controller
    controller.stop_simulation(self)
|
|
|
|
def _on_simulation_finished(self):
    """Delegate end-of-simulation handling to the SimulationController."""
    controller = self.simulation_controller
    controller.on_simulation_finished(self)
|
|
|
|
def _on_reset_simulation(self):
    """Return the scenario and the related views to their initial state.

    Stops a running simulation first, then resets the hub, clears the PPI
    trails and finished-trajectory markers, resets the scenario, refreshes
    all views and shows a confirmation in the status bar.
    """
    self.logger.info("Resetting scenario to initial state.")
    self.simulation_controls.hide_notice()

    simulation_active = self.is_simulation_running.get()
    if simulation_active:
        self._on_stop_simulation()

    self.simulation_hub.reset()

    ppi = self.ppi_widget
    ppi.clear_trails()
    ppi.clear_finished_trajectory_markers()

    self.scenario.reset_simulation()
    self._update_all_views()
    self.show_status_message("Scenario reset to initial state.", timeout_ms=3000)
|
|
|
|
def _update_button_states(self):
    """Enable or disable controls according to the simulation state.

    While a simulation is running or a start is in progress, the start
    button, the radar reset button and all scenario/target editing
    controls are disabled and the target tree selection is turned off.
    The stop button is enabled only while the simulation is running.
    """
    running = self.is_simulation_running.get()
    busy = running or self._start_in_progress_main

    editable_state = tk.DISABLED if busy else tk.NORMAL
    combobox_state = tk.DISABLED if busy else "readonly"
    tree_select_mode = "none" if busy else "browse"

    self.start_button.config(state=editable_state)
    self.stop_button.config(state=tk.NORMAL if running else tk.DISABLED)
    self.reset_radar_button.config(state=editable_state)

    scenario_controls = self.scenario_controls
    for button in (
        scenario_controls.new_button,
        scenario_controls.save_button,
        scenario_controls.save_as_button,
        scenario_controls.delete_button,
    ):
        button.config(state=editable_state)
    scenario_controls.scenario_combobox.config(state=combobox_state)

    target_list = self.target_list
    for button in (
        target_list.add_button,
        target_list.remove_button,
        target_list.edit_button,
    ):
        button.config(state=editable_state)
    target_list.tree.config(selectmode=tree_select_mode)
|
|
|
|
def _update_simulation_progress_display(self):
    """Write the elapsed and total simulation time into their labels.

    Both values are shown in seconds with one decimal. Any error (for
    example a label that is not available) is deliberately ignored.
    """
    try:
        elapsed_seconds, total_seconds = self.sim_elapsed_time, self.total_sim_time
        controls = self.simulation_controls
        controls.sim_elapsed_label.config(text=f"{elapsed_seconds:.1f}s")
        controls.sim_total_label.config(text=f"{total_seconds:.1f}s")
    except Exception:
        # Best effort only: a failed label update must not propagate.
        pass
|
|
|
|
def _update_rate_status(self):
    """Refresh the rate text in the status bar and reschedule itself.

    Reads the real-event and packet rates from the hub and the update rate
    from the PPI widget (each queried with an argument of 1.0) and formats
    them into ``rate_status_var``. The packet rate is shown only when it is
    greater than zero. Errors are logged at debug level; the method always
    re-arms itself to run again after 1000 ms.
    """
    try:
        hub = self.simulation_hub
        real_rate = hub.get_real_rate(1.0) if hub else 0.0
        packet_rate = hub.get_packet_rate(1.0) if hub else 0.0

        if hasattr(self, "ppi_widget"):
            ppi_rate = self.ppi_widget.get_real_update_rate(1.0)
        else:
            ppi_rate = 0.0

        if self.rate_status_var:
            if packet_rate > 0.0:
                rate_text = f"pkt in: {packet_rate:.1f} pkt/s | ev in: {real_rate:.1f} ev/s | ppi upd: {ppi_rate:.1f} upd/s"
            else:
                rate_text = f"real in: {real_rate:.1f} ev/s | ppi upd: {ppi_rate:.1f} upd/s"
            self.rate_status_var.set(rate_text)
    except Exception as e:
        self.logger.debug(f"Error updating rate status: {e}")
    finally:
        # Re-arm unconditionally so one failure does not stop the updates.
        self.after(1000, self._update_rate_status)
|
|
|
|
def _on_seek(self):
    """Move the simulation to the time selected with the progress slider.

    The slider value is treated as a fraction of ``total_sim_time``; the
    resulting time is clamped to the range [0, total_sim_time]. Does
    nothing when there is no engine or the engine has no scenario.
    """
    engine = self.simulation_engine
    if not engine or not engine.scenario:
        return

    fraction = float(self.simulation_controls.sim_slider_var.get())
    total = self.total_sim_time
    target_time = max(0.0, min(total, fraction * total))

    engine.set_simulation_time(target_time)
    self.sim_elapsed_time = target_time
    self._update_simulation_progress_display()
|
|
|
|
def _on_targets_changed(self, targets: List[Target]):
    """Apply an edited target list to the scenario, the PPI and storage.

    The scenario's targets are replaced by a mapping keyed on each target's
    ``target_id``, the PPI is given the same targets, and the scenario is
    saved through the ConfigManager when it has a name.

    Args:
        targets (List[Target]): The targets currently shown in the list.
    """
    targets_by_id = {}
    for target in targets:
        targets_by_id[target.target_id] = target
    self.scenario.targets = targets_by_id

    self.ppi_widget.update_simulated_targets(targets)

    scenario_name = self.current_scenario_name
    if not scenario_name:
        return
    self.config_manager.save_scenario(scenario_name, self.scenario.to_dict())
|
|
|
|
def _update_all_views(self, targets_to_display: Optional[List[Target]] = None):
    """Refresh the window title, the target list and the PPI display.

    Args:
        targets_to_display (Optional[List[Target]]): Targets to show. When
            ``None``, all targets of the current scenario are used.
    """
    self._update_window_title()

    shown_targets = (
        self.scenario.get_all_targets()
        if targets_to_display is None
        else targets_to_display
    )
    self.target_list.update_target_list(shown_targets)
    self.ppi_widget.update_simulated_targets(shown_targets)
|
|
|
|
def _load_scenarios_into_ui(self):
    """Publish the stored scenario names to the scenario selection widgets.

    Updates the ScenarioControlsFrame list (keeping the current scenario
    selected) and, when the simulation controls expose a
    ``sim_scenario_combobox``, that combobox's values as well.
    """
    names = self.config_manager.get_scenario_names()
    self.scenario_controls.update_scenario_list(names, self.current_scenario_name)

    sim_controls = getattr(self, "simulation_controls", None)
    if hasattr(sim_controls, "sim_scenario_combobox"):
        sim_controls.sim_scenario_combobox["values"] = names
|
|
|
|
def _on_load_scenario(self, scenario_name: str):
    """Load a stored scenario by name and refresh every dependent view.

    A running simulation is stopped first. When the ConfigManager has no
    data for ``scenario_name`` a warning is logged and nothing else
    changes. Otherwise the simulated data in the hub and the PPI trails are
    cleared, the scenario is rebuilt from the stored data, the selection
    widgets are synchronized with the new name, all views are refreshed and
    the scenario preview is drawn on the PPI.

    Args:
        scenario_name (str): Name of the scenario to load.
    """
    if self.is_simulation_running.get():
        self._on_stop_simulation()

    self.logger.info(f"Loading scenario: {scenario_name}")
    scenario_data = self.config_manager.get_scenario(scenario_name)
    if not scenario_data:
        self.logger.warning(
            f"Attempted to load a non-existent scenario: {scenario_name}"
        )
        return

    self.simulation_hub.clear_simulated_data()
    self.ppi_widget.clear_trails()
    self.ppi_widget.update_simulated_targets([])

    self.scenario = Scenario.from_dict(scenario_data)
    self.current_scenario_name = scenario_name

    # Keep the always-visible scenario combobox in sync with the scenario
    # that was just loaded. This is best effort: a failure here must not
    # abort the load, but it is logged instead of being silently dropped.
    controls = getattr(self, "scenario_controls", None)
    selection_var = getattr(controls, "current_scenario", None)
    if selection_var is not None:
        try:
            selection_var.set(scenario_name)
            controls.scenario_combobox.set(scenario_name)
        except Exception:
            self.logger.debug(
                "Could not synchronize the scenario combobox.", exc_info=True
            )

    # _update_all_views refreshes the title, the target list and the PPI,
    # so the target list does not need a separate refresh here.
    self._update_all_views()

    sim_controls = getattr(self, "simulation_controls", None)
    if hasattr(sim_controls, "sim_scenario_combobox"):
        sim_controls.sim_scenario_combobox.set(scenario_name)

    self.ppi_widget.draw_scenario_preview(self.scenario)
|
|
|
|
def _on_save_scenario(self, scenario_name: str):
    """Save the current scenario under ``scenario_name`` and confirm it.

    The scenario takes the given name and the targets currently held by
    the target list, is written through the ConfigManager and becomes the
    current scenario. The scenario lists and the window title are
    refreshed and an information dialog is shown.

    Args:
        scenario_name (str): Name to store the scenario under.
    """
    self.logger.info(f"Saving scenario: {scenario_name}")

    scenario = self.scenario
    scenario.name = scenario_name
    targets_by_id = {}
    for target in self.target_list.get_targets():
        targets_by_id[target.target_id] = target
    scenario.targets = targets_by_id

    self.config_manager.save_scenario(scenario_name, scenario.to_dict())
    self.current_scenario_name = scenario_name

    self._load_scenarios_into_ui()
    self._update_window_title()

    confirmation = f"Scenario '{scenario_name}' saved."
    messagebox.showinfo("Success", confirmation, parent=self)
|
|
|
|
def _on_save_scenario_as(self, scenario_name: str):
    """Save the current scenario under a new name (delegates to ``_on_save_scenario``).

    Args:
        scenario_name (str): Name to store the scenario under.
    """
    save = self._on_save_scenario
    save(scenario_name)
|
|
|
|
def _on_new_scenario(self, scenario_name: str):
    """Create an empty in-memory scenario, or load it if the name is taken.

    When a scenario with this name is already stored, an information
    dialog is shown and that scenario is loaded instead. Otherwise a new
    ``Scenario`` becomes the current one, the views are refreshed and the
    name is appended to the scenario selection widgets. This method itself
    does not write the new scenario to the ConfigManager.

    Args:
        scenario_name (str): Name of the scenario to create.
    """
    already_stored = scenario_name in self.config_manager.get_scenario_names()
    if already_stored:
        messagebox.showinfo(
            "Duplicate", f"Scenario '{scenario_name}' already exists.", parent=self
        )
        self._on_load_scenario(scenario_name)
        return

    self.logger.info(f"Creating new scenario: {scenario_name}")
    self.scenario = Scenario(name=scenario_name)
    self.current_scenario_name = scenario_name
    self._update_all_views()

    # The new name is not stored yet, so add it to the listed names by hand.
    names = [*self.config_manager.get_scenario_names(), scenario_name]
    self.scenario_controls.update_scenario_list(names, select_scenario=scenario_name)

    sim_controls = getattr(self, "simulation_controls", None)
    if hasattr(sim_controls, "sim_scenario_combobox"):
        sim_combobox = sim_controls.sim_scenario_combobox
        sim_combobox["values"] = names
        sim_combobox.set(scenario_name)
|
|
|
|
def _on_delete_scenario(self, scenario_name: str):
    """Delete a stored scenario and refresh the scenario lists.

    When the deleted scenario is the one currently loaded, it is replaced
    by a fresh empty ``Scenario`` and all views are refreshed.

    Args:
        scenario_name (str): Name of the scenario to delete.
    """
    self.logger.info(f"Deleting scenario: {scenario_name}")
    self.config_manager.delete_scenario(scenario_name)

    deleted_current = self.current_scenario_name == scenario_name
    if deleted_current:
        self.current_scenario_name = None
        self.scenario = Scenario()
        self._update_all_views()

    self._load_scenarios_into_ui()
|
|
|
|
def _on_send_lru_status(self):
    """Placeholder handler for sending LRU status; currently does nothing."""
    return None
|
|
|
|
def _open_radar_config(self):
    """Open the radar configuration dialog and apply changed values.

    The dialog is created with the current scan limit and maximum range.
    When it ends with both values set and at least one of them differs
    from the current one, the new values are stored and the PPI is
    reconfigured.
    """
    dialog = RadarConfigWindow(
        self, current_scan_limit=self.scan_limit, current_max_range=self.max_range
    )

    # Nothing to apply when either result value is None.
    if dialog.scan_limit is None or dialog.max_range is None:
        return

    settings_changed = (
        self.scan_limit != dialog.scan_limit or self.max_range != dialog.max_range
    )
    if not settings_changed:
        return

    self.scan_limit = dialog.scan_limit
    self.max_range = dialog.max_range
    self.ppi_widget.reconfigure_radar(
        max_range_nm=self.max_range, scan_limit_deg=self.scan_limit
    )
|
|
|
|
def _on_closing(self):
    """Handle the window closing event.

    Runs the shutdown steps in order: stop a running simulation, save the
    general settings (scan limit, max range, window geometry, last
    scenario), shut down the communicators and stop the status bar's
    resource monitor. A failure in one step is logged and does not prevent
    the remaining steps. The logging system is then shut down and the
    window is destroyed; the window is destroyed even if an error occurs,
    so that the application can always be closed.
    """
    self.logger.info("Application shutting down.")

    def stop_simulation():
        if self.is_simulation_running.get():
            self._on_stop_simulation()

    def save_settings():
        self.config_manager.save_general_settings(
            {
                "scan_limit": self.scan_limit,
                "max_range": self.max_range,
                "geometry": self.winfo_geometry(),
                "last_selected_scenario": self.current_scenario_name,
            }
        )

    def shutdown_communicators():
        if self.communicator_manager:
            self.communicator_manager.shutdown()

    def stop_resource_monitor():
        if hasattr(self, "status_bar") and self.status_bar:
            self.status_bar.stop_resource_monitor()

    shutdown_steps = (
        ("stop the simulation", stop_simulation),
        ("save the general settings", save_settings),
        ("shut down the communicators", shutdown_communicators),
        ("stop the resource monitor", stop_resource_monitor),
    )

    try:
        for description, step in shutdown_steps:
            try:
                step()
            except Exception:
                self.logger.exception(
                    "Failed to %s during shutdown.", description
                )
        # Must come last among the steps: nothing can be logged afterwards.
        shutdown_logging_system()
    finally:
        self.destroy()
|
|
|
|
def _open_sfp_debug_window(self):
    """Show the SFP debug window, reusing the existing one when it is still open.

    An existing window is raised with ``lift()``; otherwise a new
    ``SfpDebugWindow`` is created and stored in ``self.sfp_debug_window``.
    """
    existing_window = self.sfp_debug_window
    window_is_open = bool(existing_window and existing_window.winfo_exists())
    if window_is_open:
        existing_window.lift()
    else:
        self.sfp_debug_window = SfpDebugWindow(self)
|
|
|
|
def _open_logger_panel(self):
    """Open a LoggerPanel with this window as its parent."""
    parent_window = self
    LoggerPanel(parent_window)
|
|
|
|
def _build_display_data_from_hub(self) -> Dict[str, List[Target]]:
    """Build the PPI display data from the simulation hub.

    Delegates to ``build_display_data``, passing the hub together with the
    current scenario, simulation engine, PPI widget and logger.

    Returns:
        Dict[str, List[Target]]: The mapping produced by
        ``build_display_data``.
    """
    adapter_context = {
        "scenario": self.scenario,
        "engine": self.simulation_engine,
        "ppi_widget": self.ppi_widget,
        "logger": self.logger,
    }
    return build_display_data(self.simulation_hub, **adapter_context)
|
|
|
|
def _gui_refresh_loop(self):
    """Run one GUI refresh pass and schedule the next one.

    Each pass:

    - calls ``_on_simulation_finished`` when the GUI flag still reports a
      running simulation but the engine has stopped,
    - pushes the simulated and real target lists built from the hub to the
      PPI widget,
    - when a hub is present, updates the antenna line, the ownship display
      and the active-targets table,
    - while the engine is running, refreshes the elapsed time, the slider
      position and the progress display,
    - requests a canvas redraw and re-arms itself through ``after``.
    """
    sim_is_running_now = (
        self.simulation_engine is not None and self.simulation_engine.is_running()
    )

    if self.is_simulation_running.get() and not sim_is_running_now:
        self._on_simulation_finished()

    # Update PPI with the latest data from the hub
    display_data = self._build_display_data_from_hub()
    self.ppi_widget.update_simulated_targets(display_data.get("simulated", []))
    self.ppi_widget.update_real_targets(display_data.get("real", []))

    hub = self.simulation_hub
    if hub:
        # Update antenna sweep line. ``None`` hides it, which covers both
        # "animation switched off" and "no azimuth available".
        az_deg = None
        if self.ppi_widget.animate_antenna_var.get():
            az_deg, _ = hub.get_antenna_azimuth()
            if az_deg is not None:
                # Log the identity of the hub the value was actually read
                # from, so the message is never lost to a missing attribute.
                self.logger.debug(
                    "MainView: read antenna az from hub -> az=%s hub_id=%s",
                    az_deg,
                    id(hub),
                )
        self.ppi_widget.render_antenna_line(az_deg)

        # Update ownship state for both PPI orientation and status display
        ownship_state = hub.get_ownship_state()
        if ownship_state:
            ownship_heading = ownship_state.get("heading_deg", 0.0)
            self.ppi_widget.update_ownship_state(ownship_heading)
            self.simulation_controls.update_ownship_display(ownship_state)
        else:
            # Ensure display is cleared if no ownship data is present
            ownship_state = {}
            self.simulation_controls.update_ownship_display({})

        # Update the active targets table, providing ownship state for context
        self.simulation_controls.update_targets_table(
            display_data.get("simulated", []), ownship_state
        )

    if sim_is_running_now:
        if self.simulation_engine and self.simulation_engine.scenario:
            # Elapsed time is the largest per-target ``_sim_time_s``;
            # targets without that attribute count as 0.0.
            times = [
                getattr(t, "_sim_time_s", 0.0)
                for t in self.simulation_engine.scenario.get_all_targets()
            ]
            self.sim_elapsed_time = max(times) if times else 0.0

        # Leave the slider alone while the user is dragging it.
        if self.total_sim_time > 0 and not self._slider_is_dragging:
            progress = min(1.0, self.sim_elapsed_time / self.total_sim_time)
            self.simulation_controls.sim_slider_var.set(progress)

        self._update_simulation_progress_display()

    if hasattr(self, "ppi_widget") and self.ppi_widget.canvas:
        self.ppi_widget.canvas.draw_idle()

    # Schedule next GUI refresh using configured interval
    try:
        self.after(self.gui_refresh_rate_ms, self._gui_refresh_loop)
    except Exception:
        # Fallback to module-level default if instance attr missing
        self.after(GUI_REFRESH_RATE_MS, self._gui_refresh_loop)
|
|
|
|
def _refresh_analysis_list(self):
    """Rebuild the analysis tree from the archived simulation runs on disk.

    The tree is emptied first. Every ``.json`` file in the archive folder,
    except ``.perf.csv`` / ``.perf.json`` side files, is loaded and listed
    with its timestamp, scenario name and duration, most recent first.
    Files that cannot be read or parsed are logged as warnings and skipped.
    """
    tree = self.analysis_tree
    tree.delete(*tree.get_children())

    archive_folder = SimulationArchive.ARCHIVE_FOLDER
    if not os.path.exists(archive_folder):
        return

    runs = []
    for filename in os.listdir(archive_folder):
        # Skip performance files, only list main simulation archives.
        if filename.endswith((".perf.csv", ".perf.json")):
            continue
        if not filename.endswith(".json"):
            continue

        filepath = os.path.join(archive_folder, filename)
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            metadata = data.get("metadata", {})

            # The sort key comes from the first two "_"-separated parts of
            # the filename, parsed as "%Y%m%d_%H%M%S".
            try:
                name_parts = filename.split("_")
                datetime_obj = datetime.strptime(
                    f"{name_parts[0]}_{name_parts[1]}", "%Y%m%d_%H%M%S"
                )
            except (IndexError, ValueError):
                # Fallback to file modification time if filename parsing fails.
                datetime_obj = datetime.fromtimestamp(os.path.getmtime(filepath))

            runs.append(
                {
                    "datetime_obj": datetime_obj,
                    "scenario": metadata.get("scenario_name", "N/A"),
                    "duration": f"{metadata.get('duration_seconds', 0):.1f}",
                    "filepath": filepath,
                }
            )
        except Exception as e:
            self.logger.warning(f"Could not read or parse archive {filename}: {e}")

    # Most recent run first.
    runs.sort(key=lambda r: r["datetime_obj"], reverse=True)
    for run in runs:
        row_values = (
            run["datetime_obj"].strftime("%Y-%m-%d %H:%M:%S"),
            run["scenario"],
            run["duration"],
        )
        # The file path doubles as the row identifier.
        tree.insert("", tk.END, values=row_values, iid=run["filepath"])
|
|
|
|
def _open_archive_folder(self):
    """Open the simulation archive folder with the platform's file browser.

    The folder is created when missing. Windows uses ``os.startfile``,
    macOS runs ``open`` and every other platform runs ``xdg-open``. Any
    failure is logged with its traceback and reported in an error dialog.
    """
    archive_folder = SimulationArchive.ARCHIVE_FOLDER
    try:
        os.makedirs(archive_folder, exist_ok=True)
        folder_path = os.path.abspath(archive_folder)

        if sys.platform == "win32":
            os.startfile(folder_path)
        else:
            import subprocess

            opener = "open" if sys.platform == "darwin" else "xdg-open"
            subprocess.run([opener, folder_path])
    except Exception as e:
        self.logger.exception(f"Failed to open archive folder: {e}")
        messagebox.showerror(
            "Error", "Could not open archive folder automatically."
        )
|
|
|
|
def _on_analyze_run(self):
    """Open an ``AnalysisWindow`` for the focused row of the analysis tree.

    The focused item's identifier is passed on as the archive file path.
    When no row has focus an informational dialog is shown instead.
    """
    archive_filepath = self.analysis_tree.focus()
    if archive_filepath:
        AnalysisWindow(self, archive_filepath=archive_filepath)
        return

    messagebox.showinfo(
        "No Selection", "Please select a simulation run to analyze."
    )
|
|
|
|
def _update_latency_status(self):
    """Refresh the latency display and prediction horizon, then re-arm.

    The estimated latency is read from the target communicator's router
    when both expose the needed methods, otherwise it is 0.0. A positive
    latency is shown in milliseconds; otherwise the display is cleared.
    While the engine runs, its prediction horizon is set to the latency
    plus the manual offset. Errors are logged at debug level, and the
    method reschedules itself after 1000 ms in every case.
    """
    try:
        communicator = self.target_communicator
        estimated_latency_s = 0.0
        if communicator and hasattr(communicator, "router"):
            payload_router = communicator.router()
            if payload_router and hasattr(payload_router, "get_estimated_latency_s"):
                estimated_latency_s = payload_router.get_estimated_latency_s()

        # Update the status bar display (empty text when there is no latency).
        if hasattr(self, "latency_status_var") and self.latency_status_var:
            if estimated_latency_s > 0:
                status_text = f"Latency: {estimated_latency_s * 1000:.1f} ms"
            else:
                status_text = ""
            self.latency_status_var.set(status_text)

        # Total horizon = automatic latency + manual offset.
        engine = self.simulation_engine
        if engine and engine.is_running():
            manual_offset_s = self.prediction_offset_ms / 1000.0
            engine.set_prediction_horizon(estimated_latency_s + manual_offset_s)
    except Exception as e:
        self.logger.debug(f"Error updating latency status: {e}")
    finally:
        # Schedule the next update.
        self.after(1000, self._update_latency_status)
|
|
|
|
def _open_sync_tool(self):
    """Open the synchronization tool window.

    The tool is only opened when the target communicator is an
    ``SFPCommunicator`` whose ``router()`` returns a truthy router.
    Otherwise a warning dialog explains what is missing.
    """
    communicator = self.target_communicator

    # The sync tool only works with an SFP target communicator.
    if not isinstance(communicator, SFPCommunicator):
        messagebox.showwarning(
            "Not Available",
            "Sync Tool requires an active SFP communicator.",
            parent=self,
        )
        return

    # The router is obtained directly from the communicator.
    router = communicator.router()
    if not router:
        messagebox.showwarning(
            "Router Not Found",
            "SFP communicator is active, but its payload router could not be found.",
            parent=self,
        )
        return

    SyncToolWindow(self, communicator, router)
|
|
|
|
def _open_external_profiler(self):
    """Open the external profiling tool window with this view as parent."""
    ExternalProfilerWindow(self)
|