1857 lines
74 KiB
Python
1857 lines
74 KiB
Python
"""Tkinter GUI monitor for PyBusMonitor1553.

Minimal GUI with:
- Treeview showing registered messages (Name, CW, SW, Num, Errs, period, wc, MC)
- Text log for raw sent/received events
- Buttons: `Initialize`, `Start`, `Stop`, `Refresh`

This GUI uses the ARTOS-compliant BusMonitorCore API (same as ARTOS Collector).
All code, comments and UI text are in English.
"""
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext, messagebox
|
|
# Import BusMonitorCore lazily to avoid import-time errors
|
|
BusMonitorCore = None
|
|
import threading
|
|
import time
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
# Try to import the user's external tkinter logger. If not available, add externals path.
|
|
try:
|
|
from tkinter_logger import TkinterLogger, get_logger
|
|
except Exception:
|
|
try:
|
|
externals_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-tkinter-logger'))
|
|
if externals_path not in sys.path:
|
|
sys.path.insert(0, externals_path)
|
|
from tkinter_logger import TkinterLogger, get_logger
|
|
except Exception:
|
|
TkinterLogger = None
|
|
get_logger = None
|
|
|
|
# Optional resource monitor (externals/python-resource-monitor)
|
|
TkinterResourceMonitor = None
|
|
try:
|
|
externals_rm = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-resource-monitor'))
|
|
if externals_rm not in sys.path:
|
|
sys.path.insert(0, externals_rm)
|
|
# Attempt to import the Tkinter integration class
|
|
try:
|
|
from resource_monitor import TkinterResourceMonitor # module provides class at top-level
|
|
except Exception:
|
|
# fallback to package-like import
|
|
try:
|
|
from target_simulator.utils.resource_monitor import TkinterResourceMonitor
|
|
except Exception:
|
|
TkinterResourceMonitor = None
|
|
except Exception:
|
|
TkinterResourceMonitor = None
|
|
|
|
|
|
class MonitorApp(tk.Frame):
|
|
def __init__(self, master=None):
    """Create the monitor frame, initialise its state and build the widgets.

    No BusMonitorCore is created here: `bus_monitor` stays None until the
    user presses `Initialize` (see `on_init`).

    Args:
        master: parent Tk widget the frame packs itself into.
    """
    super().__init__(master)
    self.master = master
    self.pack(fill=tk.BOTH, expand=True)

    # Use BusMonitorCore (ARTOS API) - same as ARTOS Collector will use.
    # The application remains idle until the user presses `Initialize`.
    self.bus_monitor = None
    self.import_error = None

    # State is set up BEFORE the widgets are built: create_widgets() already
    # calls update_status() and starts the optional resource monitor, so no
    # callback fired during construction may find these attributes missing.
    self.update_loop_running = False
    # cache tree item ids by message label for incremental updates
    self._tree_items = {}
    # parents we want to keep always expanded in the detail tree
    self._always_open_parents = {}

    # No background bind or retry at startup; user must press Initialize.
    self.create_widgets()
|
|
|
|
def create_widgets(self):
    """Build the complete UI and wire up logging and the resource monitor.

    Layout, top to bottom: a "Controls" frame (Initialize/Start/Stop/Refresh
    buttons plus label/period filters), the "Bus Monitor" message table with
    the details pane on its right, the "Log" text area, and a status bar
    packed into the master window below this frame.

    Logging uses TkinterLogger when it could be imported, otherwise
    `logging.basicConfig`. The resource monitor is started only when
    TkinterResourceMonitor could be imported.
    """
    # ---- Top: controls ----------------------------------------------------
    controls = tk.LabelFrame(self, text="Controls")
    controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6)

    btn_fr = tk.Frame(controls)
    btn_fr.pack(side=tk.LEFT, padx=4, pady=4)
    self.init_btn = tk.Button(btn_fr, text="Initialize", command=self.on_init)
    self.init_btn.pack(side=tk.LEFT, padx=4)
    # Start/Stop are disabled until initialization is completed.
    self.start_btn = tk.Button(btn_fr, text="Start", command=self.on_start, state=tk.DISABLED)
    self.start_btn.pack(side=tk.LEFT, padx=4)
    self.stop_btn = tk.Button(btn_fr, text="Stop", command=self.on_stop, state=tk.DISABLED)
    self.stop_btn.pack(side=tk.LEFT, padx=4)
    self.refresh_btn = tk.Button(btn_fr, text="Refresh", command=self.refresh_messages)
    self.refresh_btn.pack(side=tk.LEFT, padx=4)

    # Filters inside controls
    filter_fr = tk.Frame(controls)
    filter_fr.pack(side=tk.RIGHT, padx=4, pady=4)
    tk.Label(filter_fr, text="Label filter:").pack(side=tk.LEFT)
    self.filter_entry = tk.Entry(filter_fr, width=20)
    self.filter_entry.pack(side=tk.LEFT, padx=4)
    tk.Label(filter_fr, text="Period min (ms):").pack(side=tk.LEFT, padx=(8, 0))
    self.period_min = tk.Entry(filter_fr, width=8)
    self.period_min.pack(side=tk.LEFT, padx=4)
    tk.Label(filter_fr, text="max(ms):").pack(side=tk.LEFT)
    self.period_max = tk.Entry(filter_fr, width=8)
    self.period_max.pack(side=tk.LEFT, padx=4)
    tk.Button(filter_fr, text="Apply", command=self.refresh_messages).pack(side=tk.LEFT, padx=6)

    # ---- Middle: Bus Monitor (left) and Details (right) --------------------
    middle = tk.Frame(self)
    middle.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2)

    bus_frame = tk.LabelFrame(middle, text="Bus Monitor")
    bus_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 6))

    # Message table: (column id, heading text, width in pixels)
    column_specs = (
        ("label", "Name", 100),
        ("cw", "CW", 80),
        ("sw", "SW", 60),
        ("num", "Num", 60),
        ("errs", "Errs", 60),
        ("period", "period", 80),
        ("wc", "wc", 40),
        ("mc", "MC", 60),
    )
    self.tree = ttk.Treeview(
        bus_frame,
        columns=tuple(spec[0] for spec in column_specs),
        show="headings",
        height=14,
    )
    for col_id, title, width in column_specs:
        self.tree.heading(col_id, text=title)
        self.tree.column(col_id, width=width)
    self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

    # Details pane lives in a dedicated submodule; fall back to a minimal
    # two-column tree when it cannot be imported or constructed.
    details_error = None
    try:
        from .details_pane import DetailsPane
        self._details = DetailsPane(middle, app=self, width=520, height=540)
    except Exception as exc:
        details_error = exc
        details_frame = tk.LabelFrame(middle, text="Details")
        details_frame.pack(side=tk.RIGHT, fill=tk.Y, expand=False, padx=6)
        tk.Label(details_frame, text="Message details:").pack(anchor=tk.W)
        self.detail_tree = ttk.Treeview(details_frame, columns=("param", "value"), show="headings")
        self.detail_tree.heading('param', text='Parameter')
        self.detail_tree.heading('value', text='Value')
        self.detail_tree.pack(fill=tk.BOTH, expand=True)
    else:
        # NOTE(review): `detail_tree` is not created here on this path;
        # presumably DetailsPane attaches it to the app - confirm.
        try:
            self.detail_tree.tag_configure('changed', background='lightyellow')
        except Exception:
            pass

    # Ensure the attributes expected by the other methods exist on BOTH
    # paths. Previously they were only set when DetailsPane loaded, so the
    # fallback view left them undefined. Values already present on the
    # instance are kept.
    self.form_widgets = getattr(self, 'form_widgets', {})
    self.current_form_label = getattr(self, 'current_form_label', None)
    self.current_msg_wrapper = getattr(self, 'current_msg_wrapper', None)
    self.detail_rows = getattr(self, 'detail_rows', {})
    self.detail_selected_label = getattr(self, 'detail_selected_label', None)
    self.detail_accessors = getattr(self, 'detail_accessors', {})
    self._flash_items = getattr(self, '_flash_items', {})
    self._last_detail_update = 0.0

    # ---- Bottom: log area ---------------------------------------------------
    log_frame = tk.LabelFrame(self, text="Log")
    log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=False, padx=6, pady=6)
    self.log = scrolledtext.ScrolledText(log_frame, height=8)
    self.log.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

    # Initialize external TkinterLogger if available and attach the text
    # widget as a handler; otherwise use plain stdlib logging.
    self.logger_system = None
    try:
        if TkinterLogger is not None:
            self.logger_system = TkinterLogger(self.master)
            self.logger_system.setup(enable_console=True, enable_tkinter=True)
            self.logger_system.add_tkinter_handler(self.log)
            self.logger = logging.getLogger(__name__)
        else:
            logging.basicConfig(level=logging.INFO)
            self.logger = logging.getLogger(__name__)
            self.logger.info("TkinterLogger not available; using basic logging")
    except Exception as e:
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        try:
            self.log.insert(tk.END, f"Logger init error: {e}\n")
        except Exception:
            pass

    # The logger did not exist yet when the details pane was built, so the
    # reason for falling back is reported only now.
    if details_error is not None:
        self.logger.warning("Details pane unavailable, using fallback view: %s", details_error)

    # Bind selection
    self.tree.bind('<<TreeviewSelect>>', self.on_tree_select)

    # If import of the connection library failed, disable init/start
    if getattr(self, 'import_error', None):
        try:
            self.init_btn.config(state=tk.DISABLED)
            self.start_btn.config(state=tk.DISABLED)
        except Exception:
            pass

    # ---- Global status bar, packed into the master below this frame --------
    self.global_status = tk.Frame(self.master, relief=tk.SUNKEN, bd=1)
    # left / center / right panes
    self.status_left = tk.Label(self.global_status, text="", anchor=tk.W)
    self.status_center = tk.Label(self.global_status, text="", anchor=tk.CENTER)
    # right pane: text comes from a StringVar handed to the resource monitor
    self.status_resource_var = tk.StringVar()
    self.status_right = tk.Label(self.global_status, textvariable=self.status_resource_var, anchor=tk.E)
    self.status_left.pack(side=tk.LEFT, padx=6)
    self.status_center.pack(side=tk.LEFT, expand=True)
    self.status_right.pack(side=tk.RIGHT, padx=6)
    self.global_status.pack(side=tk.BOTTOM, fill=tk.X)

    # If there was an import error, auto-open diagnostics and set status
    if getattr(self, 'import_error', None):
        self.open_diagnostics(auto=True)
        try:
            self.update_status()
        except Exception:
            pass
    else:
        self.update_status()

    # Start the resource monitor if available (1 s poll interval). Failures
    # are non-fatal but are logged instead of being silently dropped.
    self._resource_monitor = None
    if TkinterResourceMonitor is not None:
        try:
            monitor = TkinterResourceMonitor(self.master, self.status_resource_var, poll_interval=1.0)
        except Exception as exc:
            self.logger.debug("Resource monitor could not be created: %s", exc)
        else:
            self._resource_monitor = monitor
            try:
                monitor.start()
            except Exception as exc:
                self.logger.debug("Resource monitor failed to start: %s", exc)
|
|
|
|
def _get_message_db(self):
|
|
"""Get MessageDB via BusMonitorCore (ARTOS API)."""
|
|
if self.bus_monitor is None:
|
|
return None
|
|
|
|
# MessageDB is cached in BusMonitorCore after initialize()
|
|
# This ensures we use the SAME MessageDB instance that ConnectionManager uses
|
|
if self.bus_monitor._messagedb is not None:
|
|
return self.bus_monitor._messagedb
|
|
|
|
# Fallback: try to import directly if not cached yet
|
|
try:
|
|
import importlib
|
|
try:
|
|
mod = importlib.import_module('Grifo_E_1553lib.messages.messages')
|
|
except Exception:
|
|
mod = importlib.import_module('pybusmonitor1553.Grifo_E_1553lib.messages.messages')
|
|
return getattr(mod, 'MessageDB', None)
|
|
except Exception as e:
|
|
self.import_error = str(e)
|
|
return None
|
|
|
|
def _fmt_enum(self, val, enum_names):
|
|
"""Format a numeric `val` with possible enum name(s).
|
|
|
|
`enum_names` is a list of candidate enum class names to try in the
|
|
`Grifo_E_1553lib.data_types.enums` module. Returns a string like
|
|
"3 (RWS)" or the original value if no mapping found.
|
|
"""
|
|
# delegate to helper to keep monitor.py small
|
|
try:
|
|
from .monitor_helpers import fmt_enum
|
|
return fmt_enum(val, enum_names)
|
|
except Exception:
|
|
return str(val)
|
|
|
|
def _format_ctypes_obj(self, obj, indent=0, max_depth=3):
|
|
"""Recursively format ctypes-backed message fields into readable text.
|
|
|
|
- Prints simple ctypes numbers as integers
|
|
- For nested objects, recurses up to `max_depth`
|
|
- Uses `_fmt_enum` to annotate enum-like values where possible
|
|
"""
|
|
try:
|
|
from .monitor_helpers import format_ctypes_obj
|
|
return format_ctypes_obj(obj, indent=indent, max_depth=max_depth)
|
|
except Exception:
|
|
pad = ' ' * indent
|
|
return f"{pad}{repr(obj)}\n"
|
|
|
|
def _format_value_for_table(self, val):
|
|
"""Return a short string suitable for the details table's value column."""
|
|
try:
|
|
from .monitor_helpers import format_value_for_table
|
|
return format_value_for_table(val)
|
|
except Exception:
|
|
try:
|
|
if val is None:
|
|
return '<unset>'
|
|
return str(val)
|
|
except Exception:
|
|
return '<unrepresentable>'
|
|
|
|
def _get_enum_items(self, class_name):
|
|
"""Return a list of (name, value) tuples for an enum class name, or None."""
|
|
try:
|
|
from .monitor_helpers import get_enum_items
|
|
return get_enum_items(class_name)
|
|
except Exception:
|
|
return None
|
|
|
|
def _get_enum_for_field(self, field_name):
|
|
"""Return enum class for field_name from ENUM_MAP, or None."""
|
|
try:
|
|
import Grifo_E_1553lib.data_types.enum_map as em
|
|
return em.ENUM_MAP.get(field_name)
|
|
except Exception:
|
|
return None
|
|
|
|
def show_message_form(self, label: str, editable: bool = True):
    """Show the form for message `label` inside `self.detail_form_container`.

    If the form for `label` is already displayed, its widget values are
    refreshed in place. Otherwise the message is looked up in the
    MessageDB, the previous form widgets are destroyed and a new form is
    built. With `editable` false the header shows 'Read-only' instead of
    the Apply button. Any error is written to the log widget and logger;
    nothing is raised to the caller.
    """

    def _quiet_log(level, text):
        # Logging must never abort form construction.
        try:
            getattr(self.logger, level)(text)
        except Exception:
            pass

    try:
        # Same form already on screen: update the existing widgets only.
        # The DetailsPane keeps the form in a scrollable canvas, so the tree
        # visibility is not toggled here.
        same_form = getattr(self, 'current_form_label', None) == label and self.form_widgets
        if same_form:
            try:
                self._refresh_and_update_form()
            except Exception:
                pass
            return

        message_db = self._get_message_db()
        if message_db is None:
            _quiet_log('warning', f"show_message_form: MessageDB not available for {label}")
            return

        try:
            wrapper = message_db.getMessage(label)
        except Exception:
            # direct lookup failed: search the full message table instead
            wrapper = message_db.getAllMessages().get(label)
        if not wrapper:
            _quiet_log('warning', f"show_message_form: No wrapper found for {label}")
            return

        payload = getattr(wrapper, 'message', None)
        if payload is None:
            _quiet_log('warning', f"show_message_form: Wrapper for {label} has no message")
            return

        _quiet_log('info', f"Building form for {label}, message type: {type(payload)}")

        # Only now, knowing a new form will be built, drop the old one.
        container = self.detail_form_container
        for child in container.winfo_children():
            try:
                child.destroy()
            except Exception:
                pass
        self.form_widgets = {}
        self.current_form_label = label
        self.current_msg_wrapper = wrapper  # kept for later value updates

        # Header row: title on the left, Apply button or read-only marker right.
        header = tk.Frame(container)
        header.pack(fill=tk.X, padx=6, pady=(6, 0))
        tk.Label(header, text=f"Edit message {label}", anchor=tk.W).pack(side=tk.LEFT)
        if not editable:
            tk.Label(header, text='Read-only', anchor=tk.E).pack(side=tk.RIGHT, padx=6)
        else:
            tk.Button(
                header,
                text='Apply',
                command=lambda: self._apply_form_values(wrapper),
            ).pack(side=tk.RIGHT, padx=6)

        # Field widgets are created recursively below the header.
        body = tk.Frame(container)
        body.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)
        self._build_form_fields(body, payload, prefix='', editable=editable)

        # IMPORTANT: fill in the values after the widgets exist.
        try:
            self._update_form_values(wrapper)
            self.logger.info(f"Form values updated for {label}, {len(self.form_widgets)} widgets")
        except Exception as e:
            self.logger.error(f"Failed to update form values: {e}")

        # make the form container visible
        try:
            self.detail_form_container.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        except Exception:
            pass
    except Exception as e:
        try:
            self.log.insert(tk.END, f"Error building form: {e}\n")
            self.logger.error(f"Error building form for {label}: {e}")
        except Exception:
            pass
|
|
|
|
def _build_form_fields(self, parent, obj, prefix='', max_depth=3, _depth=0, editable=True):
|
|
"""Recursively create labeled widgets for public fields of obj."""
|
|
if _depth > max_depth:
|
|
return
|
|
row = 0
|
|
# helper to detect struct-like ctypes objects
|
|
def _is_struct_like(v):
|
|
if v is None:
|
|
return False
|
|
if isinstance(v, (int, float, str, bytes)):
|
|
return False
|
|
if callable(v):
|
|
return False
|
|
# ctypes.Structure expose _fields_
|
|
if hasattr(v, '_fields_'):
|
|
return True
|
|
# objects with many public attributes are likely struct-like
|
|
public = [n for n in dir(v) if not n.startswith('_')]
|
|
if len(public) > 2:
|
|
return True
|
|
return False
|
|
|
|
for name in [n for n in dir(obj) if not n.startswith('_')]:
|
|
try:
|
|
val = getattr(obj, name)
|
|
except Exception:
|
|
continue
|
|
if callable(val):
|
|
continue
|
|
full = f"{prefix}.{name}" if prefix else name
|
|
# Special-case: ctypes Union with a '.str' structure (bitfields)
|
|
if hasattr(val, 'str') and _is_struct_like(getattr(val, 'str')):
|
|
subfrm = ttk.LabelFrame(parent, text=name)
|
|
subfrm.pack(fill=tk.X, padx=6, pady=4)
|
|
# recurse into the .str sub-structure - prefix must include .str for correct value resolution
|
|
try:
|
|
self._build_form_fields(subfrm, getattr(val, 'str'), prefix=f"{full}.str", max_depth=max_depth, _depth=_depth+1)
|
|
except Exception:
|
|
pass
|
|
continue
|
|
|
|
# group nested structs (ctypes structs or objects with many public attrs)
|
|
if _is_struct_like(val) and not (hasattr(val, 'raw') or hasattr(val, 'value')):
|
|
subfrm = ttk.LabelFrame(parent, text=name)
|
|
subfrm.pack(fill=tk.X, padx=6, pady=4)
|
|
self._build_form_fields(subfrm, val, prefix=full, max_depth=max_depth, _depth=_depth+1)
|
|
continue
|
|
# create label + widget
|
|
row_fr = tk.Frame(parent)
|
|
row_fr.pack(fill=tk.X, padx=2, pady=1)
|
|
lbl = tk.Label(row_fr, text=name, width=28, anchor=tk.W)
|
|
lbl.pack(side=tk.LEFT)
|
|
# enum choices? Check ENUM_MAP using field name
|
|
enum_items = None
|
|
try:
|
|
# Try ENUM_MAP first using field name
|
|
enum_cls = self._get_enum_for_field(name)
|
|
if enum_cls:
|
|
enum_items = [(m.name, m.value) for m in enum_cls]
|
|
else:
|
|
# Fallback to class-based lookup
|
|
enum_items = self._get_enum_items(val.__class__.__name__)
|
|
except Exception:
|
|
enum_items = None
|
|
widget = None
|
|
if enum_items:
|
|
# build combobox with names
|
|
# show human-friendly label first, then numeric value
|
|
choices = [f"{n} ({v})" for (n, v) in enum_items]
|
|
var = tk.StringVar()
|
|
# combobox is selectable for editable forms, otherwise disabled
|
|
cb_state = 'readonly' if editable else 'disabled'
|
|
cb = ttk.Combobox(row_fr, values=choices, textvariable=var, state=cb_state)
|
|
# set current value
|
|
try:
|
|
raw = int(getattr(val, 'raw', getattr(val, 'value', val)))
|
|
# find matching choice
|
|
sel = None
|
|
for (n, v) in enum_items:
|
|
if int(v) == raw:
|
|
sel = f"{n} ({v})"
|
|
break
|
|
if sel is not None:
|
|
var.set(sel)
|
|
except Exception:
|
|
pass
|
|
cb.pack(side=tk.RIGHT, fill=tk.X, expand=True)
|
|
widget = ('combobox', cb, enum_items)
|
|
else:
|
|
# numeric or text entry
|
|
var = tk.StringVar()
|
|
try:
|
|
cur = getattr(val, 'raw', getattr(val, 'value', val))
|
|
if cur is None:
|
|
var.set('')
|
|
else:
|
|
var.set(str(cur))
|
|
except Exception:
|
|
var.set('')
|
|
ent = tk.Entry(row_fr, textvariable=var)
|
|
# disable entry if not editable
|
|
if not editable:
|
|
try:
|
|
ent.config(state='disabled')
|
|
except Exception:
|
|
pass
|
|
ent.pack(side=tk.RIGHT, fill=tk.X, expand=True)
|
|
widget = ('entry', ent)
|
|
|
|
# register created widget for later updates; values will be filled
|
|
# by `_update_form_values` when a message wrapper is available.
|
|
try:
|
|
self.form_widgets[full] = widget
|
|
except Exception:
|
|
pass
|
|
|
|
def _apply_form_values(self, msg_wrapper):
|
|
"""Read widgets from `self.form_widgets` and apply values to message."""
|
|
for full, widget in list(self.form_widgets.items()):
|
|
try:
|
|
if widget[0] == 'combobox':
|
|
cb = widget[1]
|
|
sel = cb.get()
|
|
if not sel:
|
|
continue
|
|
# sel like 'NAME (3)'; try to extract numeric value inside parentheses
|
|
try:
|
|
start = sel.rfind('(')
|
|
end = sel.rfind(')')
|
|
if start != -1 and end != -1 and end > start:
|
|
num = sel[start+1:end].strip()
|
|
val = int(num)
|
|
else:
|
|
# fallback to first token
|
|
num = sel.split()[0]
|
|
val = int(num)
|
|
except Exception:
|
|
val = sel
|
|
else:
|
|
ent = widget[1]
|
|
txt = ent.get()
|
|
val = self._coerce_text_to_type(txt)
|
|
# apply
|
|
try:
|
|
self._apply_edit_to_msg(full, str(val), msg_wrapper)
|
|
except Exception as e:
|
|
try:
|
|
self.log.insert(tk.END, f"Failed to set {full}: {e}\n")
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.log.insert(tk.END, "Applied form values\n")
|
|
except Exception:
|
|
pass
|
|
|
|
def _add_detail_row(self, name: str, value: str, accessor=None):
|
|
"""Insert or update a row in the details table (param -> value)."""
|
|
try:
|
|
# support hierarchical names separated by '.' or '/'
|
|
def _ensure_hierarchy(full_name):
|
|
parts = [p for p in full_name.replace('/', '.').split('.') if p != '']
|
|
parent = ''
|
|
acc = []
|
|
for p in parts[:-1]:
|
|
acc.append(p)
|
|
key = '.'.join(acc)
|
|
if key in self.detail_rows:
|
|
parent = self.detail_rows[key]
|
|
continue
|
|
# create parent node with empty value
|
|
try:
|
|
iid = self.detail_tree.insert(parent, tk.END, values=(p, ''))
|
|
# ensure parent is expanded so nested fields remain visible
|
|
try:
|
|
self.detail_tree.item(iid, open=True)
|
|
self._always_open_parents[key] = True
|
|
except Exception:
|
|
pass
|
|
self.detail_rows[key] = iid
|
|
parent = iid
|
|
except Exception:
|
|
# fallback: try to insert at root
|
|
try:
|
|
iid = self.detail_tree.insert('', tk.END, values=(p, ''))
|
|
self.detail_rows[key] = iid
|
|
parent = iid
|
|
except Exception:
|
|
parent = ''
|
|
leaf = parts[-1] if parts else full_name
|
|
leaf_parent_key = '.'.join(parts[:-1]) if parts[:-1] else ''
|
|
leaf_parent = self.detail_rows.get(leaf_parent_key, '')
|
|
return leaf_parent, leaf
|
|
|
|
if name in self.detail_rows:
|
|
iid = self.detail_rows[name]
|
|
try:
|
|
# keep the param column stable; show only the leaf label
|
|
leaf = name.split('.')[-1]
|
|
parent_key = '.'.join(name.replace('/', '.').split('.')[:-1])
|
|
if parent_key and parent_key in self.detail_rows:
|
|
disp = f"-> {leaf}"
|
|
else:
|
|
disp = leaf
|
|
self.detail_tree.item(iid, values=(disp, value))
|
|
# if this row represents a parent that we marked to remain open,
|
|
# ensure it's still expanded after value updates
|
|
try:
|
|
parent_key = name
|
|
if parent_key in self._always_open_parents:
|
|
self.detail_tree.item(iid, open=True)
|
|
except Exception:
|
|
pass
|
|
if accessor is not None:
|
|
self.detail_accessors[name] = accessor
|
|
return iid
|
|
except Exception:
|
|
pass
|
|
|
|
# insert hierarchical node
|
|
parent_iid, leaf_label = _ensure_hierarchy(name)
|
|
try:
|
|
leaf_parent_key = '.'.join(name.replace('/', '.').split('.')[:-1])
|
|
if leaf_parent_key:
|
|
disp_label = f"-> {leaf_label}"
|
|
else:
|
|
disp_label = leaf_label
|
|
iid = self.detail_tree.insert(parent_iid, tk.END, values=(disp_label, value))
|
|
self.detail_rows[name] = iid
|
|
if accessor is not None:
|
|
self.detail_accessors[name] = accessor
|
|
return iid
|
|
except Exception:
|
|
try:
|
|
leaf_parent_key = '.'.join(name.replace('/', '.').split('.')[:-1])
|
|
if leaf_parent_key:
|
|
disp_label = f"-> {leaf_label}"
|
|
else:
|
|
disp_label = leaf_label
|
|
iid = self.detail_tree.insert('', tk.END, values=(disp_label, value))
|
|
self.detail_rows[name] = iid
|
|
if accessor is not None:
|
|
self.detail_accessors[name] = accessor
|
|
return iid
|
|
except Exception:
|
|
return None
|
|
except Exception:
|
|
try:
|
|
# best-effort: insert without tracking
|
|
# best-effort hierarchical insert
|
|
parent_iid, leaf_label = (lambda n: ('' , n))(name)
|
|
try:
|
|
leaf_parent_key = '.'.join(name.replace('/', '.').split('.')[:-1])
|
|
if leaf_parent_key:
|
|
disp_label = f"-> {leaf_label}"
|
|
else:
|
|
disp_label = leaf_label
|
|
iid = self.detail_tree.insert(parent_iid, tk.END, values=(disp_label, str(value)))
|
|
if accessor is not None:
|
|
self.detail_accessors[name] = accessor
|
|
return iid
|
|
except Exception:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def _add_detail_rows_from_obj(self, prefix: str, obj, max_depth: int = 4, _depth: int = 0):
|
|
"""Recursively add detail rows for public fields of `obj`.
|
|
|
|
- `prefix` is a string prepended to field names (e.g. 'param1').
|
|
- Stops recursing after `max_depth` to avoid noisy dumps.
|
|
- Uses `_format_value_for_table` and `_fmt_enum` to annotate enum-like values.
|
|
"""
|
|
if _depth > max_depth:
|
|
try:
|
|
self._add_detail_row(prefix or '<value>', self._format_value_for_table(obj))
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
if obj is None:
|
|
try:
|
|
self._add_detail_row(prefix or '<value>', '<None>')
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# primitive types
|
|
if isinstance(obj, (int, float, str)):
|
|
try:
|
|
self._add_detail_row(prefix or '<value>', str(obj))
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# Enum-like or ctypes scalar handling
|
|
def _is_struct_like_local(v):
|
|
# heuristics similar to form builder: ctypes.Structure or many public attrs
|
|
if v is None:
|
|
return False
|
|
if isinstance(v, (int, float, str, bytes)):
|
|
return False
|
|
if hasattr(v, '_fields_'):
|
|
return True
|
|
public = [n for n in dir(v) if not n.startswith('_')]
|
|
if len(public) > 2:
|
|
return True
|
|
return False
|
|
|
|
if hasattr(obj, 'raw') or hasattr(obj, 'value'):
|
|
# If the object also looks like a struct (has nested fields), recurse
|
|
if _is_struct_like_local(obj):
|
|
# fall through to attribute iteration to expand nested fields
|
|
pass
|
|
else:
|
|
try:
|
|
s = None
|
|
try:
|
|
raw = int(getattr(obj, 'raw')) if hasattr(obj, 'raw') else int(getattr(obj, 'value'))
|
|
s = self._fmt_enum(raw, [obj.__class__.__name__])
|
|
except Exception:
|
|
try:
|
|
s = str(getattr(obj, 'raw', getattr(obj, 'value', obj)))
|
|
except Exception:
|
|
s = repr(obj)
|
|
self._add_detail_row(prefix or obj.__class__.__name__, s)
|
|
except Exception:
|
|
try:
|
|
self._add_detail_row(prefix or '<value>', repr(obj))
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# dict-like
|
|
try:
|
|
if isinstance(obj, dict):
|
|
for k, v in obj.items():
|
|
name = f"{prefix}.{k}" if prefix else str(k)
|
|
self._add_detail_rows_from_obj(name, v, max_depth=max_depth, _depth=_depth+1)
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
# sequence
|
|
try:
|
|
if isinstance(obj, (list, tuple)):
|
|
for i, v in enumerate(obj):
|
|
name = f"{prefix}[{i}]" if prefix else f"[{i}]"
|
|
self._add_detail_rows_from_obj(name, v, max_depth=max_depth, _depth=_depth+1)
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
# otherwise iterate public attributes
|
|
import inspect
|
|
names = [n for n in dir(obj) if not n.startswith('_')]
|
|
for n in names:
|
|
try:
|
|
v = getattr(obj, n)
|
|
except Exception:
|
|
continue
|
|
field_name = f"{prefix}.{n}" if prefix else n
|
|
|
|
# If attribute is a ctypes Union with a '.str' bitfield, expand it
|
|
try:
|
|
if hasattr(v, 'str') and (hasattr(getattr(v, 'str'), '__dict__') or hasattr(getattr(v, 'str'), '_fields_')):
|
|
try:
|
|
self._add_detail_rows_from_obj(field_name, getattr(v, 'str'), max_depth=max_depth, _depth=_depth+1)
|
|
continue
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
# If callable, try to call zero-arg getters (get_*/is_*) to obtain a value
|
|
if callable(v):
|
|
called = False
|
|
try:
|
|
sig = inspect.signature(v)
|
|
if len(sig.parameters) == 0:
|
|
try:
|
|
val = v()
|
|
self._add_detail_row(field_name, self._format_value_for_table(val))
|
|
called = True
|
|
except Exception:
|
|
called = False
|
|
except Exception:
|
|
# fallback: attempt to call and ignore errors
|
|
try:
|
|
val = v()
|
|
self._add_detail_row(field_name, self._format_value_for_table(val))
|
|
called = True
|
|
except Exception:
|
|
called = False
|
|
if called:
|
|
continue
|
|
# otherwise skip arbitrary callables
|
|
continue
|
|
|
|
# Primitive or scalar-like values
|
|
if isinstance(v, (int, float, str)) or hasattr(v, 'raw') or hasattr(v, 'value'):
|
|
try:
|
|
self._add_detail_row(field_name, self._format_value_for_table(v))
|
|
except Exception:
|
|
pass
|
|
else:
|
|
# Recurse into nested objects
|
|
try:
|
|
self._add_detail_rows_from_obj(field_name, v, max_depth=max_depth, _depth=_depth+1)
|
|
except Exception:
|
|
pass
|
|
|
|
def on_init(self):
    """Handle the `Initialize` button: create and initialize BusMonitorCore.

    BusMonitorCore is imported and instantiated on first use. The
    configuration is read from the environment: `PYBM_RX_IP` (default
    '127.0.0.1'), `PYBM_TX_PORT` (default 5001) and `PYBM_RX_PORT`
    (default 5002). On success the message list is refreshed and `Start`
    is enabled. On failure the error is shown and kept in
    `self.import_error`.
    """
    # On-demand import/creation of BusMonitorCore when the user requests
    # initialization. This is the ARTOS-compliant API that ARTOS Collector
    # will also use.
    if self.bus_monitor is None:
        try:
            from pybusmonitor1553.core import BusMonitorCore
            self.bus_monitor = BusMonitorCore()
            self.import_error = None
        except Exception as e:
            # record import/bind error and show diagnostics
            self.import_error = str(e)
            try:
                self.open_diagnostics(auto=False)
            except Exception:
                pass
            try:
                self.update_status()
            except Exception:
                pass
            return

    try:
        # Build config from environment variables (same as ARTOS would do)
        config = {
            'ip': os.getenv('PYBM_RX_IP', '127.0.0.1'),
            'send_port': int(os.getenv('PYBM_TX_PORT', '5001')),
            'recv_port': int(os.getenv('PYBM_RX_PORT', '5002')),
        }

        # Call ARTOS initialize() method
        if not self.bus_monitor.initialize(config):
            errors = self.bus_monitor.get_status().get('errors')
            error_msg = errors[0] if errors else 'Initialization failed'
            messagebox.showerror("Initialization Error", error_msg)
            self.import_error = error_msg
            return

        # A previous failed attempt may have left an error recorded. It must
        # be cleared now, otherwise on_start() keeps refusing to start even
        # though initialization has succeeded.
        self.import_error = None

        try:
            self.logger.info("Initialization completed.")
        except Exception:
            try:
                self.log.insert(tk.END, "Initialization completed.\n")
            except Exception:
                pass

        self.refresh_messages()
        try:
            self.start_btn.config(state=tk.NORMAL)
            self.stop_btn.config(state=tk.DISABLED)
        except Exception:
            pass
        try:
            self.update_status()
        except Exception:
            pass
    except Exception as e:
        messagebox.showerror("Error", str(e))
|
|
|
|
def on_start(self):
    """Handle the `Start` button: start the session and the update loop.

    Shows an error dialog and returns when `self.import_error` is set or
    when `start_session()` raises. The Start/Stop buttons are switched only
    after the session has actually started; previously they were flipped
    even when starting failed.
    """
    if getattr(self, 'import_error', None):
        messagebox.showerror("Error", f"Connection library error: {self.import_error}")
        return

    try:
        # Use ARTOS start_session() method
        self.bus_monitor.start_session()
    except Exception as e:
        messagebox.showerror("Error", str(e))
        return

    # The session is running from here on: reflect that in the buttons
    # first, so a later display error cannot leave Stop disabled.
    try:
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
    except Exception:
        pass

    try:
        try:
            self.logger.info("Start send/receive")
        except Exception:
            self.log.insert(tk.END, "Start send/receive\n")

        # Force a refresh to show messages
        self.refresh_messages()

        if not self.update_loop_running:
            self.update_loop_running = True
            # NOTE(review): periodic_update runs in a worker thread; confirm
            # it hands widget updates over to the Tk main loop.
            threading.Thread(target=self.periodic_update, daemon=True).start()
        try:
            self.update_status()
        except Exception:
            pass
    except Exception as e:
        messagebox.showerror("Error", str(e))
|
|
|
|
def on_stop(self):
    """Handle the `Stop` button: stop the session and re-enable `Start`.

    When `self.import_error` is set there is nothing to stop; the request
    is only logged. Otherwise `stop_session()` is called, the status is
    refreshed and the Start/Stop buttons are switched back.
    """

    def _announce(text):
        # Prefer the logger; fall back to writing into the log widget.
        try:
            self.logger.info(text)
        except Exception:
            self.log.insert(tk.END, f"{text}\n")

    if getattr(self, 'import_error', None):
        # nothing to stop
        _announce("Stop requested (no connection active)")
        return

    # Use ARTOS stop_session() method
    self.bus_monitor.stop_session()
    _announce("Stop requested")

    try:
        self.update_status()
    except Exception:
        pass

    try:
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
    except Exception:
        pass
|
|
|
|
def open_diagnostics(self, auto: bool = False):
    """Open the connection diagnostics dialog.

    The dialog is a transient ``Toplevel`` centred over the main window. It
    shows a fixed hint that the UDP receive port (``PYBM_RX_PORT``,
    default 61553) is probably in use, plus a single "Close" button.

    Args:
        auto: When True (startup auto-open) the dialog does not take a
            modal grab or force focus, so the GUI can finish
            initialisation without blocking.
    """
    window = tk.Toplevel(self.master)
    window.title("Diagnostics")
    window.transient(self.master)

    if not auto:
        # Modal behaviour is best-effort: each step may fail independently.
        for step in ('grab_set', 'focus_force'):
            try:
                getattr(window, step)()
            except Exception:
                pass

    # Center the dialog over the main window
    try:
        owner = self.master
        owner.update_idletasks()
        left = owner.winfo_rootx()
        top = owner.winfo_rooty()
        owner_w = owner.winfo_width()
        owner_h = owner.winfo_height()
        dlg_w, dlg_h = 420, 140
        pos_x = left + max(0, (owner_w - dlg_w) // 2)
        pos_y = top + max(0, (owner_h - dlg_h) // 2)
        window.geometry(f"{dlg_w}x{dlg_h}+{pos_x}+{pos_y}")
    except Exception:
        pass

    title = tk.Label(window, text="Connection diagnostics", font=(None, 12, 'bold'))
    title.pack(padx=12, pady=(8,4))

    # Determine likely UDP port (allow override via env)
    try:
        port = int(os.environ.get('PYBM_RX_PORT', '61553'))
    except Exception:
        port = 61553

    # Simple user-facing message
    hint = f"The UDP port {port} is likely already in use by another application.\nClose this application to free the port."
    body = tk.Label(window, text=hint, wraplength=380, justify=tk.LEFT)
    body.pack(padx=12, pady=(4,8), expand=True, fill=tk.BOTH)

    button_row = tk.Frame(window)
    button_row.pack(pady=8)

    def close_main_window():
        # NOTE(review): this destroys the main window, not just this
        # dialog. Confirm that closing the whole application is intended.
        self.master.destroy()

    tk.Button(button_row, text="Close", command=close_main_window).pack(side=tk.LEFT, padx=6)
|
|
|
|
def _retry_and_update(self, dialog):
    """Reload the bus-monitor core module and rebuild ``self.bus_monitor``.

    On success ``self.import_error`` is cleared, the Initialize/Start
    buttons are enabled, an info box is shown, ``dialog`` is destroyed and
    the status bar is refreshed. On failure the first ``tk.Text`` child of
    ``dialog`` (if any) is rewritten with the error and suggested actions,
    and the status bar is refreshed.

    Args:
        dialog: The diagnostics window that triggered the retry.
    """
    module_name = 'pybusmonitor1553.core.bus_monitor_core'
    try:
        import importlib
        import sys

        # reload BusMonitorCore to re-run import logic
        if module_name in sys.modules:
            core_module = sys.modules[module_name]
            importlib.reload(core_module)
        else:
            core_module = importlib.import_module(module_name)

        # try to obtain BusMonitorCore
        monitor_cls = getattr(core_module, 'BusMonitorCore', None)
        if monitor_cls is None:
            raise RuntimeError('bus_monitor_core missing BusMonitorCore')
        self.bus_monitor = monitor_cls()
        self.import_error = None
    except Exception as e:
        # update dialog text with latest error
        try:
            for child in dialog.winfo_children():
                if not isinstance(child, tk.Text):
                    continue
                child.config(state=tk.NORMAL)
                child.delete('1.0', tk.END)
                child.insert(tk.END, f"Retry failed: {e}\n\nSuggested actions:\n- Close other app using UDP port 61553\n- Update PYBM_RX_PORT/PYBM_RX_IP if needed\n- Restart this app after freeing the port\n")
                child.config(state=tk.DISABLED)
                break
        except Exception:
            pass
        try:
            self.update_status()
        except Exception:
            pass
        return

    # Success path: every UI step is best-effort and independent.
    try:
        self.init_btn.config(state=tk.NORMAL)
        self.start_btn.config(state=tk.NORMAL)
    except Exception:
        pass
    try:
        messagebox.showinfo('Diagnostics', 'BusMonitor modules reloaded successfully.')
    except Exception:
        pass
    try:
        dialog.destroy()
    except Exception:
        pass
    try:
        self.update_status()
    except Exception:
        pass
|
|
|
|
def refresh_messages(self):
    """Incrementally synchronise the message table with the message DB.

    Rows are updated in place (or inserted) for every message returned by
    ``getAllMessages()`` that passes the label filter and the optional
    period range. Rows whose label was not processed in this pass are
    removed, and the previous selection is restored when it is no longer
    selected. The method is best-effort: any unexpected exception aborts
    the refresh silently, and an empty or missing message DB leaves the
    table untouched.
    """
    try:
        mdb = self._get_message_db()
        if mdb is None:
            return

        msgs = mdb.getAllMessages()
        if not msgs:
            return

        lbl_filter = self.filter_entry.get().strip().lower()
        pmin = pmax = None
        try:
            if self.period_min.get().strip() != "":
                pmin = float(self.period_min.get().strip())
            if self.period_max.get().strip() != "":
                pmax = float(self.period_max.get().strip())
        except Exception:
            # An unparsable bound disables period filtering altogether.
            pmin = pmax = None

        # preserve current selection label (if any)
        cur_sel = None
        sel = self.tree.selection()
        if sel:
            try:
                cur_vals = self.tree.item(sel[0], 'values')
                if cur_vals:
                    cur_sel = cur_vals[0]
            except Exception:
                cur_sel = None

        # Snapshot the top-level rows once. Asking the widget for its
        # children for every message made each refresh quadratic.
        existing_rows = set(self.tree.get_children(''))

        # mark seen labels to remove stale rows afterwards
        seen = set()

        for k, v in msgs.items():
            if lbl_filter and lbl_filter not in k.lower():
                continue

            # compute display fields
            try:
                cw_raw = getattr(v.head.cw, 'raw', None)
                wc = getattr(v.head.cw.str, 'wc', '')
            except Exception:
                cw_raw = None
                wc = ''
            try:
                sw_raw = getattr(v.head, 'sw', '')
            except Exception:
                sw_raw = ''
            num = getattr(v, 'sent_count', getattr(v, 'size', ''))
            try:
                errs = getattr(v.head, 'errcode', '')
            except Exception:
                errs = ''
            period = getattr(v, '_time_ms', None)
            if period is None:
                freq = getattr(v, 'freq', None)
                if freq:
                    period = 1000.0 / freq
            if period is not None and pmin is not None and period < pmin:
                continue
            if period is not None and pmax is not None and period > pmax:
                continue
            try:
                mc = getattr(v, 'recv_count', '')
            except Exception:
                mc = ''

            values = (k, hex(cw_raw) if cw_raw is not None else '', sw_raw, num, errs, f"{period:.3f}" if period else '', wc, mc)

            # update existing row or insert new one
            if k in self._tree_items and self._tree_items[k] in existing_rows:
                try:
                    self.tree.item(self._tree_items[k], values=values)
                except Exception:
                    # fallback: recreate item
                    try:
                        self.tree.delete(self._tree_items[k])
                    except Exception:
                        pass
                    self._tree_items[k] = self.tree.insert('', tk.END, values=values)
            else:
                self._tree_items[k] = self.tree.insert('', tk.END, values=values)

            seen.add(k)

        # remove stale items not present anymore
        for lbl in [lbl for lbl in self._tree_items if lbl not in seen]:
            try:
                self.tree.delete(self._tree_items[lbl])
            except Exception:
                pass
            del self._tree_items[lbl]

        # restore selection if possible
        if cur_sel and cur_sel in self._tree_items:
            try:
                target = self._tree_items[cur_sel]
                # Rows are updated in place, so the row is normally still
                # selected. Re-selecting it may fire a selection event on
                # every refresh, so only do it when it was actually lost.
                if target not in self.tree.selection():
                    self.tree.selection_set(target)
            except Exception:
                pass

    except Exception:
        pass
|
|
|
|
def on_tree_select(self, event):
    """Show the detail pane for the first selected row of the message table.

    Does nothing when no row is selected or the row carries no values.
    The message label is taken from the first column of the row.
    """
    chosen = self.tree.selection()
    row_values = self.tree.item(chosen[0], 'values') if chosen else None
    if row_values:
        self.show_message_detail(row_values[0])
|
|
|
|
def show_message_detail(self, label: str):
    """Populate the detail pane for the message identified by ``label``.

    The message wrapper is looked up with ``getMessage(label)``. If that
    raises, ``getAllMessages()`` is searched by exact key, then
    case-insensitively or by key prefix. Labels starting with "A" are
    rendered with ``show_message_form(label)``; everything else (and any
    "A" label whose form failed) is rendered with
    ``show_message_form(label, editable=False)``.

    The existing form is NOT cleared up front. It is cleared only right
    before a placeholder row is shown, so a failed lookup during a
    refresh does not make the form flicker.
    """
    def clear_form():
        # Destroy every child of the form container, ignoring failures.
        try:
            for child in getattr(self, 'detail_form_container', []).winfo_children():
                try:
                    child.destroy()
                except Exception:
                    pass
        except Exception:
            pass

    def placeholder_row(first, second):
        try:
            self.detail_tree.insert('', tk.END, values=(first, second))
        except Exception:
            pass

    def log_line(text):
        try:
            self.log.insert(tk.END, text)
        except Exception:
            pass

    self.detail_rows = {}
    self.detail_selected_label = label
    try:
        mdb = self._get_message_db()
        if mdb is None:
            clear_form()
            placeholder_row("Message DB not available", "Open Diagnostics for details.")
            log_line("Message DB not available (import error). Open Diagnostics for details.\n")
            return
        # Try direct lookup first
        try:
            msg_wrapper = mdb.getMessage(label)
        except Exception:
            # Try alternative access patterns: getAllMessages() dict lookup
            try:
                allm = mdb.getAllMessages()
                if label in allm:
                    msg_wrapper = allm[label]
                else:
                    # case-insensitive match or startswith
                    msg_wrapper = next(
                        (allm[k] for k in allm
                         if k.lower() == label.lower() or k.startswith(label)),
                        None,
                    )
            except Exception:
                msg_wrapper = None
    except Exception as e:
        # log diagnostic to the UI log and detail pane
        log_line(f"show_message_detail lookup error: {e}\n")
        try:
            # Guarded like every other best-effort UI write in this method.
            self.detail_text.insert(tk.END, f"Message {label} not found in MessageDB (error).\n")
        except Exception:
            pass
        return

    if not msg_wrapper:
        clear_form()
        placeholder_row("No data", f"{label}")
        log_line(f"No wrapper found for {label} in MessageDB\n")
        return

    # The actual ctypes message instance
    msg = getattr(msg_wrapper, 'message', None)
    # store current wrapper for edit/apply
    self.current_msg_wrapper = msg_wrapper

    # If this is an A message, show editable form instead of tree
    try:
        if label and str(label).upper().startswith('A'):
            self.show_message_form(label)
            return
    except Exception:
        # Editable form failed: fall through to the read-only rendering.
        pass

    if msg is None:
        clear_form()
        placeholder_row("No message", "<empty>")
        log_line(f"Wrapper for {label} has no 'message' attribute\n")
        return

    # Use the same form renderer for B messages (read-only) to avoid toggling
    # between tree and form and to keep the UI stable. This will build a
    # non-editable form for the message so values can be observed live.
    try:
        self.show_message_form(label, editable=False)
    except Exception:
        log_line(f"Failed to render form for {label}\n")
|
|
|
|
def periodic_update(self):
    """Worker loop that keeps the UI fresh while the session is running.

    Every 0.5 s the message table refresh is requested. At most once per
    second the detail values and the visible form are refreshed too. All
    widget work is handed to the Tk event loop through ``self.after`` and
    is called directly only if scheduling itself fails.

    The loop ends when ``self.bus_monitor`` is falsy, reports it is not
    running, or its status cannot be read. ``self.update_loop_running`` is
    always reset on exit so a later Start can launch a new worker.
    """
    def session_running():
        # A failing status poll is treated as "not running" so the worker
        # ends cleanly instead of dying with the flag still set.
        try:
            return bool(self.bus_monitor and self.bus_monitor.get_status()['is_running'])
        except Exception:
            return False

    def on_ui_thread(callback):
        # Schedule on the Tk event loop; call directly if that fails.
        try:
            self.after(0, callback)
        except Exception:
            callback()

    try:
        while session_running():
            try:
                on_ui_thread(self.refresh_messages)
            except Exception:
                pass
            # schedule UI updates (do UI work on main thread)
            try:
                now = time.time()
                if now - getattr(self, '_last_detail_update', 0) >= 1.0:
                    self._last_detail_update = now
                    try:
                        # update detail values and form on UI thread
                        on_ui_thread(self.update_detail_values)
                        # refresh and update form if visible
                        on_ui_thread(self._refresh_and_update_form)
                    except Exception:
                        pass
            except Exception:
                pass
            time.sleep(0.5)
    finally:
        self.update_loop_running = False
        try:
            on_ui_thread(self.update_status)
        except Exception:
            pass
|
|
|
|
def _flash_item(self, iid, duration_ms=700):
    """Highlight a detail-tree row for ``duration_ms`` milliseconds.

    The row gets the ``changed`` tag, any removal timer already pending
    for the same row is cancelled, and a new timer calling
    ``_clear_flash`` is stored in ``self._flash_items``. All errors are
    ignored.
    """
    try:
        self.detail_tree.item(iid, tags=('changed',))

        # Restart the countdown if this row is already flashing.
        pending = self._flash_items.get(iid)
        if pending:
            try:
                self.after_cancel(pending)
            except Exception:
                pass

        self._flash_items[iid] = self.after(duration_ms, lambda: self._clear_flash(iid))
    except Exception:
        pass
|
|
|
|
def _clear_flash(self, iid):
    """Remove the highlight tag from a detail-tree row and forget its timer.

    Errors are ignored. If clearing the tag fails, the timer entry is left
    in ``self._flash_items``.
    """
    try:
        self.detail_tree.item(iid, tags=())
        self._flash_items.pop(iid, None)
    except Exception:
        pass
|
|
|
|
def update_detail_values(self):
    """Refresh only the values for the currently selected detail rows (throttled).

    For every entry of ``self.detail_rows`` (header rows starting with
    ``#`` are skipped) the current value is read through the matching
    callable in ``self.detail_accessors`` or, when there is none, by
    resolving the row name as an attribute path on the message
    (``a.b[2].c``). Rows whose formatted value changed are rewritten and
    flashed.

    Per-row diagnostics are written to the log widget only when
    ``self.debug_detail_updates`` is truthy. They are off by default
    because they add a log line per row on every update.
    """
    label = getattr(self, 'detail_selected_label', None)
    if not label:
        return
    # diagnostic logging toggle
    debug_updates = bool(getattr(self, 'debug_detail_updates', False))

    try:
        mdb = self._get_message_db()
        if mdb is None:
            return
        try:
            msg_wrapper = mdb.getMessage(label)
        except Exception:
            msg_wrapper = mdb.getAllMessages().get(label) or None
        if not msg_wrapper:
            return
        msg = getattr(msg_wrapper, 'message', None)
        if msg is None:
            return
    except Exception:
        return

    def resolve(path):
        # Walk a dotted path with optional [index] suffixes; None on failure.
        try:
            cur = msg
            for part in path.split('.'):
                if '[' in part and part.endswith(']'):
                    key, idx = part[:-1].split('[')
                    if key:
                        cur = getattr(cur, key)
                    cur = cur[int(idx)]
                else:
                    cur = getattr(cur, part)
            return cur
        except Exception:
            return None

    def debug_log(text):
        if not debug_updates:
            return
        try:
            self.log.insert(tk.END, text)
        except Exception:
            pass

    # iterate rows and update via accessor if provided
    for name, iid in list(self.detail_rows.items()):
        try:
            # skip header rows
            if str(name).startswith('#'):
                continue

            accessor = self.detail_accessors.get(name)
            if callable(accessor):
                try:
                    val = accessor()
                except Exception:
                    # A failing accessor shows as an empty value.
                    val = None
            else:
                val = resolve(name)

            new_str = self._format_value_for_table(val)
            cur_vals = self.detail_tree.item(iid, 'values')
            try:
                cur_val = cur_vals[1]
            except Exception:
                cur_val = ''
            debug_log(f"Detail update check {label}:{name} cur='{cur_val}' new='{new_str}'\n")

            if new_str == cur_val:
                continue
            try:
                # update display value keeping only the leaf name in the param column
                try:
                    leaf_label = name.split('.')[-1]
                except Exception:
                    leaf_label = name
                parent_key = '.'.join(name.replace('/', '.').split('.')[:-1])
                # Rows with a visible parent are shown with an arrow prefix.
                if parent_key and parent_key in self.detail_rows:
                    disp = f"-> {leaf_label}"
                else:
                    disp = leaf_label
                self.detail_tree.item(iid, values=(disp, new_str))
                # flash the changed row
                self._flash_item(iid)
                debug_log(f"Detail updated {label}:{name} -> {new_str}\n")
            except Exception:
                pass
        except Exception:
            pass
|
|
|
|
def _refresh_and_update_form(self):
    """Re-fetch the wrapper behind the visible form and refresh its widgets.

    Uses ``self.current_form_label`` to look the wrapper up (``getMessage``
    first, ``getAllMessages().get`` if that raises), stores it in
    ``self.current_msg_wrapper`` and passes it to ``_update_form_values``.
    Returns silently when there is no form label, no message DB or no
    wrapper, and swallows every exception.
    """
    try:
        form_label = getattr(self, 'current_form_label', None)
        mdb = self._get_message_db() if form_label else None
        if mdb is None:
            return

        try:
            fresh = mdb.getMessage(form_label)
        except Exception:
            fresh = mdb.getAllMessages().get(form_label)
        if not fresh:
            return

        # update stored wrapper and refresh widgets
        self.current_msg_wrapper = fresh
        try:
            self._update_form_values(fresh)
        except Exception:
            pass
    except Exception:
        pass
|
|
|
|
def _update_form_values(self, msg_wrapper):
    """Update displayed values in the form widgets from `msg_wrapper` without rebuilding widgets.

    Each key of ``self.form_widgets`` is resolved as an attribute path on
    ``msg_wrapper.message`` (``.`` or ``/`` separated, optional
    ``[index]`` suffixes). The displayed value is the resolved object's
    ``raw``, else its ``value``, else the object itself. Combobox entries
    (``('combobox', widget, enum_items)``) are set to ``"NAME (value)"``
    for the matching enum item or cleared. Any other entry has
    ``widget[1]`` rewritten with ``str(value)``.

    The first three fields are traced through ``self.logger`` for
    diagnostics. Tracing is best-effort, so a missing or failing logger
    never prevents a widget from being updated. All other errors are
    swallowed per widget.
    """
    def trace(level, text):
        # Diagnostics only: logging must never abort a widget update.
        try:
            getattr(self.logger, level)(text)
        except Exception:
            pass

    try:
        msg = getattr(msg_wrapper, 'message', None)
        if msg is None:
            return

        # DEBUG: Log first 3 fields to see what's happening
        debug_count = 0

        for full, widget in list(self.form_widgets.items()):
            try:
                # resolve the value from msg
                val = None
                try:
                    parts = [p for p in full.replace('/', '.').split('.') if p != '']
                    cur = msg
                    for p in parts:
                        if '[' in p and p.endswith(']'):
                            key, idx = p[:-1].split('[')
                            if key:
                                cur = getattr(cur, key)
                            cur = cur[int(idx)]
                        else:
                            cur = getattr(cur, p)

                    # Extract value - try raw, then value, then the object itself
                    raw_val = getattr(cur, 'raw', None)
                    value_val = getattr(cur, 'value', None)
                    val = raw_val if raw_val is not None else (value_val if value_val is not None else cur)

                    if debug_count < 3:
                        trace('info', f"Field '{full}': cur={type(cur).__name__}, raw={raw_val}, value={value_val}, final_val={val}")
                        debug_count += 1
                except Exception as e:
                    if debug_count < 3:
                        trace('warning', f"Field '{full}': Exception during value extraction: {e}")
                        debug_count += 1
                    val = None

                if widget[0] == 'combobox':
                    cb = widget[1]
                    enum_items = widget[2]
                    try:
                        if val is None:
                            cb.set('')
                        else:
                            raw = int(val)
                            sel = next(
                                (f"{n} ({v})" for (n, v) in enum_items if int(v) == raw),
                                None,
                            )
                            # Values without a matching enum item clear the box.
                            cb.set(sel if sel is not None else '')
                    except Exception:
                        try:
                            cb.set('')
                        except Exception:
                            pass
                else:
                    ent = widget[1]
                    try:
                        ent.delete(0, tk.END)
                        ent.insert(0, '' if val is None else str(val))
                    except Exception:
                        pass
            except Exception:
                pass
    except Exception:
        pass
|
|
|
|
def on_detail_double_click(self, event):
    """Start an inline edit of the double-clicked value cell.

    Only cells in the second column (``#2``) of ``self.detail_tree`` are
    editable. A temporary ``tk.Entry`` (child of ``self.master``) is
    placed over the cell, pre-filled with the current value. Return
    commits the edit; Escape or losing focus cancels it. Any error aborts
    silently.
    """
    try:
        tree = self.detail_tree
        if tree.identify('region', event.x, event.y) != 'cell':
            return

        rowid = tree.identify_row(event.y)
        col = tree.identify_column(event.x)
        if not rowid or col != '#2':
            return

        # get bbox for the cell
        cell_box = tree.bbox(rowid, col)
        if not cell_box:
            return
        cell_x, cell_y, cell_w, cell_h = cell_box
        tree_root_x = tree.winfo_rootx()
        tree_root_y = tree.winfo_rooty()

        row_values = tree.item(rowid, 'values')
        try:
            current_text = row_values[1]
        except Exception:
            current_text = ''

        # create a transient Entry widget over the cell (as child of master)
        self._edit_var = tk.StringVar(value=str(current_text))
        self._edit_entry = tk.Entry(self.master, textvariable=self._edit_var)

        # Translate the cell position from screen coordinates into
        # coordinates relative to the master window for place().
        self._edit_entry.place(
            x=tree_root_x - self.master.winfo_rootx() + cell_x,
            y=tree_root_y - self.master.winfo_rooty() + cell_y,
            width=cell_w,
            height=cell_h,
        )
        self._edit_entry.focus_set()
        self._editing_target = (rowid, col)

        self._edit_entry.bind('<Return>', lambda e: self._commit_edit())
        for cancel_event in ('<Escape>', '<FocusOut>'):
            self._edit_entry.bind(cancel_event, lambda e: self._cancel_edit())
    except Exception:
        pass
|
|
|
|
def _commit_edit(self):
    """Commit the inline edit: update the row and push the value to the message.

    The edited text replaces the value column of the target row. The
    row's full name is found by reverse lookup in ``self.detail_rows``
    and, when ``self.current_msg_wrapper`` is set, applied with
    ``_apply_edit_to_msg``. The outcome is written to the log widget.
    Finally the temporary entry is destroyed and the edit state is reset.
    Errors are swallowed.
    """
    try:
        target = getattr(self, '_editing_target', None)
        if not target:
            return
        rowid, col = target
        new_text = self._edit_var.get()

        # update displayed value (keep param label unchanged)
        shown = self.detail_tree.item(rowid, 'values')
        param_label = shown[0] if shown else ''
        try:
            self.detail_tree.item(rowid, values=(param_label, new_text))
        except Exception:
            pass

        # find the detail_rows reverse mapping to get the full dotted name
        full_name = next(
            (key for key, iid in self.detail_rows.items() if iid == rowid),
            None,
        )

        # apply the new value to the underlying message if available
        if full_name and getattr(self, 'current_msg_wrapper', None):
            try:
                self._apply_edit_to_msg(full_name, new_text, self.current_msg_wrapper)
            except Exception as e:
                outcome = f"Failed to apply edit: {e}\n"
            else:
                outcome = f"Applied edit {full_name} = {new_text}\n"
            try:
                self.log.insert(tk.END, outcome)
            except Exception:
                pass

        try:
            self._edit_entry.destroy()
        except Exception:
            pass
        self._editing_target = self._edit_entry = self._edit_var = None
    except Exception:
        pass
|
|
|
|
def _cancel_edit(self):
    """Abort the inline edit: drop the temporary entry and reset edit state.

    The entry widget (if any) is destroyed on a best-effort basis.
    ``_editing_target``, ``_edit_entry`` and ``_edit_var`` are then set to
    ``None``. Errors are swallowed.
    """
    try:
        entry = getattr(self, '_edit_entry', None)
        if entry:
            try:
                entry.destroy()
            except Exception:
                pass
        self._editing_target = self._edit_entry = self._edit_var = None
    except Exception:
        pass
|
|
|
|
def _apply_edit_to_msg(self, full_name: str, text_value: str, msg_wrapper):
    """Resolve `full_name` against `msg_wrapper.message` and set the value.

    Handles dotted (or ``/`` separated) names and simple indexed parts
    such as ``foo[0]``. The text is converted with
    ``self._coerce_text_to_type`` first. If the target object exposes
    ``.raw`` or ``.value``, that attribute is assigned (in that order);
    otherwise plain item or attribute assignment is used. When item
    assignment into an indexed target fails, the element's
    ``.raw``/``.value`` is tried before giving up.

    Raises:
        RuntimeError: ``msg_wrapper`` has no ``message``.
        ValueError: ``full_name`` contains no path component.
        Exception: whatever the path lookup or the final assignment
            raises (e.g. ``AttributeError``, ``IndexError``, ``TypeError``).
    """
    msg = getattr(msg_wrapper, 'message', None)
    if msg is None:
        raise RuntimeError('No message object to apply to')

    # resolve path parts
    parts = [p for p in full_name.replace('/', '.').split('.') if p != '']
    if not parts:
        raise ValueError(f'Empty field path: {full_name!r}')

    def split_index(part):
        # "foo[3]" -> ("foo", 3), "[3]" -> ("", 3), "foo" -> ("foo", None)
        if '[' in part and part.endswith(']'):
            key, idx = part[:-1].split('[')
            return key, int(idx)
        return part, None

    cur = msg
    for p in parts[:-1]:
        key, idx = split_index(p)
        if idx is None:
            cur = getattr(cur, key)
        else:
            if key:
                cur = getattr(cur, key)
            cur = cur[idx]

    val = self._coerce_text_to_type(text_value)

    def store_in_wrapper(target):
        # prefer .raw or .value; report whether one of them accepted the value
        for attr in ('raw', 'value'):
            if hasattr(target, attr):
                try:
                    setattr(target, attr, val)
                    return True
                except Exception:
                    pass
        return False

    key, idx = split_index(parts[-1])
    if idx is not None:
        # handle array index on last
        container = getattr(cur, key) if key else cur
        try:
            container[idx] = val
            return
        except Exception:
            # Item assignment is not supported (or rejected the value):
            # try to store into the element itself before re-raising.
            if store_in_wrapper(container[idx]):
                return
            raise

    if hasattr(cur, key) and store_in_wrapper(getattr(cur, key)):
        return
    # fallback setattr
    setattr(cur, key, val)
|
|
|
|
def _coerce_text_to_type(self, text: str):
    """Convert edited text to a number where possible.

    Delegates to ``monitor_helpers.coerce_text_to_type`` when that helper
    can be imported and succeeds. Otherwise a local fallback is used:
    ``0x``-prefixed text is parsed as hexadecimal, then ``int`` and
    ``float`` are tried in that order, and the text is returned unchanged
    if nothing applies.
    """
    try:
        from .monitor_helpers import coerce_text_to_type as shared_coerce
        return shared_coerce(text)
    except Exception:
        pass

    # Local fallback, used when the shared helper is unavailable or failed.
    try:
        looks_hex = text.startswith('0x')
    except Exception:
        looks_hex = False
    if looks_hex:
        try:
            return int(text, 16)
        except Exception:
            pass

    for convert in (int, float):
        try:
            return convert(text)
        except Exception:
            continue
    return text
|
|
|
|
def on_send_now(self):
    """Attempt to send the currently selected message immediately.

    The label in ``self.detail_selected_label`` is passed to the first of
    ``send_now``, ``send_message`` or ``send`` on ``self.manager`` that is
    callable and does not raise. Every outcome (nothing selected, no
    manager, sent, no usable API) is reported in the log widget. Errors
    are swallowed.
    """
    def note(text):
        try:
            self.log.insert(tk.END, text)
        except Exception:
            pass

    try:
        label = getattr(self, 'detail_selected_label', None)
        if not label:
            note("No message selected to send\n")
            return

        # NOTE(review): other methods here talk to `self.bus_monitor`;
        # confirm that `self.manager` is still assigned anywhere.
        manager = getattr(self, 'manager', None)
        if manager is None:
            note("Manager not available to send message\n")
            return

        # try known method names
        for api_name in ('send_now', 'send_message', 'send'):
            sender = getattr(manager, api_name, None)
            if not callable(sender):
                continue
            try:
                sender(label)
            except Exception:
                # This API rejected the call: try the next candidate.
                continue
            note(f"Sent message {label} via {api_name}\n")
            return

        note(f"Manager does not expose send API; modified message saved for scheduled send: {label}\n")
    except Exception:
        pass
|
|
|
|
def update_status(self):
    """Refresh the left status-bar label from the current connection state.

    * ``self.import_error`` set: "Disconnected: UDP <port>" in red, where
      the port comes from ``PYBM_RX_PORT`` (default 61553).
    * ``self.bus_monitor.get_status()['is_running']`` truthy: "Connected
      (running)" in green.
    * otherwise: "Ready (not running)" in orange.
    * unexpected error while deciding: "Status unknown" in black.

    Failures while configuring the label itself are ignored.
    """
    def paint(text, colour):
        try:
            self.status_left.config(text=text, fg=colour)
        except Exception:
            pass

    try:
        if getattr(self, 'import_error', None):
            try:
                port = int(os.environ.get('PYBM_RX_PORT', '61553'))
            except Exception:
                port = 61553
            # left pane shows connection state
            paint(f"Disconnected: UDP {port}", "red")
            return

        monitor = self.bus_monitor
        running = False
        if monitor is not None:
            # Use ARTOS get_status() method; a failing poll counts as idle.
            try:
                running = bool(monitor.get_status()['is_running'])
            except Exception:
                running = False

        if running:
            paint("Connected (running)", "green")
        else:
            # default
            paint("Ready (not running)", "orange")
    except Exception:
        paint("Status unknown", "black")
|
|
|
|
|
|
def main():
    """Create the root window, build the monitor GUI and run the Tk main loop.

    Closing the window shuts down the optional logger system and stops the
    optional resource monitor (both best-effort) before destroying the
    root window.
    """
    root = tk.Tk()
    root.title("PyBusMonitor1553 - Monitor")
    root.geometry("1024x768")
    app = MonitorApp(master=root)

    def on_closing():
        # Each optional subsystem is stopped independently, so one failing
        # does not prevent the other (or the window) from closing.
        shutdown_steps = (('logger_system', 'shutdown'), ('_resource_monitor', 'stop'))
        for attr_name, method_name in shutdown_steps:
            try:
                subsystem = getattr(app, attr_name, None)
                if subsystem:
                    getattr(subsystem, method_name)()
            except Exception:
                pass
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_closing)
    app.mainloop()


if __name__ == '__main__':
    main()
|