# target_simulator/gui/external_profiler_window.py import tkinter as tk from tkinter import ttk, messagebox from target_simulator.gui.external_profiler import ExternalProfiler from target_simulator.core.sfp_communicator import SFPCommunicator class ExternalProfilerWindow(tk.Toplevel): """ Finestra di dialogo per controllare lo strumento di profiling esterno. """ def __init__(self, master): super().__init__(master) self.title("External Profiling Tool") self.geometry("500x450") self.transient(master) self.grab_set() self.profiler = ExternalProfiler() self.router = None # Ottieni il router dal communicator SFP principale communicator = getattr(master, "target_communicator", None) if isinstance(communicator, SFPCommunicator): self.router = communicator.router() self._create_widgets() self.protocol("WM_DELETE_WINDOW", self._on_close) def _create_widgets(self): main_frame = ttk.Frame(self, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # Info panel info_frame = ttk.LabelFrame(main_frame, text="ℹ️ Information", padding=10) info_frame.pack(fill='x', pady=(0, 10)) info_text = ( "This tool broadcasts RIS packets to specified UDP ports for network latency analysis.\n\n" "• Port A: Packets sent when received from server (start of processing)\n" "• Port B: Packets sent after processing (end of processing)\n\n" "To capture packets, use Wireshark with filter:\n" " udp.port == || udp.port == \n\n" "Enable Broadcast when the server is on a different machine." ) info_label = ttk.Label(info_frame, text=info_text, justify='left', wraplength=460) info_label.pack(anchor='w') # Configuration frame config_frame = ttk.LabelFrame(main_frame, text="Configuration", padding=10) config_frame.pack(fill='x', pady=(0, 10)) ttk.Label(config_frame, text="Port A (Start of Processing):").pack(anchor='w') self.port_a_var = tk.StringVar(value="55001") self.port_a_entry = ttk.Entry(config_frame, textvariable=self.port_a_var) self.port_a_entry.pack(fill='x', pady=(0, 10)) ttk.Label(config_frame, text="Port B (End of Processing):").pack(anchor='w') self.port_b_var = tk.StringVar(value="55002") self.port_b_entry = ttk.Entry(config_frame, textvariable=self.port_b_var) self.port_b_entry.pack(fill='x', pady=(0, 10)) # Broadcast checkbox self.broadcast_var = tk.BooleanVar(value=False) self.broadcast_check = ttk.Checkbutton( config_frame, text="Enable Broadcast (for remote servers)", variable=self.broadcast_var ) self.broadcast_check.pack(anchor='w', pady=(5, 0)) broadcast_hint = ttk.Label( config_frame, text="⚠ Localhost (127.0.0.1) if disabled, Broadcast (255.255.255.255) if enabled", font=('TkDefaultFont', 8), foreground='gray' ) broadcast_hint.pack(anchor='w', padx=(20, 0)) # Control frame control_frame = ttk.Frame(main_frame) control_frame.pack(fill='x', pady=(10, 0)) self.start_stop_button = ttk.Button(control_frame, text="Start Profiling", command=self._toggle_profiling) self.start_stop_button.pack(pady=(0, 10)) self.status_var = tk.StringVar(value="Status: Inactive") status_label = ttk.Label(control_frame, textvariable=self.status_var, font=('TkDefaultFont', 9, 'bold')) status_label.pack() def _toggle_profiling(self): if self.profiler.is_active: self._stop_profiling() else: self._start_profiling() def _start_profiling(self): if not self.router: messagebox.showerror("Error", "SFP communicator's router not found.", parent=self) return try: port_a = int(self.port_a_var.get()) port_b = int(self.port_b_var.get()) if port_a == port_b: raise ValueError("Ports must be different.") except ValueError as e: messagebox.showerror("Invalid Input", f"Please enter valid and different port numbers.\n{e}", parent=self) return use_broadcast = self.broadcast_var.get() if self.profiler.start(port_a, port_b, use_broadcast): # Registra i callback sul router self.router.add_raw_packet_listener(self.profiler.forward_to_port_a) self.router.add_post_processing_listener(self.profiler.forward_to_port_b) target = "broadcast" if use_broadcast else "localhost" self.status_var.set(f"Status: Active - Sending to {target} ports {port_a}/{port_b}") self.start_stop_button.config(text="Stop Profiling") self.port_a_entry.config(state='disabled') self.port_b_entry.config(state='disabled') self.broadcast_check.config(state='disabled') def _stop_profiling(self): self.profiler.stop() if self.router: # Deregistra i callback per pulizia self.router.remove_raw_packet_listener(self.profiler.forward_to_port_a) self.router.remove_post_processing_listener(self.profiler.forward_to_port_b) self.status_var.set("Status: Inactive") self.start_stop_button.config(text="Start Profiling") self.port_a_entry.config(state='normal') self.port_b_entry.config(state='normal') self.broadcast_check.config(state='normal') def _on_close(self): if self.profiler.is_active: self._stop_profiling() self.destroy()