diff --git a/target_simulator/communication/communicator_manager.py b/target_simulator/communication/communicator_manager.py new file mode 100644 index 0000000..434fcb2 --- /dev/null +++ b/target_simulator/communication/communicator_manager.py @@ -0,0 +1,142 @@ +from typing import Optional, Dict, Any, Callable, List, Tuple +from target_simulator.core.sfp_communicator import SFPCommunicator +from target_simulator.core.serial_communicator import SerialCommunicator +from target_simulator.core.tftp_communicator import TFTPCommunicator + + +class CommunicatorManager: + """Encapsulates creation, initialization and basic lifecycle of communicators. + + This is a small, testable façade that mirrors the behavior previously + implemented directly inside MainView. It intentionally keeps a thin API + so MainView can migrate to it without changing external behavior. + """ + + def __init__(self, simulation_hub, logger=None, defer_sfp_connection: bool = True): + self.simulation_hub = simulation_hub + self.logger = logger + self.defer_sfp_connection = defer_sfp_connection + + self.config: Dict[str, Any] = {} + + self.target_communicator = None + self.lru_communicator = None + + # Callbacks that want to be notified about connection state changes + self._connection_state_callbacks: List[Callable[[bool], None]] = [] + + def set_config(self, config: Dict[str, Any]): + self.config = config or {} + + def initialize_communicators(self) -> Tuple[Optional[object], bool, Optional[object], bool]: + """Create and (optionally) connect communicator instances. + + Returns: (target_comm, target_connected, lru_comm, lru_connected) + """ + # Disconnect any existing connections + try: + if self.target_communicator and getattr(self.target_communicator, "is_open", False): + self.target_communicator.disconnect() + except Exception: + if self.logger: + self.logger.exception("Error disconnecting existing target communicator") + + try: + if self.lru_communicator and getattr(self.lru_communicator, "is_open", False): + self.lru_communicator.disconnect() + except Exception: + if self.logger: + self.logger.exception("Error disconnecting existing LRU communicator") + + target_cfg = (self.config or {}).get("target", {}) + lru_cfg = (self.config or {}).get("lru", {}) + + # Initialize Target Communicator + self.target_communicator, target_connected = self._setup_communicator(target_cfg, "Target") + + # Initialize LRU Communicator + self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU") + + return self.target_communicator, bool(target_connected), self.lru_communicator, bool(lru_connected) + + def _setup_communicator(self, config: Dict[str, Any], name: str): + comm_type = (config or {}).get("type") + if self.logger: + self.logger.info(f"Initializing {name} communicator of type: {comm_type}") + + communicator = None + config_data = None + + try: + if comm_type == "serial": + communicator = SerialCommunicator() + config_data = config.get("serial", {}) + elif comm_type == "tftp": + communicator = TFTPCommunicator() + config_data = config.get("tftp", {}) + elif comm_type == "sfp": + communicator = SFPCommunicator(simulation_hub=self.simulation_hub) + # Add bridge callback if present + if hasattr(communicator, "add_connection_state_callback"): + communicator.add_connection_state_callback(self._notify_connection_state) + config_data = config.get("sfp", {}) + if self.defer_sfp_connection: + # Return the communicator unconnected + return communicator, False + + if communicator and config_data: + if communicator.connect(config_data): + return communicator, True + except Exception: + if self.logger: + self.logger.exception(f"Failed to initialize or connect {name} communicator.") + + if self.logger: + self.logger.warning(f"Failed to initialize or connect {name} communicator.") + return None, False + + def connect_target(self) -> bool: + """Attempt to connect the target communicator using current config.""" + try: + if not self.target_communicator: + # Try to create and connect + self.initialize_communicators() + + cfg = (self.config or {}).get("target", {}) + sfp_cfg = cfg.get("sfp") if cfg else None + if cfg.get("type") == "sfp" and sfp_cfg and self.target_communicator: + return bool(self.target_communicator.connect(sfp_cfg)) + # For other connector types, re-run initialization which will connect + self.initialize_communicators() + return bool(self.target_communicator and getattr(self.target_communicator, "is_open", False)) + except Exception: + if self.logger: + self.logger.exception("Unhandled exception in connect_target") + return False + + def disconnect_target(self): + try: + if self.target_communicator and getattr(self.target_communicator, "is_open", False): + self.target_communicator.disconnect() + except Exception: + if self.logger: + self.logger.exception("Error disconnecting target communicator") + + def add_connection_state_callback(self, cb: Callable[[bool], None]): + if cb not in self._connection_state_callbacks: + self._connection_state_callbacks.append(cb) + + def remove_connection_state_callback(self, cb: Callable[[bool], None]): + try: + if cb in self._connection_state_callbacks: + self._connection_state_callbacks.remove(cb) + except Exception: + pass + + def _notify_connection_state(self, is_connected: bool): + for cb in list(self._connection_state_callbacks): + try: + cb(is_connected) + except Exception: + if self.logger: + self.logger.exception("Connection state callback failed") diff --git a/target_simulator/gui/connection_panel.py b/target_simulator/gui/connection_panel.py new file mode 100644 index 0000000..d26d973 --- /dev/null +++ b/target_simulator/gui/connection_panel.py @@ -0,0 +1,103 @@ +import tkinter as tk +from tkinter import ttk +from typing import Callable, Optional, Dict, Any + + +class ConnectionPanel(ttk.LabelFrame): + """Reusable connection panel used by MainView. + + This encapsulates the small UI that shows connection type/summary and + exposes simple hooks so the outer view can attach handlers for Connect + and Settings. + """ + + def __init__(self, parent, initial_config: Optional[Dict[str, Any]] = None): + super().__init__(parent, text="Connection") + + self._on_connect: Optional[Callable[[], None]] = None + self._on_open_settings: Optional[Callable[[], None]] = None + + self.conn_type_var = tk.StringVar(value=(initial_config or {}).get("target", {}).get("type", "-")) + self.conn_info_var = tk.StringVar(value=self._format_summary((initial_config or {}).get("target", {}))) + + ttk.Label(self, text="Type:").pack(side=tk.LEFT, padx=(6, 2)) + ttk.Label(self, textvariable=self.conn_type_var, width=10).pack(side=tk.LEFT) + ttk.Label(self, textvariable=self.conn_info_var).pack(side=tk.LEFT, padx=(8, 4)) + + # Buttons + self.connect_button = ttk.Button(self, text="Connect", command=self._handle_connect) + self.connect_button.pack(side=tk.RIGHT, padx=(4, 6)) + + self.conn_settings_button = ttk.Button(self, text="Settings...", command=self._handle_open_settings) + self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0)) + + def _format_summary(self, cfg: Dict[str, Any]) -> str: + try: + t = cfg.get("type") + if not t: + return "-" + if t == "sfp": + sfp = cfg.get("sfp", {}) + ip = sfp.get("ip") or sfp.get("host") or "?" + remote = sfp.get("port") or sfp.get("remote_port") + if isinstance(remote, (list, tuple)): + remote_str = ",".join(str(int(p)) for p in remote) + else: + try: + remote_str = str(int(remote)) if remote is not None else "?" + except Exception: + remote_str = str(remote) + local = sfp.get("local_port") + if local is not None: + try: + local_str = str(int(local)) + except Exception: + local_str = str(local) + return f"{ip} (remote:{remote_str} local:{local_str})" + return f"{ip} (remote:{remote_str})" + if t == "serial": + s = cfg.get("serial", {}) + port = s.get("port") or s.get("device") or "?" + baud = s.get("baudrate") or s.get("baud") or "?" + return f"{port} @{baud}" + if t == "tftp": + tftp = cfg.get("tftp", {}) + host = tftp.get("host") or tftp.get("server") or "?" + return f"{host}" + return "-" + except Exception: + return "-" + + def set_connect_handler(self, cb: Callable[[], None]): + self._on_connect = cb + + def set_open_settings_handler(self, cb: Callable[[], None]): + self._on_open_settings = cb + + def _handle_connect(self): + if self._on_connect: + try: + self._on_connect() + except Exception: + # Handler should manage its own errors + raise + + def _handle_open_settings(self): + if self._on_open_settings: + try: + self._on_open_settings() + except Exception: + raise + + def update_summary(self, conn_config: Dict[str, Any]): + try: + self.conn_type_var.set((conn_config or {}).get("target", {}).get("type", "-")) + self.conn_info_var.set(self._format_summary((conn_config or {}).get("target", {}))) + except Exception: + pass + + def update_toggle_state(self, is_connected: bool): + try: + self.connect_button.config(text="Disconnect" if is_connected else "Connect") + except Exception: + pass diff --git a/target_simulator/gui/main_view.py b/target_simulator/gui/main_view.py index 5a086d2..65bcce4 100644 --- a/target_simulator/gui/main_view.py +++ b/target_simulator/gui/main_view.py @@ -19,6 +19,7 @@ from target_simulator.gui.connection_settings_window import ConnectionSettingsWi from target_simulator.gui.radar_config_window import RadarConfigWindow from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame from target_simulator.gui.target_list_frame import TargetListFrame +from target_simulator.gui.connection_panel import ConnectionPanel from target_simulator.core.communicator_interface import CommunicatorInterface from target_simulator.core.serial_communicator import SerialCommunicator @@ -36,6 +37,7 @@ from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer from target_simulator.gui.analysis_window import AnalysisWindow from target_simulator.core import command_builder from target_simulator.analysis.simulation_archive import SimulationArchive +from target_simulator.communication.communicator_manager import CommunicatorManager # --- Import Version Info FOR THE WRAPPER ITSELF --- try: @@ -84,6 +86,15 @@ class MainView(tk.Tk): # --- Initialize the data hub and analyzer --- self.simulation_hub = SimulationStateHub() self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub) + # Communicator manager handles communicator lifecycle + self.communicator_manager = CommunicatorManager( + simulation_hub=self.simulation_hub, logger=self.logger, defer_sfp_connection=True + ) + # Apply loaded connection settings to the manager + try: + self.communicator_manager.set_config(self.connection_config) + except Exception: + pass # --- Core Logic Handlers --- self.target_communicator: Optional[CommunicatorInterface] = None @@ -149,38 +160,34 @@ class MainView(tk.Tk): # Connection panel sits above the PPI on the right side and shows # current connection parameters and a centralized Connect/Disconnect button. - conn_panel = ttk.LabelFrame(right_container, text="Connection") - conn_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) - - # Display current connection summary (type / brief params) - self.conn_type_var = tk.StringVar( - value=self.connection_config.get("target", {}).get("type", "-") - ) - self.conn_info_var = tk.StringVar( - value=self._format_connection_summary( - self.connection_config.get("target", {}) + try: + self.connection_panel = ConnectionPanel(right_container, initial_config=self.connection_config) + self.connection_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) + # Wire handlers back to MainView + try: + self.connection_panel.set_connect_handler(self._on_connect_button) + self.connection_panel.set_open_settings_handler(self._open_settings) + except Exception: + pass + except Exception: + # Fallback to inline panel if the new component fails for any reason + conn_panel = ttk.LabelFrame(right_container, text="Connection") + conn_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) + self.conn_type_var = tk.StringVar( + value=self.connection_config.get("target", {}).get("type", "-") ) - ) - - ttk.Label(conn_panel, text="Type:").pack(side=tk.LEFT, padx=(6, 2)) - ttk.Label(conn_panel, textvariable=self.conn_type_var, width=10).pack( - side=tk.LEFT - ) - ttk.Label(conn_panel, textvariable=self.conn_info_var).pack( - side=tk.LEFT, padx=(8, 4) - ) - - # Connect / Disconnect button centralised here - self.connect_button = ttk.Button( - conn_panel, text="Connect", command=self._on_connect_button - ) - self.connect_button.pack(side=tk.RIGHT, padx=(4, 6)) - - # Open settings quick button - self.conn_settings_button = ttk.Button( - conn_panel, text="Settings...", command=self._open_settings - ) - self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0)) + self.conn_info_var = tk.StringVar( + value=self._format_connection_summary( + self.connection_config.get("target", {}) + ) + ) + ttk.Label(conn_panel, text="Type:").pack(side=tk.LEFT, padx=(6, 2)) + ttk.Label(conn_panel, textvariable=self.conn_type_var, width=10).pack(side=tk.LEFT) + ttk.Label(conn_panel, textvariable=self.conn_info_var).pack(side=tk.LEFT, padx=(8, 4)) + self.connect_button = ttk.Button(conn_panel, text="Connect", command=self._on_connect_button) + self.connect_button.pack(side=tk.RIGHT, padx=(4, 6)) + self.conn_settings_button = ttk.Button(conn_panel, text="Settings...", command=self._open_settings) + self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0)) # Now the PPI widget below the connection panel self.ppi_widget = PPIDisplay( @@ -715,7 +722,13 @@ class MainView(tk.Tk): # Update centralized connect button text and status indicator try: - if hasattr(self, "connect_button") and self.connect_button: + # Prefer the new ConnectionPanel if present + if hasattr(self, "connection_panel") and self.connection_panel: + try: + self.connection_panel.update_toggle_state(is_connected) + except Exception: + pass + elif hasattr(self, "connect_button") and self.connect_button: self.connect_button.config( text="Disconnect" if is_connected else "Connect" ) @@ -728,24 +741,41 @@ class MainView(tk.Tk): self.sfp_debug_window.update_toggle_state(is_connected) def _initialize_communicators(self): - # Disconnect any existing connections - if self.target_communicator and self.target_communicator.is_open: - self.target_communicator.disconnect() - if self.lru_communicator and self.lru_communicator.is_open: - self.lru_communicator.disconnect() + # Delegate communicator lifecycle to CommunicatorManager + try: + # Ensure manager knows the latest config + try: + self.communicator_manager.set_config(self.connection_config) + except Exception: + pass - target_cfg = self.connection_config.get("target", {}) - lru_cfg = self.connection_config.get("lru", {}) + t_comm, t_connected, l_comm, l_connected = self.communicator_manager.initialize_communicators() + self.target_communicator = t_comm + self.lru_communicator = l_comm + self._update_communicator_status("Target", bool(t_connected)) + self._update_communicator_status("LRU", bool(l_connected)) + # Ensure the manager notifies through MainView callback when state changes + try: + self.communicator_manager.add_connection_state_callback(self._on_connection_state_change) + except Exception: + pass + except Exception: + # Fallback to original inline initialization in case of problems + try: + if self.target_communicator and self.target_communicator.is_open: + self.target_communicator.disconnect() + if self.lru_communicator and self.lru_communicator.is_open: + self.lru_communicator.disconnect() - # Initialize Target Communicator - self.target_communicator, target_connected = self._setup_communicator( - target_cfg, "Target" - ) - self._update_communicator_status("Target", target_connected) + target_cfg = self.connection_config.get("target", {}) + lru_cfg = self.connection_config.get("lru", {}) - # Initialize LRU Communicator - self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU") - self._update_communicator_status("LRU", lru_connected) + self.target_communicator, target_connected = self._setup_communicator(target_cfg, "Target") + self._update_communicator_status("Target", target_connected) + self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU") + self._update_communicator_status("LRU", lru_connected) + except Exception: + self.logger.exception("Fallback communicator initialization failed") def _setup_communicator( self, config: dict, name: str @@ -796,7 +826,26 @@ class MainView(tk.Tk): ) except Exception: pass - self._initialize_communicators() + # Update the connection panel UI if present + try: + if hasattr(self, "connection_panel") and self.connection_panel: + try: + self.connection_panel.update_summary(self.connection_config) + except Exception: + pass + except Exception: + pass + + # Update the communicator manager and reinitialize + try: + self.communicator_manager.set_config(new_config) + self._initialize_communicators() + except Exception: + # Fallback: call the previous initialization path + try: + self._initialize_communicators() + except Exception: + pass def _open_settings(self): self.logger.info("Opening connection settings window.")