S1005403_RisCC/target_simulator/gui/main_view.py

1259 lines
55 KiB
Python

# target_simulator/gui/main_view.py
"""
Main view of the application, containing the primary window and widgets.
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from queue import Queue, Empty
from typing import Optional, Dict, Any, List
import time
# Use absolute imports for robustness and clarity
from target_simulator.gui.ppi_display import PPIDisplay
from target_simulator.gui.connection_settings_window import ConnectionSettingsWindow
from target_simulator.gui.radar_config_window import RadarConfigWindow
from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame
from target_simulator.gui.target_list_frame import TargetListFrame
from target_simulator.core.communicator_interface import CommunicatorInterface
from target_simulator.core.serial_communicator import SerialCommunicator
from target_simulator.core.tftp_communicator import TFTPCommunicator
from target_simulator.core.simulation_engine import SimulationEngine
from target_simulator.core.models import Scenario, Target
from target_simulator.utils.logger import get_logger, shutdown_logging_system
from target_simulator.utils.config_manager import ConfigManager
from target_simulator.gui.sfp_debug_window import SfpDebugWindow
from target_simulator.core.sfp_communicator import SFPCommunicator
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.gui.analysis_window import AnalysisWindow
from target_simulator.core import command_builder
GUI_QUEUE_POLL_INTERVAL_MS = 100
class MainView(tk.Tk):
"""The main application window."""
def __init__(self):
super().__init__()
self.logger = get_logger(__name__)
self.config_manager = ConfigManager()
# --- Load Settings ---
settings = self.config_manager.get_general_settings()
self.scan_limit = settings.get("scan_limit", 60)
self.max_range = settings.get("max_range", 100)
self.connection_config = self.config_manager.get_connection_settings()
# Defer establishing SFP receive connection until simulation start
self.defer_sfp_connection = True
# --- Initialize the data hub and analyzer ---
self.simulation_hub = SimulationStateHub()
self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub)
# --- Core Logic Handlers ---
self.target_communicator: Optional[CommunicatorInterface] = None
self.lru_communicator: Optional[CommunicatorInterface] = None
self.scenario = Scenario()
self.current_scenario_name: Optional[str] = None
self.sfp_debug_window: Optional[SfpDebugWindow] = None
self.analysis_window: Optional[AnalysisWindow] = None
# --- Simulation Engine ---
self.simulation_engine: Optional[SimulationEngine] = None
self.gui_update_queue = Queue()
self.is_simulation_running = tk.BooleanVar(value=False)
self.time_multiplier = 1.0
self.update_time = tk.DoubleVar(value=1.0)
# Simulation progress tracking
self.total_sim_time = 0.0
self.sim_elapsed_time = 0.0
self.sim_slider_var = tk.DoubleVar(value=0.0)
self._slider_is_dragging = False
# --- Window and UI Setup ---
self.title("Radar Target Simulator")
self.geometry(settings.get("geometry", "1200x1024"))
self.minsize(1024, 768)
self._create_menubar()
self._create_main_layout()
self._create_statusbar()
# --- Post-UI Initialization ---
self._initialize_communicators()
self._load_scenarios_into_ui()
last_scenario = settings.get("last_selected_scenario")
if last_scenario and last_scenario in self.config_manager.get_scenario_names():
self._on_load_scenario(last_scenario)
self._update_window_title()
self.protocol("WM_DELETE_WINDOW", self._on_closing)
self.logger.info("MainView initialized successfully.")
# Always poll the GUI update queue so the main PPI receives real-time
# hub updates even when the live simulation engine is not running.
try:
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_gui_queue)
except Exception:
self.logger.exception("Failed to schedule GUI queue polling")
def _create_main_layout(self):
v_pane = ttk.PanedWindow(self, orient=tk.VERTICAL)
v_pane.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.h_pane = ttk.PanedWindow(v_pane, orient=tk.HORIZONTAL)
v_pane.add(self.h_pane, weight=4)
# --- Right Pane (connection panel + PPI) ---
right_container = ttk.Frame(self.h_pane)
self.h_pane.add(right_container, weight=2)
# Connection panel sits above the PPI on the right side and shows
# current connection parameters and a centralized Connect/Disconnect button.
conn_panel = ttk.LabelFrame(right_container, text="Connection")
conn_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2))
# Display current connection summary (type / brief params)
self.conn_type_var = tk.StringVar(value=self.connection_config.get("target", {}).get("type", "-"))
self.conn_info_var = tk.StringVar(value=self._format_connection_summary(self.connection_config.get("target", {})))
ttk.Label(conn_panel, text="Type:").pack(side=tk.LEFT, padx=(6, 2))
ttk.Label(conn_panel, textvariable=self.conn_type_var, width=10).pack(side=tk.LEFT)
ttk.Label(conn_panel, textvariable=self.conn_info_var).pack(side=tk.LEFT, padx=(8, 4))
# Connect / Disconnect button centralised here
self.connect_button = ttk.Button(conn_panel, text="Connect", command=self._on_connect_button)
self.connect_button.pack(side=tk.RIGHT, padx=(4, 6))
# Open settings quick button
self.conn_settings_button = ttk.Button(conn_panel, text="Settings...", command=self._open_settings)
self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0))
# Now the PPI widget below the connection panel
self.ppi_widget = PPIDisplay(
right_container, max_range_nm=self.max_range, scan_limit_deg=self.scan_limit
)
self.ppi_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=2, pady=(2, 5))
# Reflect initial connection state (likely disconnected)
try:
if hasattr(self.ppi_widget, "update_connect_state"):
self.ppi_widget.update_connect_state(False)
except Exception:
pass
# --- Left Pane ---
left_pane_container = ttk.Frame(self.h_pane)
self.h_pane.add(left_pane_container, weight=1)
left_notebook = ttk.Notebook(left_pane_container)
left_notebook.pack(fill=tk.BOTH, expand=True)
# --- TAB 1: SCENARIO CONFIG ---
scenario_tab = ttk.Frame(left_notebook)
left_notebook.add(scenario_tab, text="Scenario Config")
self.scenario_controls = ScenarioControlsFrame(
scenario_tab,
main_view=self,
load_scenario_command=self._on_load_scenario,
save_as_command=self._on_save_scenario_as,
delete_command=self._on_delete_scenario,
new_scenario_command=self._on_new_scenario,
)
self.scenario_controls.pack(fill=tk.X, expand=False, padx=5, pady=(5, 5))
self.target_list = TargetListFrame(
scenario_tab, targets_changed_callback=self._on_targets_changed
)
self.target_list.pack(fill=tk.BOTH, expand=True, padx=5)
# --- TAB 2: SIMULATION ---
simulation_tab = ttk.Frame(left_notebook)
left_notebook.add(simulation_tab, text="Simulation")
sim_scenario_frame = ttk.LabelFrame(simulation_tab, text="Scenario Control")
sim_scenario_frame.pack(fill=tk.X, padx=5, pady=5, anchor="n")
ttk.Label(sim_scenario_frame, text="Scenario:").pack(
side=tk.LEFT, padx=(5, 5), pady=5
)
self.sim_scenario_combobox = ttk.Combobox(
sim_scenario_frame,
textvariable=self.scenario_controls.current_scenario, # Share the variable
state="readonly",
)
self.sim_scenario_combobox.pack(side=tk.LEFT, expand=True, fill=tk.X, pady=5)
self.sim_scenario_combobox.bind(
"<<ComboboxSelected>>",
lambda event: self._on_load_scenario(self.sim_scenario_combobox.get()),
)
engine_frame = ttk.LabelFrame(simulation_tab, text="Live Simulation Engine")
engine_frame.pack(fill=tk.X, padx=5, pady=10, anchor="n")
# Use grid within engine_frame for a tidy multi-row layout that
# doesn't force the window to expand horizontally and keeps the PPI
# area visible. Configure columns so the middle spacer expands.
for i in range(10):
engine_frame.grid_columnconfigure(i, weight=0)
# Give the spacer column (3) and the main left column (0) flexible weight
engine_frame.grid_columnconfigure(0, weight=0)
engine_frame.grid_columnconfigure(3, weight=1)
self.start_button = ttk.Button(
engine_frame, text="Start Live", command=self._on_start_simulation
)
self.start_button.grid(row=0, column=0, sticky="w", padx=5, pady=5)
self.stop_button = ttk.Button(
engine_frame,
text="Stop Live",
command=self._on_stop_simulation,
state=tk.DISABLED,
)
self.stop_button.grid(row=0, column=1, sticky="w", padx=5, pady=5)
self.analysis_button = ttk.Button(
engine_frame,
text="Show Analysis",
command=self._open_analysis_window,
state=tk.DISABLED,
)
self.analysis_button.grid(row=0, column=2, sticky="w", padx=5, pady=5)
# spacer to push the following controls to the right
spacer = ttk.Frame(engine_frame)
spacer.grid(row=0, column=3, sticky="ew")
ttk.Label(engine_frame, text="Speed:").grid(row=0, column=4, sticky="e", padx=(10, 2), pady=5)
self.time_multiplier_var = tk.StringVar(value="1x")
self.multiplier_combo = ttk.Combobox(
engine_frame,
textvariable=self.time_multiplier_var,
values=["1x", "2x", "4x", "10x", "20x"],
state="readonly",
width=4,
)
self.multiplier_combo.grid(row=0, column=5, sticky="w", padx=(0, 5), pady=5)
self.multiplier_combo.bind(
"<<ComboboxSelected>>", self._on_time_multiplier_changed
)
ttk.Label(engine_frame, text="Update Time (s):").grid(row=0, column=6, sticky="e", padx=(10, 2), pady=5)
self.update_time_entry = ttk.Entry(
engine_frame, textvariable=self.update_time, width=5
)
self.update_time_entry.grid(row=0, column=7, sticky="w", padx=(0, 5), pady=5)
self.reset_button = ttk.Button(
engine_frame, text="Reset State", command=self._on_reset_simulation
)
self.reset_button.grid(row=0, column=8, sticky="e", padx=5, pady=5)
self.reset_radar_button = ttk.Button(
engine_frame, text="Reset Radar", command=self._reset_radar_state
)
self.reset_radar_button.grid(row=0, column=9, sticky="e", padx=5, pady=5)
# --- Simulation progress bar / slider ---
# Place the progress frame on its own row below the control buttons
progress_frame = ttk.Frame(engine_frame)
# Place the progress frame on a dedicated grid row below the controls
progress_frame.grid(row=1, column=0, columnspan=10, sticky="ew", padx=5, pady=(6, 2))
self.sim_slider = ttk.Scale(
progress_frame,
orient=tk.HORIZONTAL,
variable=self.sim_slider_var,
from_=0.0,
to=1.0,
command=lambda v: None,
# let grid manage length via sticky and column weights
)
# configure progress_frame grid so slider expands and labels stay compact
progress_frame.grid_columnconfigure(0, weight=1)
progress_frame.grid_columnconfigure(1, weight=0)
self.sim_slider.grid(row=0, column=0, sticky="ew", padx=(4, 8))
# Bind press/release to support seeking
try:
self.sim_slider.bind("<ButtonPress-1>", lambda e: setattr(self, '_slider_is_dragging', True))
self.sim_slider.bind("<ButtonRelease-1>", lambda e: (setattr(self, '_slider_is_dragging', False), self._on_seek()))
except Exception:
pass
# Time labels showing elapsed and total separately at the end of the bar
labels_frame = ttk.Frame(progress_frame)
labels_frame.grid(row=0, column=1, sticky="e", padx=(4, 4))
self.sim_elapsed_label = ttk.Label(labels_frame, text="0.0s", width=8, anchor=tk.E)
self.sim_elapsed_label.grid(row=0, column=0)
slash_label = ttk.Label(labels_frame, text="/")
slash_label.grid(row=0, column=1, padx=(2, 2))
self.sim_total_label = ttk.Label(labels_frame, text="0.0s", width=8, anchor=tk.W)
self.sim_total_label.grid(row=0, column=2)
# --- TAB 3: LRU SIMULATION ---
lru_tab = ttk.Frame(left_notebook)
left_notebook.add(lru_tab, text="LRU Simulation")
cooling_frame = ttk.LabelFrame(lru_tab, text="Cooling Unit Status")
cooling_frame.pack(fill=tk.X, padx=5, pady=5, anchor="n")
ttk.Label(cooling_frame, text="Status:").pack(side=tk.LEFT, padx=5, pady=5)
self.cooling_status_var = tk.StringVar(value="OK")
cooling_combo = ttk.Combobox(
cooling_frame,
textvariable=self.cooling_status_var,
values=["OK", "OVERHEATING", "FAULT"],
state="readonly",
)
cooling_combo.pack(side=tk.LEFT, expand=True, fill=tk.X, padx=5, pady=5)
power_frame = ttk.LabelFrame(lru_tab, text="Power Supply Unit Status")
power_frame.pack(fill=tk.X, padx=5, pady=5, anchor="n")
ttk.Label(power_frame, text="Status:").pack(side=tk.LEFT, padx=5, pady=5)
self.power_status_var = tk.StringVar(value="OK")
power_combo = ttk.Combobox(
power_frame,
textvariable=self.power_status_var,
values=["OK", "LOW_VOLTAGE", "FAULT"],
state="readonly",
)
power_combo.pack(side=tk.LEFT, expand=True, fill=tk.X, padx=5, pady=5)
lru_action_frame = ttk.Frame(lru_tab)
lru_action_frame.pack(fill=tk.X, padx=5, pady=10, anchor="n")
send_lru_button = ttk.Button(
lru_action_frame, text="Send LRU Status", command=self._on_send_lru_status
)
send_lru_button.pack(side=tk.RIGHT)
# --- Bottom Pane (Logs) ---
log_frame_container = ttk.LabelFrame(v_pane, text="Logs")
v_pane.add(log_frame_container, weight=1)
self.log_text_widget = scrolledtext.ScrolledText(
log_frame_container, state=tk.DISABLED, wrap=tk.WORD, font=("Consolas", 9)
)
self.log_text_widget.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
def _create_menubar(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
settings_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Settings", menu=settings_menu)
settings_menu.add_command(label="Connection...", command=self._open_settings)
settings_menu.add_command(
label="Radar Config...", command=self._open_radar_config
)
debug_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Debug", menu=debug_menu)
debug_menu.add_command(
label="SFP Packet Inspector...", command=self._open_sfp_debug_window
)
def _create_statusbar(self):
status_bar = ttk.Frame(self, relief=tk.SUNKEN)
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
ttk.Label(status_bar, text="Target:").pack(side=tk.LEFT, padx=(5, 2))
self.target_status_canvas = tk.Canvas(
status_bar, width=16, height=16, highlightthickness=0
)
self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.target_status_canvas, "#e74c3c")
ttk.Label(status_bar, text="LRU:").pack(side=tk.LEFT, padx=(5, 2))
self.lru_status_canvas = tk.Canvas(
status_bar, width=16, height=16, highlightthickness=0
)
self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 10))
self._draw_status_indicator(self.lru_status_canvas, "#e74c3c")
self.status_var = tk.StringVar(value="Ready")
ttk.Label(status_bar, textvariable=self.status_var, anchor=tk.W).pack(
side=tk.LEFT, fill=tk.X, expand=True, padx=5
)
def _draw_status_indicator(self, canvas, color):
canvas.delete("all")
canvas.create_oval(2, 2, 14, 14, fill=color, outline="black")
def _update_window_title(self):
"""Updates the window title based on the current scenario."""
base_title = "Radar Target Simulator"
if self.current_scenario_name:
self.title(f"{base_title} - {self.current_scenario_name}")
else:
self.title(base_title)
def _format_connection_summary(self, cfg: dict) -> str:
"""Return a short human-readable summary of the target connection config.
This is used by the connection panel to show relevant info without
opening the full settings dialog.
"""
try:
t = cfg.get("type")
if not t:
return "-"
if t == "sfp":
sfp = cfg.get("sfp", {})
ip = sfp.get("ip") or sfp.get("host") or "?"
# Support both single port and multiple ports (list/tuple)
remote = sfp.get("port") or sfp.get("remote_port")
if isinstance(remote, (list, tuple)):
remote_str = ",".join(str(int(p)) for p in remote)
else:
try:
remote_str = str(int(remote)) if remote is not None else "?"
except Exception:
remote_str = str(remote)
local = sfp.get("local_port")
if local is not None:
try:
local_str = str(int(local))
except Exception:
local_str = str(local)
return f"{ip} (remote:{remote_str} local:{local_str})"
return f"{ip} (remote:{remote_str})"
if t == "serial":
s = cfg.get("serial", {})
port = s.get("port") or s.get("device") or "?"
baud = s.get("baudrate") or s.get("baud") or "?"
return f"{port} @{baud}"
if t == "tftp":
tftp = cfg.get("tftp", {})
host = tftp.get("host") or tftp.get("server") or "?"
return f"{host}"
return "-"
except Exception:
return "-"
def _update_communicator_status(self, comm_name: str, is_connected: bool):
canvas = (
self.target_status_canvas
if comm_name == "Target"
else self.lru_status_canvas
)
color = "#2ecc40" if is_connected else "#e74c3c"
self._draw_status_indicator(canvas, color)
def _on_connection_state_change(self, is_connected: bool):
"""Callback for communicator connection state changes."""
self.logger.info(f"MainView received connection state change: Connected={is_connected}")
self._update_communicator_status("Target", is_connected)
# Update the PPI's internal state (visual only) and the debug window
try:
if hasattr(self.ppi_widget, 'update_connect_state'):
self.ppi_widget.update_connect_state(is_connected)
except Exception:
pass
# Update centralized connect button text and status indicator
try:
if hasattr(self, 'connect_button') and self.connect_button:
self.connect_button.config(text="Disconnect" if is_connected else "Connect")
except Exception:
pass
# Also update the debug window if it's open
if self.sfp_debug_window and self.sfp_debug_window.winfo_exists():
if hasattr(self.sfp_debug_window, 'update_toggle_state'):
self.sfp_debug_window.update_toggle_state(is_connected)
def _initialize_communicators(self):
# Disconnect any existing connections
if self.target_communicator and self.target_communicator.is_open:
self.target_communicator.disconnect()
if self.lru_communicator and self.lru_communicator.is_open:
self.lru_communicator.disconnect()
target_cfg = self.connection_config.get("target", {})
lru_cfg = self.connection_config.get("lru", {})
# Initialize Target Communicator
self.target_communicator, target_connected = self._setup_communicator(
target_cfg, "Target"
)
self._update_communicator_status("Target", target_connected)
# Initialize LRU Communicator
self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU")
self._update_communicator_status("LRU", lru_connected)
def _setup_communicator(
self, config: dict, name: str
) -> tuple[Optional[CommunicatorInterface], bool]:
comm_type = config.get("type")
self.logger.info(f"Initializing {name} communicator of type: {comm_type}")
communicator = None
config_data = None
if comm_type == "serial":
communicator = SerialCommunicator()
config_data = config.get("serial", {})
elif comm_type == "tftp":
communicator = TFTPCommunicator()
config_data = config.get("tftp", {})
elif comm_type == "sfp":
# --- MODIFICATION: Pass the hub and GUI update queue to the communicator ---
communicator = SFPCommunicator(simulation_hub=self.simulation_hub, update_queue=self.gui_update_queue)
communicator.add_connection_state_callback(self._on_connection_state_change)
config_data = config.get("sfp", {})
if self.defer_sfp_connection:
# Return the communicator object but indicate it's not yet connected
return communicator, False
if communicator and config_data:
if communicator.connect(config_data):
return communicator, True
self.logger.warning(f"Failed to initialize or connect {name} communicator.")
return None, False
def update_connection_settings(self, new_config: Dict[str, Any]):
self.logger.info(f"Updating connection settings: {new_config}")
self.connection_config = new_config
self.config_manager.save_connection_settings(new_config)
# Refresh connection summary in the connection panel
try:
if hasattr(self, 'conn_type_var'):
self.conn_type_var.set(self.connection_config.get("target", {}).get("type", "-"))
if hasattr(self, 'conn_info_var'):
self.conn_info_var.set(self._format_connection_summary(self.connection_config.get("target", {})))
except Exception:
pass
self._initialize_communicators()
def _open_settings(self):
self.logger.info("Opening connection settings window.")
ConnectionSettingsWindow(self, self.config_manager, self.connection_config)
def _on_connect_button(self):
self.logger.info("Connection toggle requested by user via PPI button.")
try:
# If communicator exists and is open, disconnect.
if self.target_communicator and self.target_communicator.is_open:
self.logger.info("Requesting disconnect.")
self.target_communicator.disconnect()
return
# Otherwise, attempt to connect.
self.logger.info("Requesting connect.")
# Ensure we have a communicator instance.
if not self.target_communicator:
self.logger.info("No target communicator instance. Initializing communicators.")
self._initialize_communicators()
# If it's still null after init, we can't proceed.
if not self.target_communicator:
self.logger.error("Failed to create target communicator on demand.")
messagebox.showerror("Error", "Could not create communicator.")
return
# Now, connect using the existing or new instance.
cfg = self.connection_config.get("target", {})
sfp_cfg = cfg.get("sfp")
if cfg.get("type") == "sfp" and sfp_cfg:
if not self.target_communicator.connect(sfp_cfg):
self.logger.error("Failed to connect target communicator.")
messagebox.showerror("Connection Failed", "Could not connect to target. Check settings and logs.")
else:
self.logger.warning("Connection attempt without valid SFP config. Running full re-initialization.")
self._initialize_communicators()
except Exception:
self.logger.exception("Unhandled exception in _on_connect_button")
def _reset_radar_state(self) -> bool:
"""
Sends commands to the radar to deactivate all possible targets, effectively
clearing its state before a new simulation starts.
Returns:
True if the reset commands were sent successfully, False otherwise.
"""
if not self.target_communicator or not self.target_communicator.is_open:
self.logger.error("Cannot reset radar state: communicator is not connected.")
messagebox.showerror("Connection Error", "Cannot reset radar: Not Connected.")
return False
self.logger.info("Sending reset commands to deactivate all radar targets...")
# Prefer an atomic reset command to deactivate all targets on the server
# instead of sending many individual tgtinit commands which is slow and
# can cause dropped messages. The server understands 'tgtset /-s' which
# clears all targets atomically.
# Build the atomic reset command. Use command_builder.tgtset if available,
# otherwise build the raw command string.
try:
# Some command_builder implementations may provide build_tgtset; use it if present
if hasattr(command_builder, "build_tgtset"):
reset_command = command_builder.build_tgtset("/-s")
else:
# Fallback: raw command string
reset_command = "tgtset /-s"
except Exception:
# In case command_builder raises for unexpected inputs, fallback to raw
self.logger.exception("Error while building atomic reset command; falling back to raw string.")
reset_command = "tgtset /-s"
# Some radar servers require adjusting internal parameters to accept
# large multi-target operations. Send a preparatory command to set
# the server t_rows parameter before issuing the atomic tgtset reset.
prep_command = "$mex.t_rows=80"
commands_to_send = [prep_command, reset_command]
# Send the preparatory command followed by the atomic reset using the
# communicator's send_commands API which accepts a list of commands.
if not self.target_communicator.send_commands(commands_to_send):
self.logger.error("Failed to send preparatory/reset commands to the radar.")
messagebox.showerror("Reset Error", "Failed to send reset command to the radar.")
return False
self.logger.info("Successfully sent preparatory and atomic reset commands: %s", commands_to_send)
# Poll the simulation hub for up to a short timeout to ensure the server
# processed the reset and returned no active targets.
timeout_s = 3.0
poll_interval = 0.2
waited = 0.0
while waited < timeout_s:
# If there are no real target entries in the hub, assume reset succeeded
if not self.simulation_hub.get_all_target_ids():
self.logger.info("Radar reported zero active targets after reset.")
return True
time.sleep(poll_interval)
waited += poll_interval
# If we reach here, the hub still reports targets — treat as failure
self.logger.error("Radar did not clear targets after reset within timeout.")
messagebox.showerror("Reset Error", "Radar did not clear targets after reset.")
return False
def _on_start_simulation(self):
if self.is_simulation_running.get():
self.logger.info("Simulation is already running.")
return
# If communicator exists but connection was deferred, try to connect now
if self.target_communicator and not self.target_communicator.is_open:
try:
sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
if sfp_cfg:
self.logger.info("Attempting deferred/auto connect for target communicator before starting simulation.")
if not self.target_communicator.connect(sfp_cfg):
messagebox.showerror(
"Not Connected",
"Target communicator is not connected. Please check settings and connect.",
)
return
except Exception:
self.logger.exception("Exception while attempting auto-connect for target communicator.")
messagebox.showerror(
"Not Connected",
"Target communicator is not connected. Please check settings and connect.",
)
return
if not self.target_communicator or not self.target_communicator.is_open:
messagebox.showerror(
"Not Connected",
"Target communicator is not connected. Please check settings and connect.",
)
return
if not self.scenario or not self.scenario.get_all_targets():
messagebox.showinfo(
"Empty Scenario", "Cannot start simulation with an empty scenario."
)
return
try:
update_interval = self.update_time.get()
if update_interval <= 0:
messagebox.showwarning(
"Invalid Input", "Update time must be a positive number."
)
return
except tk.TclError:
messagebox.showwarning(
"Invalid Input", "Update time must be a valid number."
)
return
# Reset data hub and PPI trails before starting
self.logger.info("Resetting simulation data hub and PPI trails.")
self.simulation_hub.reset()
self.ppi_widget.clear_trails()
# If SFP reception was deferred, establish SFP connection now so we
# can receive server state updates during simulation.
if hasattr(self.target_communicator, "connect") and not self.target_communicator.is_open:
try:
# Try to connect with stored config; if connect fails, we abort
sfp_cfg = self.connection_config.get("target", {}).get("sfp", {})
if sfp_cfg:
self.logger.info("Deferred SFP connect: attempting to connect for simulation start.")
if not self.target_communicator.connect(sfp_cfg):
self.logger.error("Failed to connect SFP communicator at simulation start.")
messagebox.showerror("Connection Error", "Failed to establish SFP receive connection.")
return
except Exception:
self.logger.exception("Exception while attempting deferred SFP connect.")
messagebox.showerror("Connection Error", "Exception while establishing SFP receive connection.")
return
if not self._reset_radar_state():
self.logger.error("Aborting simulation start due to radar reset failure.")
return
self.logger.info(
"Sending initial scenario state before starting live updates..."
)
if not self.target_communicator.send_scenario(self.scenario):
self.logger.error(
"Failed to send initial scenario state. Aborting live simulation start."
)
messagebox.showerror(
"Send Error",
"Failed to send the initial scenario configuration. Cannot start live simulation.",
)
return
self.logger.info("Initial scenario state sent successfully.")
self.logger.info("Starting live simulation...")
self.is_simulation_running.set(True)
self._update_button_states()
self.scenario.reset_simulation()
# --- MODIFICATION HERE ---
self.simulation_engine = SimulationEngine(
communicator=self.target_communicator,
update_queue=self.gui_update_queue,
simulation_hub=self.simulation_hub, # Pass the hub to the engine
)
# --- END MODIFICATION ---
self.simulation_engine.set_time_multiplier(self.time_multiplier)
self.simulation_engine.set_update_interval(update_interval)
self.simulation_engine.load_scenario(self.scenario)
# Initialize simulation progress tracking
try:
durations = [getattr(t, '_total_duration_s', 0.0) for t in self.scenario.get_all_targets()]
self.total_sim_time = max(durations) if durations else 0.0
except Exception:
self.total_sim_time = 0.0
# Reset slider and label
self.sim_elapsed_time = 0.0
try:
self.sim_slider_var.set(0.0)
except Exception:
pass
self._update_simulation_progress_display()
self.simulation_engine.start()
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_gui_queue)
def _on_stop_simulation(self):
if not self.is_simulation_running.get() or not self.simulation_engine:
return
self.logger.info("Stopping live simulation...")
self.simulation_engine.stop()
self.simulation_engine = None
# Also disconnect the target communicator (SFP) so we stop receiving from server
try:
if self.target_communicator and getattr(self.target_communicator, "is_open", False):
self.logger.info("Disconnecting target communicator (SFP) after simulation stop.")
try:
self.target_communicator.disconnect()
except Exception:
self.logger.exception("Error while disconnecting target communicator.")
# Update visual status
self._update_communicator_status("Target", False)
try:
self.ppi_widget.update_connect_state(False)
except Exception:
pass
except Exception:
self.logger.exception("Unexpected error while attempting to disconnect target communicator.")
self.is_simulation_running.set(False)
self._update_button_states()
def _on_reset_simulation(self):
self.logger.info("Resetting scenario to initial state.")
if self.is_simulation_running.get():
self._on_stop_simulation()
self.scenario.reset_simulation()
self._update_all_views()
def _process_gui_queue(self):
try:
# We process one update at a time to keep the GUI responsive
update = self.gui_update_queue.get_nowait()
try:
self.logger.debug(f"MainView: dequeued GUI update (type={type(update)}) from queue id={id(self.gui_update_queue)}")
except Exception:
pass
if update == "SIMULATION_FINISHED":
self.logger.info("Simulation finished signal received.")
# Ensure engine is stopped and UI reset
self._on_stop_simulation()
# Reset progress UI to final state
try:
self.sim_elapsed_time = self.total_sim_time
self.sim_slider_var.set(1.0 if self.total_sim_time > 0 else 0.0)
except Exception:
pass
self._update_simulation_progress_display()
elif isinstance(update, list):
# The engine normally enqueues a List[Target] (simulated targets).
# However, the simulation payload handler uses an empty list []
# as a lightweight notification that real states were added to
# the hub. Distinguish the two cases:
if len(update) == 0:
# Empty-list used as a hub refresh notification. Do not
# clear the target list; just rebuild the PPI from the hub.
try:
self.logger.debug("MainView: received hub refresh notification from GUI queue.")
except Exception:
pass
display_data = self._build_display_data_from_hub()
try:
self.ppi_widget.update_targets(display_data)
except Exception:
self.logger.exception("Failed to update PPI widget from hub display data")
else:
# This update is the list of simulated targets from the engine
simulated_targets: List[Target] = update
# Update the target list view with detailed simulated data
self.target_list.update_target_list(simulated_targets)
# For the PPI, build the comparative data structure from the hub
display_data = self._build_display_data_from_hub()
self.ppi_widget.update_targets(display_data)
# Update progress using target times from scenario
try:
# Use the engine's scenario simulated time as elapsed if available
if self.simulation_engine and self.simulation_engine.scenario:
# Derive elapsed as the max of target sim times
times = [getattr(t, '_sim_time_s', 0.0) for t in self.simulation_engine.scenario.get_all_targets()]
self.sim_elapsed_time = max(times) if times else 0.0
else:
self.sim_elapsed_time += 0.0
# Update slider only if user is not interacting with it
if self.total_sim_time > 0 and not getattr(self, '_slider_is_dragging', False):
progress_frac = min(1.0, max(0.0, self.sim_elapsed_time / self.total_sim_time))
self.sim_slider_var.set(progress_frac)
self._update_simulation_progress_display()
except Exception:
# Do not allow progress UI failures to interrupt GUI updates
self.logger.debug("Progress UI update failed", exc_info=True)
except Empty:
# If the queue is empty, we don't need to do anything
pass
finally:
# Always continue polling the GUI update queue so we can show
# real-time server updates on the PPI even when the live
# simulation engine is not running.
try:
self.after(GUI_QUEUE_POLL_INTERVAL_MS, self._process_gui_queue)
except Exception:
pass
def _update_button_states(self):
is_running = self.is_simulation_running.get()
has_data_to_analyze = self.simulation_hub.get_all_target_ids() if self.simulation_hub else False
analysis_state = tk.NORMAL if is_running or has_data_to_analyze else tk.DISABLED
state = tk.DISABLED if is_running else tk.NORMAL
self.reset_radar_button.config(state=state)
self.start_button.config(state=tk.DISABLED if is_running else tk.NORMAL)
self.stop_button.config(state=tk.NORMAL if is_running else tk.DISABLED)
self.analysis_button.config(state=analysis_state)
self.multiplier_combo.config(
state="readonly" if not is_running else tk.DISABLED
)
self.scenario_controls.new_button.config(state=state)
self.scenario_controls.save_button.config(state=state)
self.scenario_controls.save_as_button.config(state=state)
self.scenario_controls.delete_button.config(state=state)
self.scenario_controls.scenario_combobox.config(
state="readonly" if not is_running else tk.DISABLED
)
self.target_list.add_button.config(state=state)
self.target_list.remove_button.config(state=state)
self.target_list.edit_button.config(state=state)
self.target_list.tree.config(selectmode="browse" if not is_running else "none")
def _on_time_multiplier_changed(self, event=None):
"""Handles changes to the time multiplier selection."""
try:
multiplier_str = self.time_multiplier_var.get().replace("x", "")
self.time_multiplier = float(multiplier_str)
if self.simulation_engine and self.simulation_engine.is_running():
self.simulation_engine.set_time_multiplier(self.time_multiplier)
except ValueError:
self.logger.error(
f"Invalid time multiplier value: {self.time_multiplier_var.get()}"
)
self.time_multiplier = 1.0
def _update_simulation_progress_display(self):
"""Updates the elapsed/total time label from internal state."""
try:
elapsed = self.sim_elapsed_time
total = self.total_sim_time
# Update separate labels for elapsed and total time
try:
self.sim_elapsed_label.config(text=f"{elapsed:.1f}s")
self.sim_total_label.config(text=f"{total:.1f}s")
except Exception:
# Fallback for older layouts
if hasattr(self, 'sim_time_label'):
self.sim_time_label.config(text=f"{elapsed:.1f}s / {total:.1f}s")
except Exception:
pass
def _on_seek(self):
"""Called when the user releases the progress slider to seek."""
try:
if not self.simulation_engine or not self.simulation_engine.scenario:
return
frac = float(self.sim_slider_var.get())
# Compute the new time and clamp
new_time = max(0.0, min(self.total_sim_time, frac * self.total_sim_time))
# Ask engine to seek to this new time
try:
self.simulation_engine.set_simulation_time(new_time)
# Immediately update internal elapsed time and label
self.sim_elapsed_time = new_time
self._update_simulation_progress_display()
except Exception:
self.logger.exception("Failed to seek simulation time.")
except Exception:
self.logger.exception("Error in _on_seek handler.")
def _on_targets_changed(self, targets: List[Target]):
"""Callback executed when the target list is modified by the user."""
# 1. Update the internal scenario object
self.scenario.targets = {t.target_id: t for t in targets}
# 2. Update the PPI display with the latest target list
self.ppi_widget.update_targets(targets)
# 3. Automatically save the changes to the current scenario file
if self.current_scenario_name:
self.logger.info(
f"Targets changed for scenario '{self.current_scenario_name}'. Saving changes."
)
self.config_manager.save_scenario(
self.current_scenario_name, self.scenario.to_dict()
)
else:
self.logger.warning(
"Targets changed, but no scenario is currently loaded. Changes are not saved."
)
def _update_all_views(self, targets_to_display: Optional[List[Target]] = None):
self._update_window_title()
if targets_to_display is None:
targets_to_display = self.scenario.get_all_targets()
self.target_list.update_target_list(targets_to_display)
self.ppi_widget.update_targets(targets_to_display)
def _load_scenarios_into_ui(self):
scenario_names = self.config_manager.get_scenario_names()
self.scenario_controls.update_scenario_list(
scenario_names, self.current_scenario_name
)
self.sim_scenario_combobox["values"] = scenario_names
def _on_load_scenario(self, scenario_name: str):
if self.is_simulation_running.get():
self._on_stop_simulation()
self.logger.info(f"Loading scenario: {scenario_name}")
scenario_data = self.config_manager.get_scenario(scenario_name)
if scenario_data:
try:
self.scenario = Scenario.from_dict(scenario_data)
self.current_scenario_name = scenario_name
# Update target list UI with loaded scenario's targets
self.target_list.update_target_list(self.scenario.get_all_targets())
self._update_all_views()
self.sim_scenario_combobox.set(scenario_name)
except Exception as e:
self.logger.error(
f"Failed to parse scenario data for '{scenario_name}': {e}",
exc_info=True,
)
messagebox.showerror(
"Load Error", f"Could not load scenario '{scenario_name}'.\n{e}"
)
else:
self.logger.warning(
f"Attempted to load a non-existent scenario: {scenario_name}"
)
def _on_save_scenario(self, scenario_name: str):
self.logger.info(f"Saving scenario: {scenario_name}")
self.scenario.targets = {t.target_id: t for t in self.target_list.get_targets()}
self.config_manager.save_scenario(scenario_name, self.scenario.to_dict())
self.current_scenario_name = scenario_name
self._load_scenarios_into_ui()
self._update_window_title()
messagebox.showinfo(
"Success", f"Scenario '{scenario_name}' saved successfully.", parent=self
)
def _on_save_scenario_as(self, scenario_name: str):
self.scenario.targets = {t.target_id: t for t in self.target_list.get_targets()}
self._on_save_scenario(scenario_name)
def _on_new_scenario(self, scenario_name: str):
scenario_names = self.config_manager.get_scenario_names()
if scenario_name in scenario_names:
messagebox.showinfo(
"Duplicate Scenario",
f"Scenario '{scenario_name}' already exists. Loading it instead.",
parent=self,
)
self._on_load_scenario(scenario_name)
return
self.logger.info(f"Creating new scenario: {scenario_name}")
self.scenario = Scenario(name=scenario_name)
self.current_scenario_name = scenario_name
# Save the new empty scenario immediately so it's persisted
self.config_manager.save_scenario(scenario_name, self.scenario.to_dict())
# Update all UI elements
self._update_all_views() # Clears targets, updates title
self._load_scenarios_into_ui() # Reloads scenario lists in comboboxes
# After reloading, explicitly set the current selection to the new scenario
self.scenario_controls.current_scenario.set(scenario_name)
self.sim_scenario_combobox.set(scenario_name)
def _on_delete_scenario(self, scenario_name: str):
self.logger.info(f"Deleting scenario: {scenario_name}")
self.config_manager.delete_scenario(scenario_name)
if self.current_scenario_name == scenario_name:
self.current_scenario_name = None
self.scenario = Scenario()
self._update_all_views()
self._load_scenarios_into_ui()
def _on_send_lru_status(self):
# Implementation from your code
pass
def _on_send_initial_state(self):
"""Sends the full scenario using tgtinit for initial setup."""
if self.target_communicator and self.target_communicator.is_open:
if self.scenario and self.scenario.get_all_targets():
# reset_simulation() assicura che i dati inviati siano quelli iniziali
self.scenario.reset_simulation()
self.target_communicator.send_scenario(self.scenario)
messagebox.showinfo(
"Scenario Sent",
"Initial state of the scenario sent to the radar.",
parent=self,
)
else:
messagebox.showinfo(
"Empty Scenario", "Cannot send an empty scenario.", parent=self
)
else:
messagebox.showerror(
"Not Connected",
"Target communicator is not connected. Please check settings.",
parent=self,
)
def _on_reset_targets(self):
# Implementation from your code
pass
def _open_radar_config(self):
self.logger.info("Opening radar config window.")
dialog = RadarConfigWindow(
self, current_scan_limit=self.scan_limit, current_max_range=self.max_range
)
# wait_window è già gestito all'interno di RadarConfigWindow,
# quindi il codice prosegue solo dopo la sua chiusura.
if dialog.scan_limit is not None and dialog.max_range is not None:
# Check if values have actually changed to avoid unnecessary redraws
if (
self.scan_limit != dialog.scan_limit
or self.max_range != dialog.max_range
):
self.logger.info("Radar configuration changed. Applying new settings.")
self.scan_limit = dialog.scan_limit
self.max_range = dialog.max_range
# --- LOGICA MODIFICATA ---
# Non distruggere il widget, ma riconfiguralo.
self.ppi_widget.reconfigure_radar(
max_range_nm=self.max_range, scan_limit_deg=self.scan_limit
)
self.logger.info(f"Scan limit set to: ±{self.scan_limit} degrees")
self.logger.info(f"Max range set to: {self.max_range} NM")
# Non è necessario chiamare _update_all_views() perché
# reconfigure_radar forza già un ridisegno completo.
else:
self.logger.info(
"Radar configuration confirmed, but no changes were made."
)
def _on_closing(self):
self.logger.info("Application shutting down.")
if self.is_simulation_running.get():
self._on_stop_simulation()
settings_to_save = {
"scan_limit": self.scan_limit,
"max_range": self.max_range,
"geometry": self.winfo_geometry(),
"last_selected_scenario": self.current_scenario_name,
}
self.config_manager.save_general_settings(settings_to_save)
self.config_manager.save_connection_settings(self.connection_config)
if self.target_communicator:
if hasattr(self.target_communicator, 'remove_connection_state_callback'):
self.target_communicator.remove_connection_state_callback(self._on_connection_state_change)
if self.target_communicator.is_open:
self.target_communicator.disconnect()
if self.lru_communicator and self.lru_communicator.is_open:
self.lru_communicator.disconnect()
shutdown_logging_system()
self.destroy()
def _open_sfp_debug_window(self):
"""Opens the SFP debug window, ensuring only one instance exists."""
if self.sfp_debug_window and self.sfp_debug_window.winfo_exists():
self.sfp_debug_window.lift()
self.sfp_debug_window.focus_force()
self.logger.info("SFP Packet Inspector window is already open.")
return
self.logger.info("Opening SFP Packet Inspector window...")
self.sfp_debug_window = SfpDebugWindow(self)
def _on_reset_simulation(self):
self.logger.info("Resetting scenario to initial state.")
if self.is_simulation_running.get():
self._on_stop_simulation()
self.logger.info("Resetting simulation data hub and PPI trails.")
self.simulation_hub.reset()
self.ppi_widget.clear_trails()
self.scenario.reset_simulation()
self._update_all_views()
def _build_display_data_from_hub(self) -> Dict[str, List[Target]]:
"""
Builds the data structure for the PPIDisplay by fetching the latest
simulated and real states from the SimulationStateHub.
"""
simulated_targets_for_ppi = []
real_targets_for_ppi = []
if not self.simulation_hub:
return {"simulated": [], "real": []}
target_ids = self.simulation_hub.get_all_target_ids()
for tid in target_ids:
history = self.simulation_hub.get_target_history(tid)
if not history:
continue
# --- Process Simulated Data ---
if history["simulated"]:
last_sim_state = history["simulated"][-1]
_ts, x_ft, y_ft, z_ft = last_sim_state # Hub now stores feet directly
sim_target = Target(target_id=tid, trajectory=[]) # Lightweight object
# Manually set internal cartesian coords and update polar
setattr(sim_target, '_pos_x_ft', x_ft)
setattr(sim_target, '_pos_y_ft', y_ft)
setattr(sim_target, '_pos_z_ft', z_ft)
sim_target._update_current_polar_coords()
sim_target.active = True
simulated_targets_for_ppi.append(sim_target)
# --- Process Real Data ---
if history["real"]:
last_real_state = history["real"][-1]
_ts, x_ft, y_ft, z_ft = last_real_state # Hub now stores feet directly
real_target = Target(target_id=tid, trajectory=[]) # Lightweight object
setattr(real_target, '_pos_x_ft', x_ft)
setattr(real_target, '_pos_y_ft', y_ft)
setattr(real_target, '_pos_z_ft', z_ft)
real_target._update_current_polar_coords()
real_target.active = True
real_targets_for_ppi.append(real_target)
try:
self.logger.debug(
"PPIDisplay will receive simulated=%d real=%d targets from hub",
len(simulated_targets_for_ppi),
len(real_targets_for_ppi),
)
except Exception:
pass
return {"simulated": simulated_targets_for_ppi, "real": real_targets_for_ppi}
def _open_analysis_window(self):
"""Opens the performance analysis window, ensuring only one instance exists."""
if self.analysis_window and self.analysis_window.winfo_exists():
self.analysis_window.lift()
self.analysis_window.focus_force()
self.logger.info("Analysis window is already open.")
return
self.logger.info("Opening performance analysis window...")
self.analysis_window = AnalysisWindow(
self, analyzer=self.performance_analyzer, hub=self.simulation_hub
)