""" PyBusMonitor1553 - Main GUI Window Tkinter-based interface for MIL-STD-1553 Bus Monitoring """ import tkinter as tk from tkinter import ttk from tkinter.scrolledtext import ScrolledText import threading import time import queue import sys import os from dataclasses import dataclass, field from typing import Dict, List, Optional, Any from collections import OrderedDict import logging from ..lib1553.fields import Field # Import TkinterLogger from externals submodule sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-tkinter-logger')) try: from tkinter_logger import TkinterLogger except ImportError: TkinterLogger = None # Fallback if module not available @dataclass class MessageStats: """Statistics for a single message type""" name: str cw: str # Command Word formatted string sw: str # Status Word formatted string count: int = 0 errors: int = 0 last_update: float = 0.0 period_ms: float = 0.0 word_count: int = 0 is_transmit: bool = False # True = RT->BC (B-msg), False = BC->RT (A-msg) last_data: Optional[Any] = None raw_words: List[int] = field(default_factory=list) class BusMonitorApp: """Main application window for PyBusMonitor1553""" def __init__(self, root: tk.Tk): self.root = root self.root.title("PyBusMonitor1553 - MIL-STD-1553 Bus Monitor") self.root.geometry("1200x900") # Increased height for log panel self.root.minsize(800, 700) # Message statistics storage self.message_stats: Dict[str, MessageStats] = OrderedDict() self._init_message_stats() # Selected message for detail view self.selected_message: Optional[str] = None # Connection status self.is_connected = False self.tx_count = 0 self.rx_count = 0 self.last_error = "" # Thread-safe queue for message updates (disaccoppiamento GUI/motore) self._update_queue = queue.Queue(maxsize=1000) self._status_queue = queue.Queue(maxsize=100) # GUI update timer (rate limiting a 100ms invece di aggiornamento continuo) self._gui_update_interval_ms = 100 # Logger system self.logger_system: Optional[Any] = None # TkinterLogger instance # Build UI self._create_menu() self._create_main_layout() self._create_log_panel() # NEW: Log panel at bottom self._create_status_bar() # Initialize logging system self._setup_logging() # Update timer self._update_id = None # Avvia il timer periodico per elaborare la coda aggiornamenti self._start_gui_update_timer() # Log startup logging.getLogger(__name__).info("PyBusMonitor1553 GUI initialized") def _init_message_stats(self): """Initialize message statistics for all known message types""" # A-messages (BC -> RT, Receive by RT) a_messages = [ ("A1", 1, False, 10), # Settings ("A2", 2, False, 3), # Operation Command ("A3", 3, False, 32), # Graphics Command ("A4", 4, False, 31), # Navigation Data ("A5", 5, False, 23), # INU/GPS Data ("A6", 6, False, 31), # Weapon Aiming ("A7", 7, False, 31), # Reserved ("A8", 8, False, 30), # Reserved ] # B-messages (RT -> BC, Transmit by RT) b_messages = [ ("B1", 11, True, 29), # Target Report #1 ("B2", 12, True, 27), # Target Report #2 ("B3", 13, True, 27), # Target Report #3 ("B4", 14, True, 27), # Terrain Avoidance ("B5", 15, True, 27), # Reserved ("B6", 16, True, 23), # Settings Tell-Back ("B7", 17, True, 3), # Status Tell-Back ("B8", 18, True, 32), # Reserved ] for name, sa, is_tx, wc in a_messages + b_messages: direction = "T" if is_tx else "R" cw_str = f"20-{direction}-{sa}-{wc}" self.message_stats[name] = MessageStats( name=name, cw=cw_str, sw="0-0-0", word_count=wc, is_transmit=is_tx ) def _create_menu(self): """Create application menu bar""" menubar = tk.Menu(self.root) self.root.config(menu=menubar) # File menu file_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="File", menu=file_menu) file_menu.add_command(label="Connect", command=self._on_connect) file_menu.add_command(label="Disconnect", command=self._on_disconnect) file_menu.add_separator() file_menu.add_command(label="Exit", command=self.root.quit) # View menu view_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="View", menu=view_menu) view_menu.add_command(label="Reset Counters", command=self._reset_counters) # Help menu help_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Help", menu=help_menu) help_menu.add_command(label="About", command=self._show_about) def _create_main_layout(self): """Create the main application layout with notebook tabs""" # Control panel at top self._create_control_panel() # Main container with PanedWindow for resizable split self.main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL) self.main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Left panel - Message list left_frame = ttk.Frame(self.main_paned) self.main_paned.add(left_frame, weight=1) # Right panel - Notebook with tabs right_frame = ttk.Frame(self.main_paned) self.main_paned.add(right_frame, weight=2) # Create message list (left side) self._create_message_list(left_frame) # Create notebook tabs (right side) self._create_notebook(right_frame) def _create_control_panel(self): """Create the control panel with Run/Stop buttons""" control_frame = ttk.Frame(self.root) control_frame.pack(fill=tk.X, padx=5, pady=(5, 0)) # Run button (green) self.btn_run = tk.Button( control_frame, text="▶ RUN", font=('Helvetica', 10, 'bold'), bg='#28a745', fg='white', activebackground='#218838', activeforeground='white', width=12, height=1, relief=tk.RAISED, command=self._on_run ) self.btn_run.pack(side=tk.LEFT, padx=5) # Stop button (red) self.btn_stop = tk.Button( control_frame, text="■ STOP", font=('Helvetica', 10, 'bold'), bg='#dc3545', fg='white', activebackground='#c82333', activeforeground='white', width=12, height=1, relief=tk.RAISED, command=self._on_stop, state=tk.DISABLED # Initially disabled ) self.btn_stop.pack(side=tk.LEFT, padx=5) # Status label self.lbl_status = ttk.Label( control_frame, text="● Ready - Press RUN to start", font=('Helvetica', 9), foreground='orange' ) self.lbl_status.pack(side=tk.LEFT, padx=20) def _create_message_list(self, parent): """Create the message list TreeView (like GrifoScope)""" # Header header = ttk.Label(parent, text="1553 Bus Messages", font=('Helvetica', 11, 'bold')) header.pack(pady=(0, 5)) # TreeView with scrollbar tree_frame = ttk.Frame(parent) tree_frame.pack(fill=tk.BOTH, expand=True) columns = ('name', 'cw', 'sw', 'count', 'errs', 'period', 'wc') self.msg_tree = ttk.Treeview(tree_frame, columns=columns, show='headings', selectmode='browse') # Column headers self.msg_tree.heading('name', text='Name') self.msg_tree.heading('cw', text='CW') self.msg_tree.heading('sw', text='SW') self.msg_tree.heading('count', text='Num') self.msg_tree.heading('errs', text='Errs') self.msg_tree.heading('period', text='Period') self.msg_tree.heading('wc', text='WC') # Column widths self.msg_tree.column('name', width=50, anchor='center') self.msg_tree.column('cw', width=90, anchor='center') self.msg_tree.column('sw', width=70, anchor='center') self.msg_tree.column('count', width=60, anchor='center') self.msg_tree.column('errs', width=50, anchor='center') self.msg_tree.column('period', width=70, anchor='center') self.msg_tree.column('wc', width=40, anchor='center') # Scrollbar scrollbar = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL, command=self.msg_tree.yview) self.msg_tree.configure(yscrollcommand=scrollbar.set) self.msg_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # Bind selection event self.msg_tree.bind('<>', self._on_message_select) # Configure row tags for coloring self.msg_tree.tag_configure('a_msg', background='#E8F4E8') # Light green for A-msgs self.msg_tree.tag_configure('b_msg', background='#E8E8F4') # Light blue for B-msgs self.msg_tree.tag_configure('error', foreground='red') # Populate initial data self._populate_message_list() def _populate_message_list(self): """Populate the message list with initial data""" for name, stats in self.message_stats.items(): tag = 'b_msg' if stats.is_transmit else 'a_msg' period_str = f"{stats.period_ms:.1f}" if stats.period_ms > 0 else "-" self.msg_tree.insert('', 'end', iid=name, values=( name, stats.cw, stats.sw, stats.count, stats.errors, period_str, stats.word_count ), tags=(tag,)) def _create_notebook(self, parent): """Create the right-side notebook with tabs""" self.notebook = ttk.Notebook(parent) self.notebook.pack(fill=tk.BOTH, expand=True) # Tab 1: Message Detail self.detail_frame = ttk.Frame(self.notebook) self.notebook.add(self.detail_frame, text="Message Detail") self._create_detail_tab(self.detail_frame) # Tab 2: Radar Status Dashboard self.dashboard_frame = ttk.Frame(self.notebook) self.notebook.add(self.dashboard_frame, text="Radar Status") self._create_dashboard_tab(self.dashboard_frame) # Tab 3: Raw Data self.raw_frame = ttk.Frame(self.notebook) self.notebook.add(self.raw_frame, text="Raw Data") self._create_raw_tab(self.raw_frame) def _create_log_panel(self): """Create the log panel at the bottom of the window""" # Separator separator = ttk.Separator(self.root, orient=tk.HORIZONTAL) separator.pack(fill=tk.X, padx=5) # Log panel container log_container = ttk.LabelFrame(self.root, text="Application Log", padding=5) log_container.pack(fill=tk.BOTH, expand=False, padx=5, pady=(5, 0), side=tk.BOTTOM) # ScrolledText widget for log display self.log_text = ScrolledText( log_container, height=8, # 8 lines of log width=80, state=tk.DISABLED, # Read-only wrap=tk.WORD, background='#F5F5F5', font=('Consolas', 9) ) self.log_text.pack(fill=tk.BOTH, expand=True) def _setup_logging(self): """Initialize the TkinterLogger system""" if TkinterLogger is None: # Fallback: basic logging to console logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%H:%M:%S' ) logging.getLogger(__name__).warning("TkinterLogger not available, using console logging") return # Initialize TkinterLogger system self.logger_system = TkinterLogger(self.root) self.logger_system.setup( enable_console=True, # Also log to console enable_tkinter=True, enable_file=False, # No file logging for now log_format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', date_format='%H:%M:%S', root_level=logging.INFO # Use root_level instead of log_level ) # Add the Tkinter widget handler self.logger_system.add_tkinter_handler( self.log_text, max_lines=500 # Keep last 500 log lines ) logging.getLogger(__name__).info("Logging system initialized") def _create_detail_tab(self, parent): """Create the message detail view tab""" # Header with selected message name self.detail_header = ttk.Label(parent, text="Select a message to view details", font=('Helvetica', 12, 'bold')) self.detail_header.pack(pady=10) # Scrollable frame for fields canvas = tk.Canvas(parent) scrollbar = ttk.Scrollbar(parent, orient="vertical", command=canvas.yview) self.detail_scroll_frame = ttk.Frame(canvas) self.detail_scroll_frame.bind( "", lambda e: canvas.configure(scrollregion=canvas.bbox("all")) ) canvas.create_window((0, 0), window=self.detail_scroll_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Store reference to canvas for updates self.detail_canvas = canvas # Dictionary to hold field labels for updating self.detail_field_labels: Dict[str, ttk.Label] = {} # Track which message detail view is currently built (cache) self._current_detail_message: Optional[str] = None self._detail_widgets_cache: Dict[str, Dict[str, tuple]] = {} # Structure: {msg_name: {field_name: (raw_label, enum_label, value_label)}} def _create_dashboard_tab(self, parent): """Create the radar status dashboard tab""" # Title title = ttk.Label(parent, text="Radar Status Dashboard", font=('Helvetica', 14, 'bold')) title.pack(pady=10) # Main frame with grid dashboard = ttk.Frame(parent) dashboard.pack(fill=tk.BOTH, expand=True, padx=20, pady=10) # Left column - Mode and Status left_col = ttk.LabelFrame(dashboard, text="Radar Mode", padding=10) left_col.grid(row=0, column=0, padx=10, pady=5, sticky='nsew') self.dashboard_labels = {} # Mode indicator ttk.Label(left_col, text="Master Mode:").grid(row=0, column=0, sticky='e', padx=5) self.dashboard_labels['mode'] = ttk.Label(left_col, text="---", font=('Helvetica', 11, 'bold')) self.dashboard_labels['mode'].grid(row=0, column=1, sticky='w', padx=5) ttk.Label(left_col, text="Standby:").grid(row=1, column=0, sticky='e', padx=5) self.dashboard_labels['standby'] = ttk.Label(left_col, text="---") self.dashboard_labels['standby'].grid(row=1, column=1, sticky='w', padx=5) ttk.Label(left_col, text="RF Radiation:").grid(row=2, column=0, sticky='e', padx=5) self.dashboard_labels['rf'] = ttk.Label(left_col, text="---") self.dashboard_labels['rf'].grid(row=2, column=1, sticky='w', padx=5) ttk.Label(left_col, text="Transition:").grid(row=3, column=0, sticky='e', padx=5) self.dashboard_labels['transition'] = ttk.Label(left_col, text="---") self.dashboard_labels['transition'].grid(row=3, column=1, sticky='w', padx=5) # Right column - Health right_col = ttk.LabelFrame(dashboard, text="Health Status", padding=10) right_col.grid(row=0, column=1, padx=10, pady=5, sticky='nsew') health_items = [ ('radar_failed', 'Radar'), ('array_failed', 'Array'), ('transmitter_failed', 'Transmitter'), ('receiver_failed', 'Receiver'), ('processor_failed', 'Processor'), ('tx_overtemp', 'TX Overtemp'), ] for i, (key, label) in enumerate(health_items): ttk.Label(right_col, text=f"{label}:").grid(row=i, column=0, sticky='e', padx=5) self.dashboard_labels[key] = ttk.Label(right_col, text="---", width=8) self.dashboard_labels[key].grid(row=i, column=1, sticky='w', padx=5) # Bottom - Scan Settings bottom_col = ttk.LabelFrame(dashboard, text="Scan Parameters", padding=10) bottom_col.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky='nsew') scan_items = [ ('range_scale', 'Range Scale'), ('bar_scan', 'Bars'), ('azimuth_scan', 'Az Scan'), ] for i, (key, label) in enumerate(scan_items): ttk.Label(bottom_col, text=f"{label}:").grid(row=0, column=i*2, sticky='e', padx=5) self.dashboard_labels[key] = ttk.Label(bottom_col, text="---", width=10) self.dashboard_labels[key].grid(row=0, column=i*2+1, sticky='w', padx=5) dashboard.columnconfigure(0, weight=1) dashboard.columnconfigure(1, weight=1) def _create_raw_tab(self, parent): """Create the raw data view tab""" # Text widget for raw hex data self.raw_text = tk.Text(parent, font=('Courier', 10), wrap=tk.WORD) scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.raw_text.yview) self.raw_text.configure(yscrollcommand=scrollbar.set) self.raw_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.raw_text.insert('1.0', "Select a message to view raw data...") self.raw_text.configure(state='disabled') def _create_status_bar(self): """Create the status bar at the bottom""" status_frame = ttk.Frame(self.root) status_frame.pack(side=tk.BOTTOM, fill=tk.X) # Connection status self.status_connection = ttk.Label(status_frame, text="● Disconnected", foreground='red') self.status_connection.pack(side=tk.LEFT, padx=10) # TX/RX counters self.status_tx = ttk.Label(status_frame, text="TX: 0") self.status_tx.pack(side=tk.LEFT, padx=10) self.status_rx = ttk.Label(status_frame, text="RX: 0") self.status_rx.pack(side=tk.LEFT, padx=10) # Error display self.status_error = ttk.Label(status_frame, text="", foreground='red') self.status_error.pack(side=tk.RIGHT, padx=10) # Separator ttk.Separator(self.root, orient=tk.HORIZONTAL).pack(side=tk.BOTTOM, fill=tk.X) def _on_message_select(self, event): """Handle message selection in the tree view""" selection = self.msg_tree.selection() if selection: msg_name = selection[0] self.selected_message = msg_name self._update_detail_view(msg_name) self._update_raw_view(msg_name) def _update_detail_view(self, msg_name: str): """Update the detail view for the selected message""" stats = self.message_stats.get(msg_name) if not stats: return # Update header direction = "RT → BC" if stats.is_transmit else "BC → RT" self.detail_header.config(text=f"{msg_name} - {direction}") # Se non abbiamo ancora dati per questo messaggio if not stats.last_data: self._show_no_data_message() return # Se stiamo cambiando messaggio, dobbiamo ricostruire il layout if self._current_detail_message != msg_name: self._build_detail_table(msg_name, stats.last_data) self._current_detail_message = msg_name # Aggiorna solo i valori (veloce, senza ricostruire layout) self._update_detail_values(msg_name, stats.last_data) def _show_no_data_message(self): """Mostra messaggio quando non ci sono dati""" # Clear existing for widget in self.detail_scroll_frame.winfo_children(): widget.destroy() self.detail_field_labels.clear() self._current_detail_message = None no_data = ttk.Label(self.detail_scroll_frame, text="No data received yet for this message", font=('Helvetica', 10, 'italic')) no_data.pack(pady=20) def _build_detail_table(self, msg_name: str, msg_obj): """Costruisce la tabella dei dettagli (chiamato solo alla prima selezione del messaggio)""" # Clear existing widgets for widget in self.detail_scroll_frame.winfo_children(): widget.destroy() self.detail_field_labels.clear() cls = msg_obj.__class__ # Get all Field descriptors fields = [] for name, obj in cls.__dict__.items(): if isinstance(obj, Field): fields.append((name, obj)) # Sort by word_index then start_bit fields.sort(key=lambda x: (x[1].word_index, x[1].start_bit)) # Cache per questo messaggio widget_cache = {} # Crea header tabella header_frame = ttk.Frame(self.detail_scroll_frame) header_frame.pack(fill=tk.X, padx=10, pady=5) ttk.Label(header_frame, text="Field Name", width=25, anchor='w', font=('Helvetica', 9, 'bold')).grid(row=0, column=0, padx=2) ttk.Label(header_frame, text="Raw Value", width=12, anchor='center', font=('Helvetica', 9, 'bold')).grid(row=0, column=1, padx=2) ttk.Label(header_frame, text="Decoded Value", width=25, anchor='w', font=('Helvetica', 9, 'bold')).grid(row=0, column=2, padx=2) ttk.Separator(self.detail_scroll_frame, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=2) # Create field rows current_word = -1 for field_name, field_desc in fields: # Add word separator if field_desc.word_index != current_word: current_word = field_desc.word_index sep = ttk.Separator(self.detail_scroll_frame, orient=tk.HORIZONTAL) sep.pack(fill=tk.X, pady=3) word_label = ttk.Label(self.detail_scroll_frame, text=f"Word {current_word:02d}", font=('Helvetica', 9, 'bold'), background='#e0e0e0') word_label.pack(fill=tk.X, padx=5, pady=2) # Field row con tabella row_frame = ttk.Frame(self.detail_scroll_frame) row_frame.pack(fill=tk.X, padx=10, pady=1) # Nome campo (fisso) name_label = ttk.Label(row_frame, text=f"{field_name}", width=25, anchor='w', font=('Courier', 9)) name_label.grid(row=0, column=0, padx=2, sticky='w') # Valore raw (aggiornabile) raw_label = ttk.Label(row_frame, text="---", width=12, anchor='center', font=('Courier', 9, 'bold'), foreground='#0066cc') raw_label.grid(row=0, column=1, padx=2) # Valore decodificato (aggiornabile) decoded_label = ttk.Label(row_frame, text="---", width=25, anchor='w', font=('Courier', 9)) decoded_label.grid(row=0, column=2, padx=2, sticky='w') # Salva riferimenti per aggiornamenti futuri widget_cache[field_name] = (raw_label, decoded_label, field_desc) # Salva cache per questo messaggio self._detail_widgets_cache[msg_name] = widget_cache def _update_detail_values(self, msg_name: str, msg_obj): """Aggiorna solo i valori nella tabella (veloce, senza ricostruire layout)""" widget_cache = self._detail_widgets_cache.get(msg_name) if not widget_cache: return for field_name, (raw_label, decoded_label, field_desc) in widget_cache.items(): try: # Ottieni valore raw dal campo raw_value = field_desc._get_raw_value(msg_obj) # Ottieni valore interpretato value = getattr(msg_obj, field_name) # Formatta valore raw (hex per bit fields) if field_desc.width <= 8: raw_str = f"0x{raw_value:0{(field_desc.width+3)//4}X}" # Hex else: raw_str = f"{raw_value}" # Decimale per campi grandi # Formatta valore decodificato if isinstance(value, float): decoded_str = f"{value:.4f}" elif hasattr(value, 'name'): # Enum decoded_str = f"{value.name} ({value.value})" else: decoded_str = str(value) # Aggiorna labels (VELOCE - no widget creation) raw_label.config(text=raw_str) decoded_label.config(text=decoded_str) except Exception as e: raw_label.config(text="ERR") decoded_label.config(text=f"<{str(e)[:20]}>") def _update_raw_view(self, msg_name: str): """Update the raw data view for the selected message""" stats = self.message_stats.get(msg_name) if not stats: return self.raw_text.configure(state='normal') self.raw_text.delete('1.0', tk.END) text = f"Message: {msg_name}\n" text += f"Command Word: {stats.cw}\n" text += f"Status Word: {stats.sw}\n" text += f"Word Count: {stats.word_count}\n" text += f"Message Count: {stats.count}\n" text += f"Errors: {stats.errors}\n\n" if stats.raw_words: text += "Raw Data Words (hex):\n" text += "-" * 40 + "\n" for i, word in enumerate(stats.raw_words): text += f" Word {i:02d}: 0x{word:04X} ({word:5d})\n" else: text += "No raw data available.\n" self.raw_text.insert('1.0', text) self.raw_text.configure(state='disabled') def _update_dashboard(self, b6_data, b7_data): """Update the radar dashboard with B6/B7 data""" if b7_data: # Mode mode = getattr(b7_data, 'master_mode_tb', None) if mode: self.dashboard_labels['mode'].config( text=mode.name if hasattr(mode, 'name') else str(mode) ) # Standby stby = getattr(b7_data, 'standby_status', None) if stby is not None: text = stby.name if hasattr(stby, 'name') else str(stby) color = 'orange' if 'ON' in text.upper() else 'green' self.dashboard_labels['standby'].config(text=text, foreground=color) # RF Radiation rf = getattr(b7_data, 'rf_radiation_status', None) if rf is not None: text = rf.name if hasattr(rf, 'name') else str(rf) color = 'green' if 'ON' in text.upper() else 'gray' self.dashboard_labels['rf'].config(text=text, foreground=color) # Transition trans = getattr(b7_data, 'transition_status', None) if trans is not None: text = trans.name if hasattr(trans, 'name') else str(trans) self.dashboard_labels['transition'].config(text=text) # Scan parameters range_s = getattr(b7_data, 'range_scale_tb', None) if range_s: self.dashboard_labels['range_scale'].config( text=range_s.name if hasattr(range_s, 'name') else str(range_s) ) bar_s = getattr(b7_data, 'bar_scan_tb', None) if bar_s: self.dashboard_labels['bar_scan'].config( text=bar_s.name if hasattr(bar_s, 'name') else str(bar_s) ) az_s = getattr(b7_data, 'azimuth_scan_tb', None) if az_s: self.dashboard_labels['azimuth_scan'].config( text=az_s.name if hasattr(az_s, 'name') else str(az_s) ) if b6_data: # Health status from B6 health_fields = [ 'radar_failed', 'array_failed', 'transmitter_failed', 'receiver_failed', 'processor_failed', 'tx_overtemp' ] for field in health_fields: value = getattr(b6_data, field, None) if value is not None: if value == 0: self.dashboard_labels[field].config(text="OK", foreground='green') else: self.dashboard_labels[field].config(text="FAIL", foreground='red') def _start_gui_update_timer(self): """Avvia il timer periodico per elaborare gli aggiornamenti dalla queue""" self._process_update_queue() def _process_update_queue(self): """Elabora tutti gli aggiornamenti in coda in batch (rate limiting)""" # Processa fino a 50 aggiornamenti per ciclo per evitare blocchi updates_processed = 0 max_updates_per_cycle = 50 # Processa aggiornamenti messaggi while updates_processed < max_updates_per_cycle: try: msg_name, msg_obj, raw_words = self._update_queue.get_nowait() self._update_message_stats_internal(msg_name, msg_obj, raw_words) updates_processed += 1 except queue.Empty: break # Processa aggiornamenti status (prendi solo l'ultimo) last_status = None while True: try: last_status = self._status_queue.get_nowait() except queue.Empty: break if last_status: connected, tx, rx, error = last_status self._update_connection_status_internal(connected, tx, rx, error) # Riprogramma il prossimo ciclo self._update_id = self.root.after(self._gui_update_interval_ms, self._process_update_queue) def queue_message_update(self, msg_name: str, msg_obj, raw_words: List[int] = None): """Accoda un aggiornamento messaggio (chiamato da thread esterni)""" try: # Non bloccare se la coda è piena, scarta l'aggiornamento self._update_queue.put_nowait((msg_name, msg_obj, raw_words)) except queue.Full: # Coda piena, scarta aggiornamento (evita blocco) pass def queue_status_update(self, connected: bool, tx: int, rx: int, error: str = ""): """Accoda un aggiornamento status (chiamato da thread esterni)""" try: self._status_queue.put_nowait((connected, tx, rx, error)) except queue.Full: # Svuota e metti l'ultimo try: self._status_queue.get_nowait() self._status_queue.put_nowait((connected, tx, rx, error)) except: pass def _update_message_stats_internal(self, msg_name: str, msg_obj, raw_words: List[int] = None): """Aggiorna statistiche messaggio (chiamato solo dal thread GUI)""" if msg_name not in self.message_stats: return stats = self.message_stats[msg_name] now = time.time() # Calculate period if stats.last_update > 0: stats.period_ms = (now - stats.last_update) * 1000 stats.last_update = now stats.count += 1 stats.last_data = msg_obj if raw_words: stats.raw_words = raw_words # Update tree view period_str = f"{stats.period_ms:.1f}" if stats.period_ms > 0 else "-" self.msg_tree.item(msg_name, values=( msg_name, stats.cw, stats.sw, stats.count, stats.errors, period_str, stats.word_count )) # Update detail view if this message is selected if self.selected_message == msg_name: self._update_detail_view(msg_name) self._update_raw_view(msg_name) # Update dashboard for B6/B7 if msg_name == 'B6': self._update_dashboard(msg_obj, None) elif msg_name == 'B7': self._update_dashboard(None, msg_obj) def update_message_stats(self, msg_name: str, msg_obj, raw_words: List[int] = None): """Update statistics for a received message (metodo pubblico deprecato, usa queue_message_update)""" # Backward compatibility: accoda invece di aggiornare direttamente self.queue_message_update(msg_name, msg_obj, raw_words) def _update_connection_status_internal(self, connected: bool, tx: int, rx: int, error: str): """Aggiorna status bar (chiamato solo dal thread GUI)""" self.is_connected = connected self.tx_count = tx self.rx_count = rx self.last_error = error if connected: self.status_connection.config(text="● Connected", foreground='green') else: self.status_connection.config(text="○ Disconnected", foreground='red') self.status_tx.config(text=f"TX: {tx}") self.status_rx.config(text=f"RX: {rx}") if error: self.status_error.config(text=f"Error: {error}", foreground='red') else: self.status_error.config(text="") def update_connection_status(self, connected: bool, tx: int = 0, rx: int = 0, error: str = ""): """Update the status bar (metodo pubblico, usa queue)""" self.queue_status_update(connected, tx, rx, error) def _on_run(self): """Handle RUN button - start connection and apply defaults""" logging.getLogger(__name__).info("RUN button pressed - starting system...") # Disable RUN, enable STOP self.btn_run.config(state=tk.DISABLED) self.btn_stop.config(state=tk.NORMAL) self.lbl_status.config(text="● Starting...", foreground='orange') # Call the external connect handler (wired in __main__.py) if hasattr(self, '_on_connect') and callable(self._on_connect): self._on_connect() self.lbl_status.config(text="● Running", foreground='green') def _on_stop(self): """Handle STOP button - disconnect""" logging.getLogger(__name__).info("STOP button pressed - stopping system...") # Disable STOP, enable RUN self.btn_stop.config(state=tk.DISABLED) self.btn_run.config(state=tk.NORMAL) self.lbl_status.config(text="● Stopping...", foreground='orange') # Call the external disconnect handler (wired in __main__.py) if hasattr(self, '_on_disconnect') and callable(self._on_disconnect): self._on_disconnect() self.lbl_status.config(text="● Stopped - Press RUN to restart", foreground='red') def _on_connect(self): """Handle connect menu action (deprecated - use RUN button)""" self._on_run() def _on_disconnect(self): """Handle disconnect menu action (deprecated - use STOP button)""" self._on_stop() def _reset_counters(self): """Reset all message counters""" for name, stats in self.message_stats.items(): stats.count = 0 stats.errors = 0 stats.period_ms = 0.0 stats.last_update = 0.0 self.msg_tree.item(name, values=( name, stats.cw, stats.sw, 0, 0, "-", stats.word_count )) def _show_about(self): """Show about dialog""" from tkinter import messagebox messagebox.showinfo( "About PyBusMonitor1553", "PyBusMonitor1553\n\n" "MIL-STD-1553 Bus Monitor\n" "Version 1.0\n\n" "Python implementation for GRIFO-F radar interface" ) def shutdown(self): """Clean shutdown of GUI and logging system""" logging.getLogger(__name__).info("Shutting down GUI...") if self.logger_system: self.logger_system.shutdown() def run_gui(): """Launch the GUI application""" root = tk.Tk() app = BusMonitorApp(root) # Register shutdown handler def on_closing(): app.shutdown() root.destroy() root.protocol("WM_DELETE_WINDOW", on_closing) return root, app