159 lines
5.9 KiB
Python
159 lines
5.9 KiB
Python
# target_simulator/gui/external_profiler_window.py
|
||
import tkinter as tk
|
||
from tkinter import ttk, messagebox
|
||
from target_simulator.gui.external_profiler import ExternalProfiler
|
||
from target_simulator.core.sfp_communicator import SFPCommunicator
|
||
|
||
|
||
class ExternalProfilerWindow(tk.Toplevel):
    """Modal dialog for controlling the external profiling tool.

    The window lets the user choose two UDP ports and whether to use
    broadcast, then starts or stops an ``ExternalProfiler``. While profiling
    is active the profiler's ``forward_to_port_a`` / ``forward_to_port_b``
    callbacks are registered on the router obtained from the master window's
    ``target_communicator`` (only when that is an ``SFPCommunicator``).
    """

    # Inclusive range of port numbers accepted from the entry fields.
    _MIN_PORT = 1
    _MAX_PORT = 65535

    def __init__(self, master):
        """Build the dialog and look up the router on ``master``.

        Args:
            master: Parent widget. If it has a ``target_communicator``
                attribute that is an ``SFPCommunicator``, its router is used
                to register the profiler callbacks; otherwise starting the
                profiler reports an error.
        """
        super().__init__(master)
        self.title("External Profiling Tool")
        self.geometry("500x450")
        self.transient(master)
        self.grab_set()

        self.profiler = ExternalProfiler()
        self.router = None

        # Get the router from the main SFP communicator, if there is one.
        communicator = getattr(master, "target_communicator", None)
        if isinstance(communicator, SFPCommunicator):
            self.router = communicator.router()

        self._create_widgets()
        self.protocol("WM_DELETE_WINDOW", self._on_close)

    def _create_widgets(self):
        """Create the information, configuration and control sections."""
        main_frame = ttk.Frame(self, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Info panel
        info_frame = ttk.LabelFrame(main_frame, text="ℹ️ Information", padding=10)
        info_frame.pack(fill="x", pady=(0, 10))

        info_text = (
            "This tool broadcasts RIS packets to specified UDP ports for network latency analysis.\n\n"
            "• Port A: Packets sent when received from server (start of processing)\n"
            "• Port B: Packets sent after processing (end of processing)\n\n"
            "To capture packets, use Wireshark with filter:\n"
            " udp.port == <PortA> || udp.port == <PortB>\n\n"
            "Enable Broadcast when the server is on a different machine."
        )
        info_label = ttk.Label(
            info_frame, text=info_text, justify="left", wraplength=460
        )
        info_label.pack(anchor="w")

        # Configuration frame
        config_frame = ttk.LabelFrame(main_frame, text="Configuration", padding=10)
        config_frame.pack(fill="x", pady=(0, 10))

        ttk.Label(config_frame, text="Port A (Start of Processing):").pack(anchor="w")
        self.port_a_var = tk.StringVar(value="55001")
        self.port_a_entry = ttk.Entry(config_frame, textvariable=self.port_a_var)
        self.port_a_entry.pack(fill="x", pady=(0, 10))

        ttk.Label(config_frame, text="Port B (End of Processing):").pack(anchor="w")
        self.port_b_var = tk.StringVar(value="55002")
        self.port_b_entry = ttk.Entry(config_frame, textvariable=self.port_b_var)
        self.port_b_entry.pack(fill="x", pady=(0, 10))

        # Broadcast checkbox
        self.broadcast_var = tk.BooleanVar(value=False)
        self.broadcast_check = ttk.Checkbutton(
            config_frame,
            text="Enable Broadcast (for remote servers)",
            variable=self.broadcast_var,
        )
        self.broadcast_check.pack(anchor="w", pady=(5, 0))

        broadcast_hint = ttk.Label(
            config_frame,
            text="⚠ Localhost (127.0.0.1) if disabled, Broadcast (255.255.255.255) if enabled",
            font=("TkDefaultFont", 8),
            foreground="gray",
        )
        broadcast_hint.pack(anchor="w", padx=(20, 0))

        # Control frame
        control_frame = ttk.Frame(main_frame)
        control_frame.pack(fill="x", pady=(10, 0))

        self.start_stop_button = ttk.Button(
            control_frame, text="Start Profiling", command=self._toggle_profiling
        )
        self.start_stop_button.pack(pady=(0, 10))

        self.status_var = tk.StringVar(value="Status: Inactive")
        status_label = ttk.Label(
            control_frame,
            textvariable=self.status_var,
            font=("TkDefaultFont", 9, "bold"),
        )
        status_label.pack()

    @classmethod
    def _parse_ports(cls, port_a_text, port_b_text):
        """Convert the two port entry strings into validated port numbers.

        Args:
            port_a_text: Text of the "Port A" entry.
            port_b_text: Text of the "Port B" entry.

        Returns:
            A ``(port_a, port_b)`` tuple of integers.

        Raises:
            ValueError: If either value is not an integer, lies outside
                ``_MIN_PORT``..``_MAX_PORT``, or both ports are equal.
        """
        ports = []
        for label, text in (("Port A", port_a_text), ("Port B", port_b_text)):
            port = int(text)  # ValueError on non-numeric input
            if not cls._MIN_PORT <= port <= cls._MAX_PORT:
                raise ValueError(
                    f"{label} must be between {cls._MIN_PORT} and {cls._MAX_PORT}."
                )
            ports.append(port)

        port_a, port_b = ports
        if port_a == port_b:
            raise ValueError("Ports must be different.")
        return port_a, port_b

    def _set_config_state(self, state):
        """Apply ``state`` ("normal"/"disabled") to the configuration inputs."""
        for widget in (self.port_a_entry, self.port_b_entry, self.broadcast_check):
            widget.config(state=state)

    def _toggle_profiling(self):
        """Start the profiler if it is inactive, otherwise stop it."""
        if self.profiler.is_active:
            self._stop_profiling()
        else:
            self._start_profiling()

    def _start_profiling(self):
        """Validate the inputs, start the profiler and hook it to the router.

        Shows an error dialog and leaves the UI unchanged when no router is
        available, when the ports are invalid, or when the profiler reports
        that it could not start.
        """
        if self.router is None:
            messagebox.showerror(
                "Error", "SFP communicator's router not found.", parent=self
            )
            return

        try:
            port_a, port_b = self._parse_ports(
                self.port_a_var.get(), self.port_b_var.get()
            )
        except ValueError as e:
            messagebox.showerror(
                "Invalid Input",
                f"Please enter valid and different port numbers.\n{e}",
                parent=self,
            )
            return

        use_broadcast = self.broadcast_var.get()

        if not self.profiler.start(port_a, port_b, use_broadcast):
            # Previously a failed start was silent; tell the user instead.
            messagebox.showerror(
                "Error",
                f"Could not start profiling on ports {port_a}/{port_b}.",
                parent=self,
            )
            return

        # Register the callbacks on the router only once the profiler is up.
        self.router.add_raw_packet_listener(self.profiler.forward_to_port_a)
        self.router.add_post_processing_listener(self.profiler.forward_to_port_b)

        target = "broadcast" if use_broadcast else "localhost"
        self.status_var.set(
            f"Status: Active - Sending to {target} ports {port_a}/{port_b}"
        )
        self.start_stop_button.config(text="Stop Profiling")
        self._set_config_state("disabled")

    def _stop_profiling(self):
        """Detach the profiler from the router, stop it and reset the UI."""
        try:
            if self.router is not None:
                # Unregister the callbacks first so the router no longer
                # calls into the profiler once it has been stopped.
                self.router.remove_raw_packet_listener(
                    self.profiler.forward_to_port_a
                )
                self.router.remove_post_processing_listener(
                    self.profiler.forward_to_port_b
                )
        finally:
            # Always stop the profiler, even if unregistering failed.
            self.profiler.stop()

        self.status_var.set("Status: Inactive")
        self.start_stop_button.config(text="Start Profiling")
        self._set_config_state("normal")

    def _on_close(self):
        """Stop any active profiling session before destroying the window."""
        if self.profiler.is_active:
            self._stop_profiling()
        self.destroy()
|