# -*- coding: utf-8 -*-
"""
Simplified 1553 Debug View for embedding in main ARTOS GUI.

Shows only:
- Bus Monitor table (messages with stats)
- Details pane (selected message fields)

Uses existing BusMonitorCore instance from main application.
Logs to main GUI logger (no internal log widget).
"""

import tkinter as tk
from tkinter import ttk
import logging
import time


class DebugView(tk.Frame):
    """
    Lightweight 1553 debug panel showing message table and details.

    Designed to be embedded in a Toplevel, uses external BusMonitorCore.
    The view polls the monitor's message database every
    ``UPDATE_INTERVAL_MS`` milliseconds until :meth:`stop` is called or the
    widget is destroyed.
    """

    # Polling period of the table/details refresh loop, in milliseconds.
    UPDATE_INTERVAL_MS = 500

    # (column id, heading text, width in pixels) for the Bus Monitor table.
    _TABLE_COLUMNS = (
        ("label", "Name", 120),
        ("cw", "RT-SA-WC-T/R", 120),
        ("sw", "SW", 60),
        ("num", "Num", 60),
        ("errs", "Errs", 60),
        ("period", "period", 80),
        ("mc", "MC", 60),
    )

    # Indent unit used for nested rows in the details pane.
    # NOTE(review): the unit may originally have been wider than one space;
    # confirm against the intended layout.
    _INDENT = " "

    # Class-level defaults so stop() is safe even if the loop never started.
    _update_active = False
    _after_id = None
    # True while the "MessageDB not available" warning has already been
    # logged for the current outage (avoids one warning per poll).
    _db_missing_logged = False

    def __init__(self, parent, bus_monitor):
        """
        Args:
            parent: Tk parent widget
            bus_monitor: BusMonitorCore instance (already initialized)
        """
        super().__init__(parent)
        self.pack(fill=tk.BOTH, expand=True)

        self.bus_monitor = bus_monitor
        self.logger = logging.getLogger('ARTOS.1553Debug')

        # Cache: message label -> Treeview item id
        self._tree_items = {}
        self._current_selected_label = None

        self._create_widgets()
        self._start_update_loop()

        self.logger.info("1553 Debug View initialized")

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def _create_widgets(self):
        """Create Bus Monitor table and Details pane."""
        # Top controls
        controls = tk.Frame(self)
        controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6)

        tk.Label(
            controls,
            text="1553 Bus Monitor - Real-time",
            font=('Arial', 10, 'bold'),
        ).pack(side=tk.LEFT, padx=10)

        refresh_btn = tk.Button(controls, text="Refresh",
                                command=self._refresh_table)
        refresh_btn.pack(side=tk.RIGHT, padx=4)

        # Main content: table (left) + details (right)
        content = tk.Frame(self)
        content.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2)

        # Left: Bus Monitor table
        table_frame = tk.LabelFrame(content, text="Bus Monitor")
        table_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 6))

        columns = tuple(col_id for col_id, _, _ in self._TABLE_COLUMNS)
        self.tree = ttk.Treeview(table_frame, columns=columns,
                                 show="headings", height=20)
        for col_id, heading, width in self._TABLE_COLUMNS:
            self.tree.heading(col_id, text=heading)
            self.tree.column(col_id, width=width)

        scrollbar = ttk.Scrollbar(table_frame, orient=tk.VERTICAL,
                                  command=self.tree.yview)
        self.tree.configure(yscrollcommand=scrollbar.set)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

        # Treeview reports selection changes through this virtual event.
        self.tree.bind('<<TreeviewSelect>>', self._on_tree_select)

        # Right: Details pane
        details_frame = tk.LabelFrame(content, text="Message Details")
        details_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=False, padx=6)

        detail_columns = ("param", "value")
        self.detail_tree = ttk.Treeview(details_frame, columns=detail_columns,
                                        show="headings", height=20)
        self.detail_tree.heading("param", text="Parameter")
        self.detail_tree.heading("value", text="Value")
        self.detail_tree.column("param", width=200)
        self.detail_tree.column("value", width=150)

        detail_scroll = ttk.Scrollbar(details_frame, orient=tk.VERTICAL,
                                      command=self.detail_tree.yview)
        self.detail_tree.configure(yscrollcommand=detail_scroll.set)
        detail_scroll.pack(side=tk.RIGHT, fill=tk.Y)
        self.detail_tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

    # ------------------------------------------------------------------
    # Bus Monitor table
    # ------------------------------------------------------------------
    def _get_messagedb(self):
        """Return the monitor's ``_messagedb`` attribute, or None if absent."""
        if self.bus_monitor is None:
            return None
        return getattr(self.bus_monitor, '_messagedb', None)

    def _row_values(self, label, wrapper):
        """Build the tuple of display strings for one table row.

        Uses the same wrapper attributes as MonitorApp for compatibility.
        Missing attributes fall back to 0 / "---" rather than raising.
        """
        # SW from head.sw
        try:
            sw = getattr(wrapper.head, 'sw', 0)
        except Exception:
            sw = 0

        # Num = sent_count (or size as fallback)
        num = getattr(wrapper, 'sent_count', getattr(wrapper, 'size', 0))

        # Errs from head.errcode
        try:
            errs = getattr(wrapper.head, 'errcode', 0)
        except Exception:
            errs = 0

        # Period from _time_ms, else derived from freq (1000 / freq)
        period_ms = getattr(wrapper, '_time_ms', None)
        if period_ms is None:
            freq = getattr(wrapper, 'freq', None)
            period_ms = 1000.0 / freq if freq and freq > 0 else 0

        # MC = recv_count
        mc = getattr(wrapper, 'recv_count', 0)

        # Format CW as RT-SA-WC-T/R
        try:
            bits = wrapper.head.cw.str
            rt = getattr(bits, 'rt', 0)
            sa = getattr(bits, 'sa', 0)
            wc = getattr(bits, 'wc', 0)
            tr = getattr(bits, 'tr', 0)
            # NOTE(review): MIL-STD-1553 defines T/R bit = 1 as "transmit";
            # this mapping shows "T" for 0. Confirm against the convention
            # of the library's `tr` field before changing it.
            tr_str = "T" if tr == 0 else "R"
            cw_str = f"{rt:02d}-{sa:02d}-{wc:02d}-{tr_str}"
        except Exception:
            cw_str = "---"

        return (
            label,
            cw_str,
            f"{sw}",
            f"{num}",
            f"{errs}",
            f"{period_ms:.1f}ms" if period_ms > 0 else "---",
            f"{mc}",
        )

    def _refresh_table(self):
        """Refresh the message table from MessageDB.

        Existing rows are updated in place; unseen labels get a new row.
        A failure on one message is logged and does not stop the others.
        """
        messagedb = self._get_messagedb()
        # Identity test: an empty-but-present database is still "available".
        if messagedb is None:
            if not self._db_missing_logged:
                self.logger.warning("MessageDB not available")
                self._db_missing_logged = True
            return
        self._db_missing_logged = False

        try:
            all_messages = messagedb.getAllMessages()
            for label, wrapper in all_messages.items():
                try:
                    values = self._row_values(label, wrapper)
                    item_id = self._tree_items.get(label)
                    if item_id is not None:
                        self.tree.item(item_id, values=values)
                    else:
                        self._tree_items[label] = self.tree.insert(
                            '', tk.END, values=values)
                except Exception as e:
                    self.logger.debug("Error updating row for %s: %s",
                                      label, e)
        except Exception as e:
            self.logger.error("Error refreshing table: %s", e)

    def _on_tree_select(self, event):
        """Handle tree selection - show message details."""
        selection = self.tree.selection()
        if not selection:
            return

        item_id = selection[0]
        values = self.tree.item(item_id, 'values')
        if not values:
            return

        # Prefer the label we stored for this item: cell values come back
        # through Tcl and may not compare equal to the original dict key.
        label = next(
            (lbl for lbl, iid in self._tree_items.items() if iid == item_id),
            values[0],
        )
        self._current_selected_label = label
        self._update_details(label)

    # ------------------------------------------------------------------
    # Details pane
    # ------------------------------------------------------------------
    def _update_details(self, label):
        """Show details for selected message - display payload fields with raw and enum values."""
        # Clear existing details
        for item in self.detail_tree.get_children():
            self.detail_tree.delete(item)

        messagedb = self._get_messagedb()
        if messagedb is None:
            return

        try:
            all_messages = messagedb.getAllMessages()
            if label not in all_messages:
                return
            wrapper = all_messages[label]

            # Show message label header
            self._add_detail_row("=== Message ===", label)

            # Show message payload fields with raw and enum values
            try:
                msg = wrapper.message
                if msg:
                    self._extract_message_fields(msg)
                else:
                    self._add_detail_row("No payload data", "---")
            except Exception as e:
                self._add_detail_row("Error reading payload", str(e))
        except Exception as e:
            self.logger.error("Error updating details for %s: %s", label, e)

    def _get_enum_items(self, class_name):
        """Return list of (name, value) tuples for enum class, or None."""
        try:
            from .monitor_helpers import get_enum_items
            return get_enum_items(class_name)
        except Exception:
            return None

    def _get_enum_for_field(self, field_name):
        """Return enum class for field_name from ENUM_MAP, or None.

        Tries the top-level ``Grifo_E_1553lib`` package first, then the
        copy nested under ``pybusmonitor1553``.
        """
        try:
            import Grifo_E_1553lib.data_types.enum_map as em
            return em.ENUM_MAP.get(field_name)
        except Exception:
            try:
                import pybusmonitor1553.Grifo_E_1553lib.data_types.enum_map as em
                return em.ENUM_MAP.get(field_name)
            except Exception:
                return None

    def _is_struct_like(self, v):
        """Check if value is a struct-like object - same logic as monitor.py.

        Returns True for objects exposing ``_fields_`` or more than two
        public attributes; False for None, int/float/str/bytes and callables.
        """
        if v is None:
            return False
        if isinstance(v, (int, float, str, bytes)):
            return False
        if callable(v):
            return False
        # ctypes.Structure expose _fields_
        if hasattr(v, '_fields_'):
            return True
        # objects with many public attributes are likely struct-like
        public = [n for n in dir(v) if not n.startswith('_')]
        return len(public) > 2

    def _format_value(self, name, val):
        """Return the display string for a scalar field.

        The result is "ENUM_NAME (raw)" when an enum member matches the raw
        value, otherwise the raw value as a string.
        """
        # Get raw value
        if hasattr(val, 'value'):
            raw_val = val.value
        elif hasattr(val, 'raw'):
            raw_val = val.raw
        else:
            raw_val = val

        # Enum lookup: ENUM_MAP by field name first, then by class name.
        enum_items = None
        try:
            enum_cls = self._get_enum_for_field(name)
            if enum_cls:
                enum_items = [(m.name, m.value) for m in enum_cls]
            else:
                enum_items = self._get_enum_items(val.__class__.__name__)
        except Exception:
            enum_items = None

        if not enum_items:
            return str(raw_val)

        try:
            raw = int(raw_val)
            enum_name = next(
                (n for n, v in enum_items if int(v) == raw), None)
        except Exception:
            return str(raw_val)
        return f"{enum_name} ({raw})" if enum_name else str(raw)

    def _extract_message_fields(self, obj, prefix="", _depth=0, max_depth=5):
        """Recursively extract fields from ctypes message structure - mimics monitor.py formatting.

        Args:
            obj: object whose public, non-callable attributes are listed.
            prefix: dotted path of ``obj`` from the message root.
            _depth: current nesting level (controls row indentation).
            max_depth: recursion stops once ``_depth`` reaches this value.
        """
        if _depth >= max_depth:
            return

        try:
            names = [n for n in dir(obj) if not n.startswith('_')]
        except Exception as e:
            self.logger.debug("Error extracting fields: %s", e)
            return

        header_indent = self._INDENT * _depth
        for name in names:
            try:
                val = getattr(obj, name)
            except Exception:
                continue
            if callable(val):
                continue

            # Guard each field separately so one bad attribute does not
            # hide the remaining fields of the structure.
            try:
                full_name = f"{prefix}.{name}" if prefix else name

                # Special-case: ctypes Union with a '.str' structure (bitfields)
                bitfields = getattr(val, 'str', None)
                if self._is_struct_like(bitfields):
                    self._add_detail_row(f"{header_indent}{name}", "")
                    self._extract_message_fields(
                        bitfields, prefix=f"{full_name}.str",
                        _depth=_depth + 1, max_depth=max_depth)
                    continue

                # Group nested structs (ctypes structs or objects with many
                # public attrs) unless they expose a scalar raw/value.
                if self._is_struct_like(val) and not (
                        hasattr(val, 'raw') or hasattr(val, 'value')):
                    self._add_detail_row(f"{header_indent}{name}", "")
                    self._extract_message_fields(
                        val, prefix=full_name,
                        _depth=_depth + 1, max_depth=max_depth)
                    continue

                # Scalar value or enum - one level deeper than its header
                indent = self._INDENT * (_depth + 1)
                self._add_detail_row(f"{indent}{name}",
                                     self._format_value(name, val))
            except Exception as e:
                self.logger.debug("Error extracting fields: %s", e)

    def _add_detail_row(self, param, value):
        """Add a row to details tree (best effort; failures are logged)."""
        try:
            self.detail_tree.insert('', tk.END, values=(param, value))
        except Exception as e:
            self.logger.debug("Error adding detail row %r: %s", param, e)

    # ------------------------------------------------------------------
    # Periodic update loop
    # ------------------------------------------------------------------
    def _start_update_loop(self):
        """Start periodic update loop."""
        self._update_active = True
        self._schedule_update()

    def _schedule_update(self):
        """Run one refresh pass and schedule the next one."""
        self._after_id = None
        if not self._update_active:
            return

        # The timer outlives the widget; stop quietly once it is gone.
        try:
            alive = bool(self.winfo_exists())
        except tk.TclError:
            alive = False
        if not alive:
            self._update_active = False
            return

        self._refresh_table()
        # If a message is selected, refresh its details
        if self._current_selected_label:
            self._update_details(self._current_selected_label)

        self._after_id = self.after(self.UPDATE_INTERVAL_MS,
                                    self._schedule_update)

    def stop(self):
        """Stop update loop and cancel the pending timer, if any."""
        self._update_active = False
        after_id, self._after_id = self._after_id, None
        if after_id is not None:
            try:
                self.after_cancel(after_id)
            except tk.TclError:
                # Interpreter already torn down; nothing left to cancel.
                pass

    def destroy(self):
        """Stop the update loop before destroying the widget."""
        self.stop()
        super().destroy()