392 lines
15 KiB
Python
392 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Simplified 1553 Debug View for embedding in main ARTOS GUI.
|
|
|
|
Shows only:
|
|
- Bus Monitor table (messages with stats)
|
|
- Details pane (selected message fields)
|
|
|
|
Uses existing BusMonitorCore instance from main application.
|
|
Logs to main GUI logger (no internal log widget).
|
|
"""
|
|
import tkinter as tk
|
|
from tkinter import ttk
|
|
import logging
|
|
import time
|
|
|
|
|
|
class DebugView(tk.Frame):
|
|
"""
|
|
Lightweight 1553 debug panel showing message table and details.
|
|
Designed to be embedded in a Toplevel, uses external BusMonitorCore.
|
|
"""
|
|
|
|
def __init__(self, parent, bus_monitor):
|
|
"""
|
|
Args:
|
|
parent: Tk parent widget
|
|
bus_monitor: BusMonitorCore instance (already initialized)
|
|
"""
|
|
super().__init__(parent)
|
|
self.pack(fill=tk.BOTH, expand=True)
|
|
|
|
self.bus_monitor = bus_monitor
|
|
self.logger = logging.getLogger('ARTOS.1553Debug')
|
|
|
|
# Cache for tree items and message wrappers
|
|
self._tree_items = {}
|
|
self._current_selected_label = None
|
|
|
|
self._create_widgets()
|
|
self._start_update_loop()
|
|
|
|
self.logger.info("1553 Debug View initialized")
|
|
|
|
def _create_widgets(self):
|
|
"""Create Bus Monitor table and Details pane."""
|
|
# Top controls
|
|
controls = tk.Frame(self)
|
|
controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6)
|
|
|
|
tk.Label(controls, text="1553 Bus Monitor - Real-time", font=('Arial', 10, 'bold')).pack(side=tk.LEFT, padx=10)
|
|
|
|
refresh_btn = tk.Button(controls, text="Refresh", command=self._refresh_table)
|
|
refresh_btn.pack(side=tk.RIGHT, padx=4)
|
|
|
|
# Main content: table (left) + details (right)
|
|
content = tk.Frame(self)
|
|
content.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2)
|
|
|
|
# Left: Bus Monitor table
|
|
table_frame = tk.LabelFrame(content, text="Bus Monitor")
|
|
table_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 6))
|
|
|
|
columns = ("label", "cw", "sw", "num", "errs", "period", "mc")
|
|
self.tree = ttk.Treeview(table_frame, columns=columns, show="headings", height=20)
|
|
self.tree.heading("label", text="Name")
|
|
self.tree.heading("cw", text="RT-SA-WC-T/R")
|
|
self.tree.heading("sw", text="SW")
|
|
self.tree.heading("num", text="Num")
|
|
self.tree.heading("errs", text="Errs")
|
|
self.tree.heading("period", text="period")
|
|
self.tree.heading("mc", text="MC")
|
|
|
|
self.tree.column("label", width=120)
|
|
self.tree.column("cw", width=120)
|
|
self.tree.column("sw", width=60)
|
|
self.tree.column("num", width=60)
|
|
self.tree.column("errs", width=60)
|
|
self.tree.column("period", width=80)
|
|
self.tree.column("mc", width=60)
|
|
|
|
scrollbar = ttk.Scrollbar(table_frame, orient=tk.VERTICAL, command=self.tree.yview)
|
|
self.tree.configure(yscrollcommand=scrollbar.set)
|
|
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
|
self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)
|
|
|
|
self.tree.bind('<<TreeviewSelect>>', self._on_tree_select)
|
|
|
|
# Right: Details pane
|
|
details_frame = tk.LabelFrame(content, text="Message Details")
|
|
details_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=False, padx=6)
|
|
|
|
detail_columns = ("param", "value")
|
|
self.detail_tree = ttk.Treeview(details_frame, columns=detail_columns, show="headings", height=20)
|
|
self.detail_tree.heading("param", text="Parameter")
|
|
self.detail_tree.heading("value", text="Value")
|
|
self.detail_tree.column("param", width=200)
|
|
self.detail_tree.column("value", width=150)
|
|
|
|
detail_scroll = ttk.Scrollbar(details_frame, orient=tk.VERTICAL, command=self.detail_tree.yview)
|
|
self.detail_tree.configure(yscrollcommand=detail_scroll.set)
|
|
detail_scroll.pack(side=tk.RIGHT, fill=tk.Y)
|
|
self.detail_tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)
|
|
|
|
def _get_messagedb(self):
|
|
"""Get MessageDB from BusMonitorCore."""
|
|
if self.bus_monitor is None:
|
|
return None
|
|
return getattr(self.bus_monitor, '_messagedb', None)
|
|
|
|
def _refresh_table(self):
|
|
"""Refresh the message table from MessageDB."""
|
|
messagedb = self._get_messagedb()
|
|
if not messagedb:
|
|
self.logger.warning("MessageDB not available")
|
|
return
|
|
|
|
try:
|
|
all_messages = messagedb.getAllMessages()
|
|
|
|
for label, wrapper in all_messages.items():
|
|
try:
|
|
# Use the same attributes as MonitorApp for compatibility
|
|
# CW from head.cw.raw
|
|
try:
|
|
cw_raw = getattr(wrapper.head.cw, 'raw', None)
|
|
except Exception:
|
|
cw_raw = None
|
|
|
|
# SW from head.sw
|
|
try:
|
|
sw = getattr(wrapper.head, 'sw', 0)
|
|
except Exception:
|
|
sw = 0
|
|
|
|
# Num = sent_count (or size as fallback)
|
|
num = getattr(wrapper, 'sent_count', getattr(wrapper, 'size', 0))
|
|
|
|
# Errs from head.errcode
|
|
try:
|
|
errs = getattr(wrapper.head, 'errcode', 0)
|
|
except Exception:
|
|
errs = 0
|
|
|
|
# Period from _time_ms or calculated from freq
|
|
period_ms = getattr(wrapper, '_time_ms', None)
|
|
if period_ms is None:
|
|
freq = getattr(wrapper, 'freq', None)
|
|
if freq and freq > 0:
|
|
period_ms = 1000.0 / freq
|
|
else:
|
|
period_ms = 0
|
|
|
|
# MC = recv_count
|
|
mc = getattr(wrapper, 'recv_count', 0)
|
|
|
|
# Format CW as RT-SA-WC-T/R
|
|
try:
|
|
rt = getattr(wrapper.head.cw.str, 'rt', 0)
|
|
sa = getattr(wrapper.head.cw.str, 'sa', 0)
|
|
wc = getattr(wrapper.head.cw.str, 'wc', 0)
|
|
tr = getattr(wrapper.head.cw.str, 'tr', 0)
|
|
tr_str = "T" if tr == 0 else "R"
|
|
cw_str = f"{rt:02d}-{sa:02d}-{wc:02d}-{tr_str}"
|
|
except Exception:
|
|
cw_str = "---"
|
|
|
|
values = (
|
|
label,
|
|
cw_str,
|
|
f"{sw}",
|
|
f"{num}",
|
|
f"{errs}",
|
|
f"{period_ms:.1f}ms" if period_ms > 0 else "---",
|
|
f"{mc}"
|
|
)
|
|
|
|
# Update or insert
|
|
if label in self._tree_items:
|
|
item_id = self._tree_items[label]
|
|
self.tree.item(item_id, values=values)
|
|
else:
|
|
item_id = self.tree.insert('', tk.END, values=values)
|
|
self._tree_items[label] = item_id
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"Error updating row for {label}: {e}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error refreshing table: {e}")
|
|
|
|
def _on_tree_select(self, event):
|
|
"""Handle tree selection - show message details."""
|
|
selection = self.tree.selection()
|
|
if not selection:
|
|
return
|
|
|
|
item_id = selection[0]
|
|
values = self.tree.item(item_id, 'values')
|
|
if not values:
|
|
return
|
|
|
|
label = values[0]
|
|
self._current_selected_label = label
|
|
self._update_details(label)
|
|
|
|
def _update_details(self, label):
|
|
"""Show details for selected message - display payload fields with raw and enum values."""
|
|
# Clear existing details
|
|
for item in self.detail_tree.get_children():
|
|
self.detail_tree.delete(item)
|
|
|
|
messagedb = self._get_messagedb()
|
|
if not messagedb:
|
|
return
|
|
|
|
try:
|
|
all_messages = messagedb.getAllMessages()
|
|
if label not in all_messages:
|
|
return
|
|
|
|
wrapper = all_messages[label]
|
|
|
|
# Show message label header
|
|
self._add_detail_row("=== Message ===", label)
|
|
|
|
# Show message payload fields with raw and enum values
|
|
try:
|
|
msg = wrapper.message
|
|
if msg:
|
|
self._extract_message_fields(msg)
|
|
else:
|
|
self._add_detail_row("No payload data", "---")
|
|
except Exception as e:
|
|
self._add_detail_row("Error reading payload", str(e))
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error updating details for {label}: {e}")
|
|
|
|
def _get_enum_items(self, class_name):
|
|
"""Return list of (name, value) tuples for enum class, or None."""
|
|
try:
|
|
from .monitor_helpers import get_enum_items
|
|
return get_enum_items(class_name)
|
|
except Exception:
|
|
return None
|
|
|
|
def _get_enum_for_field(self, field_name):
|
|
"""Return enum class for field_name from ENUM_MAP, or None."""
|
|
try:
|
|
import Grifo_E_1553lib.data_types.enum_map as em
|
|
return em.ENUM_MAP.get(field_name)
|
|
except Exception:
|
|
try:
|
|
import pybusmonitor1553.Grifo_E_1553lib.data_types.enum_map as em
|
|
return em.ENUM_MAP.get(field_name)
|
|
except Exception:
|
|
return None
|
|
|
|
def _is_struct_like(self, v):
|
|
"""Check if value is a struct-like object - same logic as monitor.py."""
|
|
if v is None:
|
|
return False
|
|
if isinstance(v, (int, float, str, bytes)):
|
|
return False
|
|
if callable(v):
|
|
return False
|
|
# ctypes.Structure expose _fields_
|
|
if hasattr(v, '_fields_'):
|
|
return True
|
|
# objects with many public attributes are likely struct-like
|
|
public = [n for n in dir(v) if not n.startswith('_')]
|
|
if len(public) > 2:
|
|
return True
|
|
return False
|
|
|
|
def _extract_message_fields(self, obj, prefix="", _depth=0, max_depth=5):
|
|
"""Recursively extract fields from ctypes message structure - mimics monitor.py formatting."""
|
|
if _depth >= max_depth:
|
|
return
|
|
|
|
try:
|
|
# Iterate over public attributes like monitor.py does
|
|
for name in [n for n in dir(obj) if not n.startswith('_')]:
|
|
try:
|
|
val = getattr(obj, name)
|
|
except Exception:
|
|
continue
|
|
|
|
if callable(val):
|
|
continue
|
|
|
|
full_name = f"{prefix}.{name}" if prefix else name
|
|
|
|
# Special-case: ctypes Union with a '.str' structure (bitfields)
|
|
if hasattr(val, 'str') and self._is_struct_like(getattr(val, 'str')):
|
|
# Show structure header
|
|
indent = " " * _depth
|
|
self._add_detail_row(f"{indent}{name}", "")
|
|
# Recurse into the .str sub-structure
|
|
try:
|
|
self._extract_message_fields(getattr(val, 'str'), prefix=f"{full_name}.str", _depth=_depth+1, max_depth=max_depth)
|
|
except Exception:
|
|
pass
|
|
continue
|
|
|
|
# Group nested structs (ctypes structs or objects with many public attrs)
|
|
if self._is_struct_like(val) and not (hasattr(val, 'raw') or hasattr(val, 'value')):
|
|
# Show structure header
|
|
indent = " " * _depth
|
|
self._add_detail_row(f"{indent}{name}", "")
|
|
# Recurse
|
|
self._extract_message_fields(val, prefix=full_name, _depth=_depth+1, max_depth=max_depth)
|
|
continue
|
|
|
|
# Scalar value or enum - show with proper formatting
|
|
indent = " " * (_depth + 1)
|
|
|
|
# Get raw value
|
|
if hasattr(val, 'value'):
|
|
raw_val = val.value
|
|
elif hasattr(val, 'raw'):
|
|
raw_val = val.raw
|
|
else:
|
|
raw_val = val
|
|
|
|
# Try to get enum representation - use ENUM_MAP first (by field name)
|
|
enum_items = None
|
|
try:
|
|
# Try ENUM_MAP first using field name
|
|
enum_cls = self._get_enum_for_field(name)
|
|
if enum_cls:
|
|
enum_items = [(m.name, m.value) for m in enum_cls]
|
|
else:
|
|
# Fallback to class-based lookup
|
|
enum_items = self._get_enum_items(val.__class__.__name__)
|
|
except Exception:
|
|
pass
|
|
|
|
# Format the value display like monitor.py: "ENUM_NAME (value)" or just value
|
|
if enum_items:
|
|
try:
|
|
raw = int(raw_val)
|
|
# Find matching enum name
|
|
enum_name = None
|
|
for (n, v) in enum_items:
|
|
if int(v) == raw:
|
|
enum_name = n
|
|
break
|
|
if enum_name:
|
|
val_str = f"{enum_name} ({raw})"
|
|
else:
|
|
val_str = str(raw)
|
|
except Exception:
|
|
val_str = str(raw_val)
|
|
else:
|
|
# No enum, just show the value
|
|
val_str = str(raw_val)
|
|
|
|
self._add_detail_row(f"{indent}{name}", val_str)
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"Error extracting fields: {e}")
|
|
|
|
def _add_detail_row(self, param, value):
|
|
"""Add a row to details tree."""
|
|
try:
|
|
self.detail_tree.insert('', tk.END, values=(param, value))
|
|
except Exception:
|
|
pass
|
|
|
|
def _start_update_loop(self):
|
|
"""Start periodic update loop."""
|
|
self._update_active = True
|
|
self._schedule_update()
|
|
|
|
def _schedule_update(self):
|
|
"""Schedule next update."""
|
|
if self._update_active:
|
|
self._refresh_table()
|
|
|
|
# If a message is selected, refresh its details
|
|
if self._current_selected_label:
|
|
self._update_details(self._current_selected_label)
|
|
|
|
# Schedule next update (500ms)
|
|
self.after(500, self._schedule_update)
|
|
|
|
def stop(self):
|
|
"""Stop update loop."""
|
|
self._update_active = False
|