""" PyBusMonitor1553 - Main GUI Window Tkinter-based interface for MIL-STD-1553 Bus Monitoring """ import tkinter as tk from tkinter import ttk import threading import time import queue from dataclasses import dataclass, field from typing import Dict, List, Optional, Any from collections import OrderedDict from ..lib1553.fields import Field @dataclass class MessageStats: """Statistics for a single message type""" name: str cw: str # Command Word formatted string sw: str # Status Word formatted string count: int = 0 errors: int = 0 last_update: float = 0.0 period_ms: float = 0.0 word_count: int = 0 is_transmit: bool = False # True = RT->BC (B-msg), False = BC->RT (A-msg) last_data: Optional[Any] = None raw_words: List[int] = field(default_factory=list) class BusMonitorApp: """Main application window for PyBusMonitor1553""" def __init__(self, root: tk.Tk): self.root = root self.root.title("PyBusMonitor1553 - MIL-STD-1553 Bus Monitor") self.root.geometry("1200x800") self.root.minsize(800, 600) # Message statistics storage self.message_stats: Dict[str, MessageStats] = OrderedDict() self._init_message_stats() # Selected message for detail view self.selected_message: Optional[str] = None # Connection status self.is_connected = False self.tx_count = 0 self.rx_count = 0 self.last_error = "" # Thread-safe queue for message updates (disaccoppiamento GUI/motore) self._update_queue = queue.Queue(maxsize=1000) self._status_queue = queue.Queue(maxsize=100) # GUI update timer (rate limiting a 100ms invece di aggiornamento continuo) self._gui_update_interval_ms = 100 # Build UI self._create_menu() self._create_main_layout() self._create_status_bar() # Update timer self._update_id = None # Avvia il timer periodico per elaborare la coda aggiornamenti self._start_gui_update_timer() def _init_message_stats(self): """Initialize message statistics for all known message types""" # A-messages (BC -> RT, Receive by RT) a_messages = [ ("A1", 1, False, 10), # Settings ("A2", 2, False, 3), # Operation Command ("A3", 3, False, 32), # Graphics Command ("A4", 4, False, 31), # Navigation Data ("A5", 5, False, 23), # INU/GPS Data ("A6", 6, False, 31), # Weapon Aiming ("A7", 7, False, 31), # Reserved ("A8", 8, False, 30), # Reserved ] # B-messages (RT -> BC, Transmit by RT) b_messages = [ ("B1", 11, True, 29), # Target Report #1 ("B2", 12, True, 27), # Target Report #2 ("B3", 13, True, 27), # Target Report #3 ("B4", 14, True, 27), # Terrain Avoidance ("B5", 15, True, 27), # Reserved ("B6", 16, True, 23), # Settings Tell-Back ("B7", 17, True, 3), # Status Tell-Back ("B8", 18, True, 32), # Reserved ] for name, sa, is_tx, wc in a_messages + b_messages: direction = "T" if is_tx else "R" cw_str = f"20-{direction}-{sa}-{wc}" self.message_stats[name] = MessageStats( name=name, cw=cw_str, sw="0-0-0", word_count=wc, is_transmit=is_tx ) def _create_menu(self): """Create application menu bar""" menubar = tk.Menu(self.root) self.root.config(menu=menubar) # File menu file_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="File", menu=file_menu) file_menu.add_command(label="Connect", command=self._on_connect) file_menu.add_command(label="Disconnect", command=self._on_disconnect) file_menu.add_separator() file_menu.add_command(label="Exit", command=self.root.quit) # View menu view_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="View", menu=view_menu) view_menu.add_command(label="Reset Counters", command=self._reset_counters) # Help menu help_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Help", menu=help_menu) help_menu.add_command(label="About", command=self._show_about) def _create_main_layout(self): """Create the main application layout with notebook tabs""" # Main container with PanedWindow for resizable split self.main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL) self.main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5) # Left panel - Message list left_frame = ttk.Frame(self.main_paned) self.main_paned.add(left_frame, weight=1) # Right panel - Notebook with tabs right_frame = ttk.Frame(self.main_paned) self.main_paned.add(right_frame, weight=2) # Create message list (left side) self._create_message_list(left_frame) # Create notebook tabs (right side) self._create_notebook(right_frame) def _create_message_list(self, parent): """Create the message list TreeView (like GrifoScope)""" # Header header = ttk.Label(parent, text="1553 Bus Messages", font=('Helvetica', 11, 'bold')) header.pack(pady=(0, 5)) # TreeView with scrollbar tree_frame = ttk.Frame(parent) tree_frame.pack(fill=tk.BOTH, expand=True) columns = ('name', 'cw', 'sw', 'count', 'errs', 'period', 'wc') self.msg_tree = ttk.Treeview(tree_frame, columns=columns, show='headings', selectmode='browse') # Column headers self.msg_tree.heading('name', text='Name') self.msg_tree.heading('cw', text='CW') self.msg_tree.heading('sw', text='SW') self.msg_tree.heading('count', text='Num') self.msg_tree.heading('errs', text='Errs') self.msg_tree.heading('period', text='Period') self.msg_tree.heading('wc', text='WC') # Column widths self.msg_tree.column('name', width=50, anchor='center') self.msg_tree.column('cw', width=90, anchor='center') self.msg_tree.column('sw', width=70, anchor='center') self.msg_tree.column('count', width=60, anchor='center') self.msg_tree.column('errs', width=50, anchor='center') self.msg_tree.column('period', width=70, anchor='center') self.msg_tree.column('wc', width=40, anchor='center') # Scrollbar scrollbar = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL, command=self.msg_tree.yview) self.msg_tree.configure(yscrollcommand=scrollbar.set) self.msg_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # Bind selection event self.msg_tree.bind('<>', self._on_message_select) # Configure row tags for coloring self.msg_tree.tag_configure('a_msg', background='#E8F4E8') # Light green for A-msgs self.msg_tree.tag_configure('b_msg', background='#E8E8F4') # Light blue for B-msgs self.msg_tree.tag_configure('error', foreground='red') # Populate initial data self._populate_message_list() def _populate_message_list(self): """Populate the message list with initial data""" for name, stats in self.message_stats.items(): tag = 'b_msg' if stats.is_transmit else 'a_msg' period_str = f"{stats.period_ms:.1f}" if stats.period_ms > 0 else "-" self.msg_tree.insert('', 'end', iid=name, values=( name, stats.cw, stats.sw, stats.count, stats.errors, period_str, stats.word_count ), tags=(tag,)) def _create_notebook(self, parent): """Create the right-side notebook with tabs""" self.notebook = ttk.Notebook(parent) self.notebook.pack(fill=tk.BOTH, expand=True) # Tab 1: Message Detail self.detail_frame = ttk.Frame(self.notebook) self.notebook.add(self.detail_frame, text="Message Detail") self._create_detail_tab(self.detail_frame) # Tab 2: Radar Status Dashboard self.dashboard_frame = ttk.Frame(self.notebook) self.notebook.add(self.dashboard_frame, text="Radar Status") self._create_dashboard_tab(self.dashboard_frame) # Tab 3: Raw Data self.raw_frame = ttk.Frame(self.notebook) self.notebook.add(self.raw_frame, text="Raw Data") self._create_raw_tab(self.raw_frame) def _create_detail_tab(self, parent): """Create the message detail view tab""" # Header with selected message name self.detail_header = ttk.Label(parent, text="Select a message to view details", font=('Helvetica', 12, 'bold')) self.detail_header.pack(pady=10) # Scrollable frame for fields canvas = tk.Canvas(parent) scrollbar = ttk.Scrollbar(parent, orient="vertical", command=canvas.yview) self.detail_scroll_frame = ttk.Frame(canvas) self.detail_scroll_frame.bind( "", lambda e: canvas.configure(scrollregion=canvas.bbox("all")) ) canvas.create_window((0, 0), window=self.detail_scroll_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Store reference to canvas for updates self.detail_canvas = canvas # Dictionary to hold field labels for updating self.detail_field_labels: Dict[str, ttk.Label] = {} def _create_dashboard_tab(self, parent): """Create the radar status dashboard tab""" # Title title = ttk.Label(parent, text="Radar Status Dashboard", font=('Helvetica', 14, 'bold')) title.pack(pady=10) # Main frame with grid dashboard = ttk.Frame(parent) dashboard.pack(fill=tk.BOTH, expand=True, padx=20, pady=10) # Left column - Mode and Status left_col = ttk.LabelFrame(dashboard, text="Radar Mode", padding=10) left_col.grid(row=0, column=0, padx=10, pady=5, sticky='nsew') self.dashboard_labels = {} # Mode indicator ttk.Label(left_col, text="Master Mode:").grid(row=0, column=0, sticky='e', padx=5) self.dashboard_labels['mode'] = ttk.Label(left_col, text="---", font=('Helvetica', 11, 'bold')) self.dashboard_labels['mode'].grid(row=0, column=1, sticky='w', padx=5) ttk.Label(left_col, text="Standby:").grid(row=1, column=0, sticky='e', padx=5) self.dashboard_labels['standby'] = ttk.Label(left_col, text="---") self.dashboard_labels['standby'].grid(row=1, column=1, sticky='w', padx=5) ttk.Label(left_col, text="RF Radiation:").grid(row=2, column=0, sticky='e', padx=5) self.dashboard_labels['rf'] = ttk.Label(left_col, text="---") self.dashboard_labels['rf'].grid(row=2, column=1, sticky='w', padx=5) ttk.Label(left_col, text="Transition:").grid(row=3, column=0, sticky='e', padx=5) self.dashboard_labels['transition'] = ttk.Label(left_col, text="---") self.dashboard_labels['transition'].grid(row=3, column=1, sticky='w', padx=5) # Right column - Health right_col = ttk.LabelFrame(dashboard, text="Health Status", padding=10) right_col.grid(row=0, column=1, padx=10, pady=5, sticky='nsew') health_items = [ ('radar_failed', 'Radar'), ('array_failed', 'Array'), ('transmitter_failed', 'Transmitter'), ('receiver_failed', 'Receiver'), ('processor_failed', 'Processor'), ('tx_overtemp', 'TX Overtemp'), ] for i, (key, label) in enumerate(health_items): ttk.Label(right_col, text=f"{label}:").grid(row=i, column=0, sticky='e', padx=5) self.dashboard_labels[key] = ttk.Label(right_col, text="---", width=8) self.dashboard_labels[key].grid(row=i, column=1, sticky='w', padx=5) # Bottom - Scan Settings bottom_col = ttk.LabelFrame(dashboard, text="Scan Parameters", padding=10) bottom_col.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky='nsew') scan_items = [ ('range_scale', 'Range Scale'), ('bar_scan', 'Bars'), ('azimuth_scan', 'Az Scan'), ] for i, (key, label) in enumerate(scan_items): ttk.Label(bottom_col, text=f"{label}:").grid(row=0, column=i*2, sticky='e', padx=5) self.dashboard_labels[key] = ttk.Label(bottom_col, text="---", width=10) self.dashboard_labels[key].grid(row=0, column=i*2+1, sticky='w', padx=5) dashboard.columnconfigure(0, weight=1) dashboard.columnconfigure(1, weight=1) def _create_raw_tab(self, parent): """Create the raw data view tab""" # Text widget for raw hex data self.raw_text = tk.Text(parent, font=('Courier', 10), wrap=tk.WORD) scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.raw_text.yview) self.raw_text.configure(yscrollcommand=scrollbar.set) self.raw_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.raw_text.insert('1.0', "Select a message to view raw data...") self.raw_text.configure(state='disabled') def _create_status_bar(self): """Create the status bar at the bottom""" status_frame = ttk.Frame(self.root) status_frame.pack(side=tk.BOTTOM, fill=tk.X) # Connection status self.status_connection = ttk.Label(status_frame, text="● Disconnected", foreground='red') self.status_connection.pack(side=tk.LEFT, padx=10) # TX/RX counters self.status_tx = ttk.Label(status_frame, text="TX: 0") self.status_tx.pack(side=tk.LEFT, padx=10) self.status_rx = ttk.Label(status_frame, text="RX: 0") self.status_rx.pack(side=tk.LEFT, padx=10) # Error display self.status_error = ttk.Label(status_frame, text="", foreground='red') self.status_error.pack(side=tk.RIGHT, padx=10) # Separator ttk.Separator(self.root, orient=tk.HORIZONTAL).pack(side=tk.BOTTOM, fill=tk.X) def _on_message_select(self, event): """Handle message selection in the tree view""" selection = self.msg_tree.selection() if selection: msg_name = selection[0] self.selected_message = msg_name self._update_detail_view(msg_name) self._update_raw_view(msg_name) def _update_detail_view(self, msg_name: str): """Update the detail view for the selected message""" stats = self.message_stats.get(msg_name) if not stats: return # Update header direction = "RT → BC" if stats.is_transmit else "BC → RT" self.detail_header.config(text=f"{msg_name} - {direction}") # Clear existing fields for widget in self.detail_scroll_frame.winfo_children(): widget.destroy() self.detail_field_labels.clear() # If we have message data, show its fields if stats.last_data: msg_obj = stats.last_data cls = msg_obj.__class__ # Get all Field descriptors fields = [] for name, obj in cls.__dict__.items(): if isinstance(obj, Field): fields.append((name, obj)) # Sort by word_index then start_bit fields.sort(key=lambda x: (x[1].word_index, x[1].start_bit)) # Create field displays current_word = -1 for field_name, field_desc in fields: # Add word separator if field_desc.word_index != current_word: current_word = field_desc.word_index sep = ttk.Separator(self.detail_scroll_frame, orient=tk.HORIZONTAL) sep.pack(fill=tk.X, pady=2) word_label = ttk.Label(self.detail_scroll_frame, text=f"Word {current_word:02d}", font=('Helvetica', 9, 'bold')) word_label.pack(anchor='w', padx=5) # Field row row_frame = ttk.Frame(self.detail_scroll_frame) row_frame.pack(fill=tk.X, padx=10, pady=1) name_label = ttk.Label(row_frame, text=f"{field_name}:", width=30, anchor='e') name_label.pack(side=tk.LEFT) try: value = getattr(msg_obj, field_name) if isinstance(value, float): val_str = f"{value:.4f}" elif hasattr(value, 'name'): # Enum val_str = f"{value.name} ({value.value})" else: val_str = str(value) except Exception as e: val_str = f"" value_label = ttk.Label(row_frame, text=val_str, width=30, anchor='w') value_label.pack(side=tk.LEFT, padx=5) self.detail_field_labels[field_name] = value_label else: no_data = ttk.Label(self.detail_scroll_frame, text="No data received yet for this message", font=('Helvetica', 10, 'italic')) no_data.pack(pady=20) def _update_raw_view(self, msg_name: str): """Update the raw data view for the selected message""" stats = self.message_stats.get(msg_name) if not stats: return self.raw_text.configure(state='normal') self.raw_text.delete('1.0', tk.END) text = f"Message: {msg_name}\n" text += f"Command Word: {stats.cw}\n" text += f"Status Word: {stats.sw}\n" text += f"Word Count: {stats.word_count}\n" text += f"Message Count: {stats.count}\n" text += f"Errors: {stats.errors}\n\n" if stats.raw_words: text += "Raw Data Words (hex):\n" text += "-" * 40 + "\n" for i, word in enumerate(stats.raw_words): text += f" Word {i:02d}: 0x{word:04X} ({word:5d})\n" else: text += "No raw data available.\n" self.raw_text.insert('1.0', text) self.raw_text.configure(state='disabled') def _update_dashboard(self, b6_data, b7_data): """Update the radar dashboard with B6/B7 data""" if b7_data: # Mode mode = getattr(b7_data, 'master_mode_tb', None) if mode: self.dashboard_labels['mode'].config( text=mode.name if hasattr(mode, 'name') else str(mode) ) # Standby stby = getattr(b7_data, 'standby_status', None) if stby is not None: text = stby.name if hasattr(stby, 'name') else str(stby) color = 'orange' if 'ON' in text.upper() else 'green' self.dashboard_labels['standby'].config(text=text, foreground=color) # RF Radiation rf = getattr(b7_data, 'rf_radiation_status', None) if rf is not None: text = rf.name if hasattr(rf, 'name') else str(rf) color = 'green' if 'ON' in text.upper() else 'gray' self.dashboard_labels['rf'].config(text=text, foreground=color) # Transition trans = getattr(b7_data, 'transition_status', None) if trans is not None: text = trans.name if hasattr(trans, 'name') else str(trans) self.dashboard_labels['transition'].config(text=text) # Scan parameters range_s = getattr(b7_data, 'range_scale_tb', None) if range_s: self.dashboard_labels['range_scale'].config( text=range_s.name if hasattr(range_s, 'name') else str(range_s) ) bar_s = getattr(b7_data, 'bar_scan_tb', None) if bar_s: self.dashboard_labels['bar_scan'].config( text=bar_s.name if hasattr(bar_s, 'name') else str(bar_s) ) az_s = getattr(b7_data, 'azimuth_scan_tb', None) if az_s: self.dashboard_labels['azimuth_scan'].config( text=az_s.name if hasattr(az_s, 'name') else str(az_s) ) if b6_data: # Health status from B6 health_fields = [ 'radar_failed', 'array_failed', 'transmitter_failed', 'receiver_failed', 'processor_failed', 'tx_overtemp' ] for field in health_fields: value = getattr(b6_data, field, None) if value is not None: if value == 0: self.dashboard_labels[field].config(text="OK", foreground='green') else: self.dashboard_labels[field].config(text="FAIL", foreground='red') def _start_gui_update_timer(self): """Avvia il timer periodico per elaborare gli aggiornamenti dalla queue""" self._process_update_queue() def _process_update_queue(self): """Elabora tutti gli aggiornamenti in coda in batch (rate limiting)""" # Processa fino a 50 aggiornamenti per ciclo per evitare blocchi updates_processed = 0 max_updates_per_cycle = 50 # Processa aggiornamenti messaggi while updates_processed < max_updates_per_cycle: try: msg_name, msg_obj, raw_words = self._update_queue.get_nowait() self._update_message_stats_internal(msg_name, msg_obj, raw_words) updates_processed += 1 except queue.Empty: break # Processa aggiornamenti status (prendi solo l'ultimo) last_status = None while True: try: last_status = self._status_queue.get_nowait() except queue.Empty: break if last_status: connected, tx, rx, error = last_status self._update_connection_status_internal(connected, tx, rx, error) # Riprogramma il prossimo ciclo self._update_id = self.root.after(self._gui_update_interval_ms, self._process_update_queue) def queue_message_update(self, msg_name: str, msg_obj, raw_words: List[int] = None): """Accoda un aggiornamento messaggio (chiamato da thread esterni)""" try: # Non bloccare se la coda è piena, scarta l'aggiornamento self._update_queue.put_nowait((msg_name, msg_obj, raw_words)) except queue.Full: # Coda piena, scarta aggiornamento (evita blocco) pass def queue_status_update(self, connected: bool, tx: int, rx: int, error: str = ""): """Accoda un aggiornamento status (chiamato da thread esterni)""" try: self._status_queue.put_nowait((connected, tx, rx, error)) except queue.Full: # Svuota e metti l'ultimo try: self._status_queue.get_nowait() self._status_queue.put_nowait((connected, tx, rx, error)) except: pass def _update_message_stats_internal(self, msg_name: str, msg_obj, raw_words: List[int] = None): """Aggiorna statistiche messaggio (chiamato solo dal thread GUI)""" if msg_name not in self.message_stats: return stats = self.message_stats[msg_name] now = time.time() # Calculate period if stats.last_update > 0: stats.period_ms = (now - stats.last_update) * 1000 stats.last_update = now stats.count += 1 stats.last_data = msg_obj if raw_words: stats.raw_words = raw_words # Update tree view period_str = f"{stats.period_ms:.1f}" if stats.period_ms > 0 else "-" self.msg_tree.item(msg_name, values=( msg_name, stats.cw, stats.sw, stats.count, stats.errors, period_str, stats.word_count )) # Update detail view if this message is selected if self.selected_message == msg_name: self._update_detail_view(msg_name) self._update_raw_view(msg_name) # Update dashboard for B6/B7 if msg_name == 'B6': self._update_dashboard(msg_obj, None) elif msg_name == 'B7': self._update_dashboard(None, msg_obj) def update_message_stats(self, msg_name: str, msg_obj, raw_words: List[int] = None): """Update statistics for a received message (metodo pubblico deprecato, usa queue_message_update)""" # Backward compatibility: accoda invece di aggiornare direttamente self.queue_message_update(msg_name, msg_obj, raw_words) def _update_connection_status_internal(self, connected: bool, tx: int, rx: int, error: str): """Aggiorna status bar (chiamato solo dal thread GUI)""" self.is_connected = connected self.tx_count = tx self.rx_count = rx self.last_error = error if connected: self.status_connection.config(text="● Connected", foreground='green') else: self.status_connection.config(text="○ Disconnected", foreground='red') self.status_tx.config(text=f"TX: {tx}") self.status_rx.config(text=f"RX: {rx}") if error: self.status_error.config(text=f"Error: {error}", foreground='red') else: self.status_error.config(text="") def update_connection_status(self, connected: bool, tx: int = 0, rx: int = 0, error: str = ""): """Update the status bar (metodo pubblico, usa queue)""" self.queue_status_update(connected, tx, rx, error) def _on_connect(self): """Handle connect menu action""" # This will be wired up to the actual network handler pass def _on_disconnect(self): """Handle disconnect menu action""" pass def _reset_counters(self): """Reset all message counters""" for name, stats in self.message_stats.items(): stats.count = 0 stats.errors = 0 stats.period_ms = 0.0 stats.last_update = 0.0 self.msg_tree.item(name, values=( name, stats.cw, stats.sw, 0, 0, "-", stats.word_count )) def _show_about(self): """Show about dialog""" from tkinter import messagebox messagebox.showinfo( "About PyBusMonitor1553", "PyBusMonitor1553\n\n" "MIL-STD-1553 Bus Monitor\n" "Version 1.0\n\n" "Python implementation for GRIFO-F radar interface" ) def run_gui(): """Launch the GUI application""" root = tk.Tk() app = BusMonitorApp(root) return root, app