step 1 refactoring main_view

This commit is contained in:
VALLONGOL 2025-11-03 13:51:44 +01:00
parent da079c08f9
commit b8a39798bb
3 changed files with 342 additions and 48 deletions

View File

@ -0,0 +1,142 @@
from typing import Optional, Dict, Any, Callable, List, Tuple
from target_simulator.core.sfp_communicator import SFPCommunicator
from target_simulator.core.serial_communicator import SerialCommunicator
from target_simulator.core.tftp_communicator import TFTPCommunicator
class CommunicatorManager:
"""Encapsulates creation, initialization and basic lifecycle of communicators.
This is a small, testable façade that mirrors the behavior previously
implemented directly inside MainView. It intentionally keeps a thin API
so MainView can migrate to it without changing external behavior.
"""
def __init__(self, simulation_hub, logger=None, defer_sfp_connection: bool = True):
self.simulation_hub = simulation_hub
self.logger = logger
self.defer_sfp_connection = defer_sfp_connection
self.config: Dict[str, Any] = {}
self.target_communicator = None
self.lru_communicator = None
# Callbacks that want to be notified about connection state changes
self._connection_state_callbacks: List[Callable[[bool], None]] = []
def set_config(self, config: Dict[str, Any]):
self.config = config or {}
def initialize_communicators(self) -> Tuple[Optional[object], bool, Optional[object], bool]:
"""Create and (optionally) connect communicator instances.
Returns: (target_comm, target_connected, lru_comm, lru_connected)
"""
# Disconnect any existing connections
try:
if self.target_communicator and getattr(self.target_communicator, "is_open", False):
self.target_communicator.disconnect()
except Exception:
if self.logger:
self.logger.exception("Error disconnecting existing target communicator")
try:
if self.lru_communicator and getattr(self.lru_communicator, "is_open", False):
self.lru_communicator.disconnect()
except Exception:
if self.logger:
self.logger.exception("Error disconnecting existing LRU communicator")
target_cfg = (self.config or {}).get("target", {})
lru_cfg = (self.config or {}).get("lru", {})
# Initialize Target Communicator
self.target_communicator, target_connected = self._setup_communicator(target_cfg, "Target")
# Initialize LRU Communicator
self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU")
return self.target_communicator, bool(target_connected), self.lru_communicator, bool(lru_connected)
def _setup_communicator(self, config: Dict[str, Any], name: str):
comm_type = (config or {}).get("type")
if self.logger:
self.logger.info(f"Initializing {name} communicator of type: {comm_type}")
communicator = None
config_data = None
try:
if comm_type == "serial":
communicator = SerialCommunicator()
config_data = config.get("serial", {})
elif comm_type == "tftp":
communicator = TFTPCommunicator()
config_data = config.get("tftp", {})
elif comm_type == "sfp":
communicator = SFPCommunicator(simulation_hub=self.simulation_hub)
# Add bridge callback if present
if hasattr(communicator, "add_connection_state_callback"):
communicator.add_connection_state_callback(self._notify_connection_state)
config_data = config.get("sfp", {})
if self.defer_sfp_connection:
# Return the communicator unconnected
return communicator, False
if communicator and config_data:
if communicator.connect(config_data):
return communicator, True
except Exception:
if self.logger:
self.logger.exception(f"Failed to initialize or connect {name} communicator.")
if self.logger:
self.logger.warning(f"Failed to initialize or connect {name} communicator.")
return None, False
def connect_target(self) -> bool:
"""Attempt to connect the target communicator using current config."""
try:
if not self.target_communicator:
# Try to create and connect
self.initialize_communicators()
cfg = (self.config or {}).get("target", {})
sfp_cfg = cfg.get("sfp") if cfg else None
if cfg.get("type") == "sfp" and sfp_cfg and self.target_communicator:
return bool(self.target_communicator.connect(sfp_cfg))
# For other connector types, re-run initialization which will connect
self.initialize_communicators()
return bool(self.target_communicator and getattr(self.target_communicator, "is_open", False))
except Exception:
if self.logger:
self.logger.exception("Unhandled exception in connect_target")
return False
def disconnect_target(self):
try:
if self.target_communicator and getattr(self.target_communicator, "is_open", False):
self.target_communicator.disconnect()
except Exception:
if self.logger:
self.logger.exception("Error disconnecting target communicator")
def add_connection_state_callback(self, cb: Callable[[bool], None]):
if cb not in self._connection_state_callbacks:
self._connection_state_callbacks.append(cb)
def remove_connection_state_callback(self, cb: Callable[[bool], None]):
try:
if cb in self._connection_state_callbacks:
self._connection_state_callbacks.remove(cb)
except Exception:
pass
def _notify_connection_state(self, is_connected: bool):
for cb in list(self._connection_state_callbacks):
try:
cb(is_connected)
except Exception:
if self.logger:
self.logger.exception("Connection state callback failed")

View File

@ -0,0 +1,103 @@
import tkinter as tk
from tkinter import ttk
from typing import Callable, Optional, Dict, Any
class ConnectionPanel(ttk.LabelFrame):
"""Reusable connection panel used by MainView.
This encapsulates the small UI that shows connection type/summary and
exposes simple hooks so the outer view can attach handlers for Connect
and Settings.
"""
def __init__(self, parent, initial_config: Optional[Dict[str, Any]] = None):
super().__init__(parent, text="Connection")
self._on_connect: Optional[Callable[[], None]] = None
self._on_open_settings: Optional[Callable[[], None]] = None
self.conn_type_var = tk.StringVar(value=(initial_config or {}).get("target", {}).get("type", "-"))
self.conn_info_var = tk.StringVar(value=self._format_summary((initial_config or {}).get("target", {})))
ttk.Label(self, text="Type:").pack(side=tk.LEFT, padx=(6, 2))
ttk.Label(self, textvariable=self.conn_type_var, width=10).pack(side=tk.LEFT)
ttk.Label(self, textvariable=self.conn_info_var).pack(side=tk.LEFT, padx=(8, 4))
# Buttons
self.connect_button = ttk.Button(self, text="Connect", command=self._handle_connect)
self.connect_button.pack(side=tk.RIGHT, padx=(4, 6))
self.conn_settings_button = ttk.Button(self, text="Settings...", command=self._handle_open_settings)
self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0))
def _format_summary(self, cfg: Dict[str, Any]) -> str:
try:
t = cfg.get("type")
if not t:
return "-"
if t == "sfp":
sfp = cfg.get("sfp", {})
ip = sfp.get("ip") or sfp.get("host") or "?"
remote = sfp.get("port") or sfp.get("remote_port")
if isinstance(remote, (list, tuple)):
remote_str = ",".join(str(int(p)) for p in remote)
else:
try:
remote_str = str(int(remote)) if remote is not None else "?"
except Exception:
remote_str = str(remote)
local = sfp.get("local_port")
if local is not None:
try:
local_str = str(int(local))
except Exception:
local_str = str(local)
return f"{ip} (remote:{remote_str} local:{local_str})"
return f"{ip} (remote:{remote_str})"
if t == "serial":
s = cfg.get("serial", {})
port = s.get("port") or s.get("device") or "?"
baud = s.get("baudrate") or s.get("baud") or "?"
return f"{port} @{baud}"
if t == "tftp":
tftp = cfg.get("tftp", {})
host = tftp.get("host") or tftp.get("server") or "?"
return f"{host}"
return "-"
except Exception:
return "-"
def set_connect_handler(self, cb: Callable[[], None]):
self._on_connect = cb
def set_open_settings_handler(self, cb: Callable[[], None]):
self._on_open_settings = cb
def _handle_connect(self):
if self._on_connect:
try:
self._on_connect()
except Exception:
# Handler should manage its own errors
raise
def _handle_open_settings(self):
if self._on_open_settings:
try:
self._on_open_settings()
except Exception:
raise
def update_summary(self, conn_config: Dict[str, Any]):
try:
self.conn_type_var.set((conn_config or {}).get("target", {}).get("type", "-"))
self.conn_info_var.set(self._format_summary((conn_config or {}).get("target", {})))
except Exception:
pass
def update_toggle_state(self, is_connected: bool):
try:
self.connect_button.config(text="Disconnect" if is_connected else "Connect")
except Exception:
pass

View File

@ -19,6 +19,7 @@ from target_simulator.gui.connection_settings_window import ConnectionSettingsWi
from target_simulator.gui.radar_config_window import RadarConfigWindow from target_simulator.gui.radar_config_window import RadarConfigWindow
from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame from target_simulator.gui.scenario_controls_frame import ScenarioControlsFrame
from target_simulator.gui.target_list_frame import TargetListFrame from target_simulator.gui.target_list_frame import TargetListFrame
from target_simulator.gui.connection_panel import ConnectionPanel
from target_simulator.core.communicator_interface import CommunicatorInterface from target_simulator.core.communicator_interface import CommunicatorInterface
from target_simulator.core.serial_communicator import SerialCommunicator from target_simulator.core.serial_communicator import SerialCommunicator
@ -36,6 +37,7 @@ from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
from target_simulator.gui.analysis_window import AnalysisWindow from target_simulator.gui.analysis_window import AnalysisWindow
from target_simulator.core import command_builder from target_simulator.core import command_builder
from target_simulator.analysis.simulation_archive import SimulationArchive from target_simulator.analysis.simulation_archive import SimulationArchive
from target_simulator.communication.communicator_manager import CommunicatorManager
# --- Import Version Info FOR THE WRAPPER ITSELF --- # --- Import Version Info FOR THE WRAPPER ITSELF ---
try: try:
@ -84,6 +86,15 @@ class MainView(tk.Tk):
# --- Initialize the data hub and analyzer --- # --- Initialize the data hub and analyzer ---
self.simulation_hub = SimulationStateHub() self.simulation_hub = SimulationStateHub()
self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub) self.performance_analyzer = PerformanceAnalyzer(self.simulation_hub)
# Communicator manager handles communicator lifecycle
self.communicator_manager = CommunicatorManager(
simulation_hub=self.simulation_hub, logger=self.logger, defer_sfp_connection=True
)
# Apply loaded connection settings to the manager
try:
self.communicator_manager.set_config(self.connection_config)
except Exception:
pass
# --- Core Logic Handlers --- # --- Core Logic Handlers ---
self.target_communicator: Optional[CommunicatorInterface] = None self.target_communicator: Optional[CommunicatorInterface] = None
@ -149,38 +160,34 @@ class MainView(tk.Tk):
# Connection panel sits above the PPI on the right side and shows # Connection panel sits above the PPI on the right side and shows
# current connection parameters and a centralized Connect/Disconnect button. # current connection parameters and a centralized Connect/Disconnect button.
conn_panel = ttk.LabelFrame(right_container, text="Connection") try:
conn_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2)) self.connection_panel = ConnectionPanel(right_container, initial_config=self.connection_config)
self.connection_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2))
# Display current connection summary (type / brief params) # Wire handlers back to MainView
self.conn_type_var = tk.StringVar( try:
value=self.connection_config.get("target", {}).get("type", "-") self.connection_panel.set_connect_handler(self._on_connect_button)
) self.connection_panel.set_open_settings_handler(self._open_settings)
self.conn_info_var = tk.StringVar( except Exception:
value=self._format_connection_summary( pass
self.connection_config.get("target", {}) except Exception:
# Fallback to inline panel if the new component fails for any reason
conn_panel = ttk.LabelFrame(right_container, text="Connection")
conn_panel.pack(side=tk.TOP, fill=tk.X, padx=5, pady=(5, 2))
self.conn_type_var = tk.StringVar(
value=self.connection_config.get("target", {}).get("type", "-")
) )
) self.conn_info_var = tk.StringVar(
value=self._format_connection_summary(
ttk.Label(conn_panel, text="Type:").pack(side=tk.LEFT, padx=(6, 2)) self.connection_config.get("target", {})
ttk.Label(conn_panel, textvariable=self.conn_type_var, width=10).pack( )
side=tk.LEFT )
) ttk.Label(conn_panel, text="Type:").pack(side=tk.LEFT, padx=(6, 2))
ttk.Label(conn_panel, textvariable=self.conn_info_var).pack( ttk.Label(conn_panel, textvariable=self.conn_type_var, width=10).pack(side=tk.LEFT)
side=tk.LEFT, padx=(8, 4) ttk.Label(conn_panel, textvariable=self.conn_info_var).pack(side=tk.LEFT, padx=(8, 4))
) self.connect_button = ttk.Button(conn_panel, text="Connect", command=self._on_connect_button)
self.connect_button.pack(side=tk.RIGHT, padx=(4, 6))
# Connect / Disconnect button centralised here self.conn_settings_button = ttk.Button(conn_panel, text="Settings...", command=self._open_settings)
self.connect_button = ttk.Button( self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0))
conn_panel, text="Connect", command=self._on_connect_button
)
self.connect_button.pack(side=tk.RIGHT, padx=(4, 6))
# Open settings quick button
self.conn_settings_button = ttk.Button(
conn_panel, text="Settings...", command=self._open_settings
)
self.conn_settings_button.pack(side=tk.RIGHT, padx=(4, 0))
# Now the PPI widget below the connection panel # Now the PPI widget below the connection panel
self.ppi_widget = PPIDisplay( self.ppi_widget = PPIDisplay(
@ -715,7 +722,13 @@ class MainView(tk.Tk):
# Update centralized connect button text and status indicator # Update centralized connect button text and status indicator
try: try:
if hasattr(self, "connect_button") and self.connect_button: # Prefer the new ConnectionPanel if present
if hasattr(self, "connection_panel") and self.connection_panel:
try:
self.connection_panel.update_toggle_state(is_connected)
except Exception:
pass
elif hasattr(self, "connect_button") and self.connect_button:
self.connect_button.config( self.connect_button.config(
text="Disconnect" if is_connected else "Connect" text="Disconnect" if is_connected else "Connect"
) )
@ -728,24 +741,41 @@ class MainView(tk.Tk):
self.sfp_debug_window.update_toggle_state(is_connected) self.sfp_debug_window.update_toggle_state(is_connected)
def _initialize_communicators(self): def _initialize_communicators(self):
# Disconnect any existing connections # Delegate communicator lifecycle to CommunicatorManager
if self.target_communicator and self.target_communicator.is_open: try:
self.target_communicator.disconnect() # Ensure manager knows the latest config
if self.lru_communicator and self.lru_communicator.is_open: try:
self.lru_communicator.disconnect() self.communicator_manager.set_config(self.connection_config)
except Exception:
pass
target_cfg = self.connection_config.get("target", {}) t_comm, t_connected, l_comm, l_connected = self.communicator_manager.initialize_communicators()
lru_cfg = self.connection_config.get("lru", {}) self.target_communicator = t_comm
self.lru_communicator = l_comm
self._update_communicator_status("Target", bool(t_connected))
self._update_communicator_status("LRU", bool(l_connected))
# Ensure the manager notifies through MainView callback when state changes
try:
self.communicator_manager.add_connection_state_callback(self._on_connection_state_change)
except Exception:
pass
except Exception:
# Fallback to original inline initialization in case of problems
try:
if self.target_communicator and self.target_communicator.is_open:
self.target_communicator.disconnect()
if self.lru_communicator and self.lru_communicator.is_open:
self.lru_communicator.disconnect()
# Initialize Target Communicator target_cfg = self.connection_config.get("target", {})
self.target_communicator, target_connected = self._setup_communicator( lru_cfg = self.connection_config.get("lru", {})
target_cfg, "Target"
)
self._update_communicator_status("Target", target_connected)
# Initialize LRU Communicator self.target_communicator, target_connected = self._setup_communicator(target_cfg, "Target")
self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU") self._update_communicator_status("Target", target_connected)
self._update_communicator_status("LRU", lru_connected) self.lru_communicator, lru_connected = self._setup_communicator(lru_cfg, "LRU")
self._update_communicator_status("LRU", lru_connected)
except Exception:
self.logger.exception("Fallback communicator initialization failed")
def _setup_communicator( def _setup_communicator(
self, config: dict, name: str self, config: dict, name: str
@ -796,7 +826,26 @@ class MainView(tk.Tk):
) )
except Exception: except Exception:
pass pass
self._initialize_communicators() # Update the connection panel UI if present
try:
if hasattr(self, "connection_panel") and self.connection_panel:
try:
self.connection_panel.update_summary(self.connection_config)
except Exception:
pass
except Exception:
pass
# Update the communicator manager and reinitialize
try:
self.communicator_manager.set_config(new_config)
self._initialize_communicators()
except Exception:
# Fallback: call the previous initialization path
try:
self._initialize_communicators()
except Exception:
pass
def _open_settings(self): def _open_settings(self):
self.logger.info("Opening connection settings window.") self.logger.info("Opening connection settings window.")