from typing import Optional, Dict, Any, Callable, List, Tuple

from target_simulator.core.sfp_communicator import SFPCommunicator
from target_simulator.core.serial_communicator import SerialCommunicator
from target_simulator.core.tftp_communicator import TFTPCommunicator


class CommunicatorManager:
    """Encapsulates creation, initialization and basic lifecycle of communicators.

    This is a small, testable façade that mirrors the behavior previously
    implemented directly inside MainView. It intentionally keeps a thin API so
    MainView can migrate to it without changing external behavior.
    """

    def __init__(self, simulation_hub, logger=None, defer_sfp_connection: bool = True):
        """Store collaborators; no communicator is created until requested.

        Args:
            simulation_hub: Object handed to ``SFPCommunicator`` when an SFP
                communicator is created.
            logger: Optional logger. When falsy, all logging is skipped.
            defer_sfp_connection: When true, SFP communicators are created but
                left unconnected by ``_setup_communicator``.
        """
        self.simulation_hub = simulation_hub
        self.logger = logger
        self.defer_sfp_connection = defer_sfp_connection
        self.config: Dict[str, Any] = {}
        self.target_communicator = None
        self.lru_communicator = None
        # Callbacks that want to be notified about connection state changes
        self._connection_state_callbacks: List[Callable[[bool], None]] = []

    def set_config(self, config: Dict[str, Any]):
        """Replace the stored configuration; a falsy value becomes ``{}``."""
        self.config = config or {}

    def initialize_communicators(
        self,
    ) -> Tuple[Optional[object], bool, Optional[object], bool]:
        """Create and (optionally) connect communicator instances.

        Existing communicators that report ``is_open`` are disconnected first.
        The target and LRU communicators are then rebuilt from the ``"target"``
        and ``"lru"`` sections of the stored configuration.

        Returns:
            (target_comm, target_connected, lru_comm, lru_connected)
        """
        # Disconnect any existing connections
        self._disconnect_if_open(
            self.target_communicator,
            "Error disconnecting existing target communicator",
        )
        self._disconnect_if_open(
            self.lru_communicator,
            "Error disconnecting existing LRU communicator",
        )

        settings = self.config or {}
        target_cfg = settings.get("target", {})
        lru_cfg = settings.get("lru", {})

        # Initialize Target Communicator
        self.target_communicator, target_connected = self._setup_communicator(
            target_cfg, "Target"
        )
        # Initialize LRU Communicator
        self.lru_communicator, lru_connected = self._setup_communicator(
            lru_cfg, "LRU"
        )

        return (
            self.target_communicator,
            bool(target_connected),
            self.lru_communicator,
            bool(lru_connected),
        )

    def _disconnect_if_open(self, communicator, error_message: str) -> None:
        """Disconnect ``communicator`` if it is set and reports ``is_open``.

        Any exception is logged with ``error_message`` (when a logger is
        configured) and never propagated.
        """
        try:
            if communicator and getattr(communicator, "is_open", False):
                communicator.disconnect()
        except Exception:
            if self.logger:
                self.logger.exception(error_message)

    def _setup_communicator(
        self, config: Dict[str, Any], name: str
    ) -> Tuple[Optional[object], bool]:
        """Build one communicator from ``config`` and try to connect it.

        Args:
            config: Section holding a ``"type"`` key (``"serial"``, ``"tftp"``
                or ``"sfp"``) and a same-named sub-section with the connection
                parameters. ``None`` or an empty mapping is tolerated.
            name: Human-readable label used only in log messages.

        Returns:
            ``(communicator, True)`` when ``connect`` returned a truthy value,
            ``(communicator, False)`` for an SFP communicator whose connection
            is deferred, and ``(None, False)`` in every other case (unknown
            type, empty parameters, failed connect, or an exception).
        """
        comm_type = (config or {}).get("type")
        if self.logger:
            self.logger.info(f"Initializing {name} communicator of type: {comm_type}")

        communicator = None
        config_data = None
        try:
            if comm_type == "serial":
                communicator = SerialCommunicator()
                config_data = config.get("serial", {})
            elif comm_type == "tftp":
                communicator = TFTPCommunicator()
                config_data = config.get("tftp", {})
            elif comm_type == "sfp":
                communicator = SFPCommunicator(simulation_hub=self.simulation_hub)
                # Add bridge callback if present
                if hasattr(communicator, "add_connection_state_callback"):
                    communicator.add_connection_state_callback(
                        self._notify_connection_state
                    )
                config_data = config.get("sfp", {})
                if self.defer_sfp_connection:
                    # Return the communicator unconnected
                    return communicator, False

            if communicator and config_data:
                if communicator.connect(config_data):
                    return communicator, True
        except Exception:
            if self.logger:
                self.logger.exception(
                    f"Failed to initialize or connect {name} communicator."
                )

        # Reached for unknown types, empty parameters, a falsy connect() result
        # and after an exception: the communicator (if any) is discarded.
        if self.logger:
            self.logger.warning(f"Failed to initialize or connect {name} communicator.")
        return None, False

    def connect_target(self) -> bool:
        """Attempt to connect the target communicator using current config.

        Returns:
            ``True`` when the target communicator ends up connected, ``False``
            otherwise (including when any exception is raised, which is logged).
        """
        try:
            freshly_initialized = False
            if not self.target_communicator:
                # Try to create and connect
                self.initialize_communicators()
                freshly_initialized = True

            # A missing or ``None`` target section is treated as empty, the
            # same way ``_setup_communicator`` treats it.
            cfg = (self.config or {}).get("target") or {}
            sfp_cfg = cfg.get("sfp")
            if cfg.get("type") == "sfp" and sfp_cfg and self.target_communicator:
                return bool(self.target_communicator.connect(sfp_cfg))

            # For other connector types, re-run initialization which will
            # connect. Skip it when initialization has just run above, so the
            # target and LRU links are not torn down and rebuilt a second time.
            if not freshly_initialized:
                self.initialize_communicators()
            return bool(
                self.target_communicator
                and getattr(self.target_communicator, "is_open", False)
            )
        except Exception:
            if self.logger:
                self.logger.exception("Unhandled exception in connect_target")
            return False

    def disconnect_target(self):
        """Disconnect the target communicator if it reports ``is_open``."""
        self._disconnect_if_open(
            self.target_communicator,
            "Error disconnecting target communicator",
        )

    def add_connection_state_callback(self, cb: Callable[[bool], None]):
        """Register ``cb`` for connection-state notifications (no duplicates)."""
        if cb not in self._connection_state_callbacks:
            self._connection_state_callbacks.append(cb)

    def remove_connection_state_callback(self, cb: Callable[[bool], None]):
        """Unregister ``cb``; unknown callbacks are ignored.

        Removal is best-effort: an unexpected error is logged (when a logger is
        configured) rather than raised.
        """
        try:
            if cb in self._connection_state_callbacks:
                self._connection_state_callbacks.remove(cb)
        except Exception:
            if self.logger:
                self.logger.exception("Error removing connection state callback")

    def _notify_connection_state(self, is_connected: bool):
        """Invoke every registered callback with ``is_connected``.

        A failing callback is logged and does not prevent the others from
        being called.
        """
        # Iterate over a snapshot so callbacks may (un)register during dispatch.
        for cb in list(self._connection_state_callbacks):
            try:
                cb(is_connected)
            except Exception:
                if self.logger:
                    self.logger.exception("Connection state callback failed")