984 lines
39 KiB
Python
984 lines
39 KiB
Python
"""Tkinter GUI monitor for PyBusMonitor1553.
|
|
|
|
Minimal GUI with:
|
|
- Treeview showing registered messages (label, CW.raw, size)
|
|
- Text log for raw sent/received events
|
|
- Buttons: `Initialize`, `Start`, `Stop`, `Refresh`
|
|
|
|
This scaffold calls `pybusmonitor1553.core.connection_manager` for init/start/stop.
|
|
All code, comments and UI text are English.
|
|
"""
|
|
import tkinter as tk
|
|
from tkinter import ttk, scrolledtext, messagebox
|
|
# Import connection manager lazily to avoid import-time socket errors
|
|
# get_manager will be imported at runtime inside the app instance
|
|
MessageDB = None
|
|
import threading
|
|
import time
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
# Try to import the user's external tkinter logger. If not available, add externals path.
|
|
try:
|
|
from tkinter_logger import TkinterLogger, get_logger
|
|
except Exception:
|
|
try:
|
|
externals_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-tkinter-logger'))
|
|
if externals_path not in sys.path:
|
|
sys.path.insert(0, externals_path)
|
|
from tkinter_logger import TkinterLogger, get_logger
|
|
except Exception:
|
|
TkinterLogger = None
|
|
get_logger = None
|
|
|
|
# Optional resource monitor (externals/python-resource-monitor)
|
|
TkinterResourceMonitor = None
|
|
try:
|
|
externals_rm = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-resource-monitor'))
|
|
if externals_rm not in sys.path:
|
|
sys.path.insert(0, externals_rm)
|
|
# Attempt to import the Tkinter integration class
|
|
try:
|
|
from resource_monitor import TkinterResourceMonitor # module provides class at top-level
|
|
except Exception:
|
|
# fallback to package-like import
|
|
try:
|
|
from target_simulator.utils.resource_monitor import TkinterResourceMonitor
|
|
except Exception:
|
|
TkinterResourceMonitor = None
|
|
except Exception:
|
|
TkinterResourceMonitor = None
|
|
|
|
|
|
class MonitorApp(tk.Frame):
|
|
def __init__(self, master=None):
|
|
super().__init__(master)
|
|
self.master = master
|
|
self.pack(fill=tk.BOTH, expand=True)
|
|
|
|
# Do not import or create the connection manager at startup.
|
|
# The application will remain idle until the user presses `Initialize`.
|
|
self.manager = None
|
|
self.import_error = None
|
|
self.create_widgets()
|
|
self.update_loop_running = False
|
|
# cache tree item ids by message label for incremental updates
|
|
self._tree_items = {}
|
|
# No background bind or retry at startup; user must press Initialize.
|
|
|
|
def create_widgets(self):
|
|
# Top Controls Frame
|
|
controls = tk.LabelFrame(self, text="Controls")
|
|
controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6)
|
|
|
|
btn_fr = tk.Frame(controls)
|
|
btn_fr.pack(side=tk.LEFT, padx=4, pady=4)
|
|
self.init_btn = tk.Button(btn_fr, text="Initialize", command=self.on_init)
|
|
self.init_btn.pack(side=tk.LEFT, padx=4)
|
|
self.start_btn = tk.Button(btn_fr, text="Start", command=self.on_start)
|
|
self.start_btn.pack(side=tk.LEFT, padx=4)
|
|
self.stop_btn = tk.Button(btn_fr, text="Stop", command=self.on_stop)
|
|
self.stop_btn.pack(side=tk.LEFT, padx=4)
|
|
self.refresh_btn = tk.Button(btn_fr, text="Refresh", command=self.refresh_messages)
|
|
self.refresh_btn.pack(side=tk.LEFT, padx=4)
|
|
# Start/Stop are disabled until initialization is completed
|
|
try:
|
|
self.start_btn.config(state=tk.DISABLED)
|
|
self.stop_btn.config(state=tk.DISABLED)
|
|
except Exception:
|
|
pass
|
|
|
|
# Filters inside controls
|
|
filter_fr = tk.Frame(controls)
|
|
filter_fr.pack(side=tk.RIGHT, padx=4, pady=4)
|
|
tk.Label(filter_fr, text="Label filter:").pack(side=tk.LEFT)
|
|
self.filter_entry = tk.Entry(filter_fr, width=20)
|
|
self.filter_entry.pack(side=tk.LEFT, padx=4)
|
|
tk.Label(filter_fr, text="Period min (ms):").pack(side=tk.LEFT, padx=(8,0))
|
|
self.period_min = tk.Entry(filter_fr, width=8)
|
|
self.period_min.pack(side=tk.LEFT, padx=4)
|
|
tk.Label(filter_fr, text="max(ms):").pack(side=tk.LEFT)
|
|
self.period_max = tk.Entry(filter_fr, width=8)
|
|
self.period_max.pack(side=tk.LEFT, padx=4)
|
|
tk.Button(filter_fr, text="Apply", command=self.refresh_messages).pack(side=tk.LEFT, padx=6)
|
|
|
|
# Middle: Bus Monitor (left) and Details (right)
|
|
middle = tk.Frame(self)
|
|
middle.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2)
|
|
|
|
bus_frame = tk.LabelFrame(middle, text="Bus Monitor")
|
|
bus_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0,6))
|
|
|
|
# Message table (columns per screenshot)
|
|
columns = ("label", "cw", "sw", "num", "errs", "period", "wc", "mc")
|
|
self.tree = ttk.Treeview(bus_frame, columns=columns, show="headings", height=14)
|
|
self.tree.heading("label", text="Name")
|
|
self.tree.heading("cw", text="CW")
|
|
self.tree.heading("sw", text="SW")
|
|
self.tree.heading("num", text="Num")
|
|
self.tree.heading("errs", text="Errs")
|
|
self.tree.heading("period", text="period")
|
|
self.tree.heading("wc", text="wc")
|
|
self.tree.heading("mc", text="MC")
|
|
self.tree.column("label", width=100)
|
|
self.tree.column("cw", width=80)
|
|
self.tree.column("sw", width=60)
|
|
self.tree.column("num", width=60)
|
|
self.tree.column("errs", width=60)
|
|
self.tree.column("period", width=80)
|
|
self.tree.column("wc", width=40)
|
|
self.tree.column("mc", width=60)
|
|
self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)
|
|
|
|
details_frame = tk.LabelFrame(middle, text="Details")
|
|
details_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=False)
|
|
tk.Label(details_frame, text="Message details:").pack(anchor=tk.W)
|
|
# Use a Treeview for stable tabular details: parameter | value
|
|
self.detail_tree = ttk.Treeview(details_frame, columns=("param", "value"), show="headings", height=18)
|
|
self.detail_tree.heading("param", text="Parameter")
|
|
self.detail_tree.heading("value", text="Value")
|
|
self.detail_tree.column("param", width=180)
|
|
self.detail_tree.column("value", width=320)
|
|
vsb = ttk.Scrollbar(details_frame, orient=tk.VERTICAL, command=self.detail_tree.yview)
|
|
self.detail_tree.configure(yscrollcommand=vsb.set)
|
|
self.detail_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(6,0), pady=6)
|
|
vsb.pack(side=tk.RIGHT, fill=tk.Y, padx=(0,6), pady=6)
|
|
# map param name -> item id for quick updates
|
|
self.detail_rows = {}
|
|
self.detail_selected_label = None
|
|
|
|
# Bottom: Log area
|
|
log_frame = tk.LabelFrame(self, text="Log")
|
|
log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=False, padx=6, pady=6)
|
|
self.log = scrolledtext.ScrolledText(log_frame, height=8)
|
|
self.log.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)
|
|
|
|
# Status bar will be added below the whole UI (outside all frames)
|
|
|
|
# Initialize external TkinterLogger if available, attach text widget as handler
|
|
self.logger_system = None
|
|
try:
|
|
if TkinterLogger is not None:
|
|
self.logger_system = TkinterLogger(self.master)
|
|
self.logger_system.setup(enable_console=True, enable_tkinter=True)
|
|
self.logger_system.add_tkinter_handler(self.log)
|
|
self.logger = logging.getLogger(__name__)
|
|
else:
|
|
logging.basicConfig(level=logging.INFO)
|
|
self.logger = logging.getLogger(__name__)
|
|
self.logger.info("TkinterLogger not available; using basic logging")
|
|
except Exception as e:
|
|
logging.basicConfig(level=logging.INFO)
|
|
self.logger = logging.getLogger(__name__)
|
|
try:
|
|
self.log.insert(tk.END, f"Logger init error: {e}\n")
|
|
except Exception:
|
|
pass
|
|
|
|
# Bind selection
|
|
self.tree.bind('<<TreeviewSelect>>', self.on_tree_select)
|
|
|
|
# If import of connection manager failed, disable init/start
|
|
if getattr(self, 'import_error', None):
|
|
try:
|
|
self.init_btn.config(state=tk.DISABLED)
|
|
self.start_btn.config(state=tk.DISABLED)
|
|
except Exception:
|
|
pass
|
|
|
|
# Add a global status bar below everything
|
|
self.global_status = tk.Frame(self.master, relief=tk.SUNKEN, bd=1)
|
|
# left / center / right panes
|
|
self.status_left = tk.Label(self.global_status, text="", anchor=tk.W)
|
|
self.status_center = tk.Label(self.global_status, text="", anchor=tk.CENTER)
|
|
# right pane: may use a StringVar updated by external resource monitor
|
|
self.status_resource_var = tk.StringVar()
|
|
self.status_right = tk.Label(self.global_status, textvariable=self.status_resource_var, anchor=tk.E)
|
|
self.status_left.pack(side=tk.LEFT, padx=6)
|
|
self.status_center.pack(side=tk.LEFT, expand=True)
|
|
self.status_right.pack(side=tk.RIGHT, padx=6)
|
|
# pack at the bottom of the main window (below this frame)
|
|
self.global_status.pack(side=tk.BOTTOM, fill=tk.X)
|
|
|
|
# If there was an import error, auto-open diagnostics and set status
|
|
if getattr(self, 'import_error', None):
|
|
self.open_diagnostics(auto=True)
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
else:
|
|
self.update_status()
|
|
# Start resource monitor if available
|
|
self._resource_monitor = None
|
|
try:
|
|
if TkinterResourceMonitor is not None:
|
|
# create and start monitor with a 1s poll interval
|
|
try:
|
|
self._resource_monitor = TkinterResourceMonitor(self.master, self.status_resource_var, poll_interval=1.0)
|
|
# start() returns True if psutil available and monitor started
|
|
try:
|
|
self._resource_monitor.start()
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
self._resource_monitor = None
|
|
except Exception:
|
|
self._resource_monitor = None
|
|
|
|
def _get_message_db(self):
|
|
"""Lazy import of MessageDB so module import-time socket binds won't crash the GUI."""
|
|
global MessageDB
|
|
if MessageDB is not None:
|
|
return MessageDB
|
|
try:
|
|
import importlib
|
|
try:
|
|
mod = importlib.import_module('Grifo_E_1553lib.messages.messages')
|
|
except Exception:
|
|
mod = importlib.import_module('pybusmonitor1553.Grifo_E_1553lib.messages.messages')
|
|
MessageDB = mod.MessageDB
|
|
return MessageDB
|
|
except Exception as e:
|
|
# record import error so UI can show diagnostics
|
|
self.import_error = str(e)
|
|
return None
|
|
|
|
def _fmt_enum(self, val, enum_names):
|
|
"""Format a numeric `val` with possible enum name(s).
|
|
|
|
`enum_names` is a list of candidate enum class names to try in the
|
|
`Grifo_E_1553lib.data_types.enums` module. Returns a string like
|
|
"3 (RWS)" or the original value if no mapping found.
|
|
"""
|
|
try:
|
|
import importlib
|
|
enums_mod = None
|
|
try:
|
|
enums_mod = importlib.import_module('Grifo_E_1553lib.data_types.enums')
|
|
except Exception:
|
|
try:
|
|
enums_mod = importlib.import_module('pybusmonitor1553.Grifo_E_1553lib.data_types.enums')
|
|
except Exception:
|
|
enums_mod = None
|
|
if enums_mod is None:
|
|
return str(val)
|
|
for name in enum_names:
|
|
enum_cls = getattr(enums_mod, name, None)
|
|
if enum_cls is not None:
|
|
try:
|
|
# if val is already enum instance, get value
|
|
v = int(val)
|
|
try:
|
|
enum_name = enum_cls(v).name
|
|
except Exception:
|
|
enum_name = str(getattr(enum_cls, v, ''))
|
|
return f"{v} ({enum_name})"
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
return str(val)
|
|
|
|
def _format_ctypes_obj(self, obj, indent=0, max_depth=3):
|
|
"""Recursively format ctypes-backed message fields into readable text.
|
|
|
|
- Prints simple ctypes numbers as integers
|
|
- For nested objects, recurses up to `max_depth`
|
|
- Uses `_fmt_enum` to annotate enum-like values where possible
|
|
"""
|
|
pad = ' ' * indent
|
|
out_lines = []
|
|
if obj is None:
|
|
return f"{pad}<None>\n"
|
|
# primitive types
|
|
try:
|
|
# ctypes scalar like c_uint16 etc. -> has value attribute or can be int()
|
|
if hasattr(obj, 'value') and not hasattr(obj, '__dict__'):
|
|
try:
|
|
return f"{pad}{int(obj.value)}\n"
|
|
except Exception:
|
|
return f"{pad}{obj}\n"
|
|
# plain int/float/str
|
|
if isinstance(obj, (int, float, str)):
|
|
return f"{pad}{obj}\n"
|
|
except Exception:
|
|
pass
|
|
|
|
# If object has a 'raw' attribute, prefer that as a primitive value
|
|
if hasattr(obj, 'raw') and not hasattr(obj, '__dict__'):
|
|
try:
|
|
raw = int(obj.raw)
|
|
# Try mapping by class name
|
|
cls_name = obj.__class__.__name__
|
|
enum_str = self._fmt_enum(raw, [cls_name])
|
|
return f"{pad}{raw} ({enum_str if enum_str!=str(raw) else cls_name})\n"
|
|
except Exception:
|
|
return f"{pad}{repr(obj)}\n"
|
|
|
|
# If object is a ctypes-like struct or has attributes, iterate fields
|
|
if max_depth <= 0:
|
|
return f"{pad}{obj}\n"
|
|
|
|
# Try to iterate public attributes
|
|
names = [n for n in dir(obj) if not n.startswith('_')]
|
|
simple_fields = []
|
|
for n in names:
|
|
try:
|
|
v = getattr(obj, n)
|
|
except Exception:
|
|
continue
|
|
# skip methods
|
|
if callable(v):
|
|
continue
|
|
# skip module/class descriptors
|
|
if isinstance(v, (type,)):
|
|
continue
|
|
# format recursively
|
|
try:
|
|
if hasattr(v, '__dict__') or hasattr(v, 'raw') or isinstance(v, (int, float, str)):
|
|
nested = self._format_ctypes_obj(v, indent=indent+1, max_depth=max_depth-1)
|
|
simple_fields.append(f"{pad}{n}:\n{nested}")
|
|
else:
|
|
# fallback repr
|
|
simple_fields.append(f"{pad}{n}: {v}\n")
|
|
except Exception:
|
|
simple_fields.append(f"{pad}{n}: <error>\n")
|
|
|
|
if simple_fields:
|
|
return '\n'.join(simple_fields) + '\n'
|
|
return f"{pad}{repr(obj)}\n"
|
|
|
|
def _format_value_for_table(self, val):
|
|
"""Return a short string suitable for the details table's value column."""
|
|
try:
|
|
# simple primitives
|
|
if val is None:
|
|
return "<None>"
|
|
if isinstance(val, (int, float, str)):
|
|
return str(val)
|
|
# prefer common ctypes attrs
|
|
if hasattr(val, 'raw'):
|
|
try:
|
|
return self._fmt_enum(int(getattr(val, 'raw')), [val.__class__.__name__])
|
|
except Exception:
|
|
try:
|
|
return str(getattr(val, 'raw'))
|
|
except Exception:
|
|
return repr(val)
|
|
if hasattr(val, 'value'):
|
|
try:
|
|
return str(int(val.value))
|
|
except Exception:
|
|
try:
|
|
return str(val.value)
|
|
except Exception:
|
|
return repr(val)
|
|
# fallback: attempt a shallow ctypes-style formatting
|
|
try:
|
|
s = self._format_ctypes_obj(val, indent=0, max_depth=1)
|
|
return s.strip().splitlines()[0]
|
|
except Exception:
|
|
return repr(val)
|
|
except Exception:
|
|
return '<unrepresentable>'
|
|
|
|
def _add_detail_row(self, name: str, value: str):
|
|
"""Insert or update a row in the details table (param -> value)."""
|
|
try:
|
|
if name in self.detail_rows:
|
|
iid = self.detail_rows[name]
|
|
try:
|
|
# keep the param column stable
|
|
self.detail_tree.item(iid, values=(name, value))
|
|
return iid
|
|
except Exception:
|
|
pass
|
|
# insert new row
|
|
iid = self.detail_tree.insert('', tk.END, values=(name, value))
|
|
self.detail_rows[name] = iid
|
|
return iid
|
|
except Exception:
|
|
try:
|
|
# best-effort: insert without tracking
|
|
return self.detail_tree.insert('', tk.END, values=(name, str(value)))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def on_init(self):
|
|
# On-demand import/creation of the connection manager when the user requests initialization.
|
|
if self.manager is None:
|
|
try:
|
|
from pybusmonitor1553.core.connection_manager import get_manager
|
|
self.manager = get_manager()
|
|
self.import_error = None
|
|
except Exception as e:
|
|
# record import/bind error and show diagnostics
|
|
self.import_error = str(e)
|
|
try:
|
|
self.open_diagnostics(auto=False)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
try:
|
|
self.manager.init_library()
|
|
try:
|
|
self.logger.info("Initialization completed.")
|
|
except Exception:
|
|
try:
|
|
self.log.insert(tk.END, "Initialization completed.\n")
|
|
except Exception:
|
|
pass
|
|
self.refresh_messages()
|
|
try:
|
|
self.start_btn.config(state=tk.NORMAL)
|
|
self.stop_btn.config(state=tk.DISABLED)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
messagebox.showerror("Error", str(e))
|
|
|
|
def on_start(self):
|
|
if getattr(self, 'import_error', None):
|
|
messagebox.showerror("Error", f"Connection library error: {self.import_error}")
|
|
return
|
|
try:
|
|
self.manager.start()
|
|
try:
|
|
self.logger.info("Start send/receive")
|
|
except Exception:
|
|
self.log.insert(tk.END, "Start send/receive\n")
|
|
if not self.update_loop_running:
|
|
self.update_loop_running = True
|
|
threading.Thread(target=self.periodic_update, daemon=True).start()
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
messagebox.showerror("Error", str(e))
|
|
try:
|
|
self.start_btn.config(state=tk.DISABLED)
|
|
self.stop_btn.config(state=tk.NORMAL)
|
|
except Exception:
|
|
pass
|
|
|
|
def on_stop(self):
|
|
if getattr(self, 'import_error', None):
|
|
# nothing to stop
|
|
try:
|
|
self.logger.info("Stop requested (no connection active)")
|
|
except Exception:
|
|
self.log.insert(tk.END, "Stop requested (no connection active)\n")
|
|
return
|
|
self.manager.stop()
|
|
try:
|
|
self.logger.info("Stop requested")
|
|
except Exception:
|
|
self.log.insert(tk.END, "Stop requested\n")
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.start_btn.config(state=tk.NORMAL)
|
|
self.stop_btn.config(state=tk.DISABLED)
|
|
except Exception:
|
|
pass
|
|
|
|
def open_diagnostics(self, auto: bool = False):
|
|
# Diagnostics dialog with error details and Retry.
|
|
# When auto=True (startup auto-open), do not force modal grab so
|
|
# the GUI can finish initialization without blocking.
|
|
dlg = tk.Toplevel(self.master)
|
|
dlg.title("Diagnostics")
|
|
dlg.transient(self.master)
|
|
if not auto:
|
|
try:
|
|
dlg.grab_set()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
dlg.focus_force()
|
|
except Exception:
|
|
pass
|
|
|
|
# Center the dialog over the main window
|
|
try:
|
|
self.master.update_idletasks()
|
|
mx = self.master.winfo_rootx()
|
|
my = self.master.winfo_rooty()
|
|
mw = self.master.winfo_width()
|
|
mh = self.master.winfo_height()
|
|
w = 420
|
|
h = 140
|
|
x = mx + max(0, (mw - w) // 2)
|
|
y = my + max(0, (mh - h) // 2)
|
|
dlg.geometry(f"{w}x{h}+{x}+{y}")
|
|
except Exception:
|
|
pass
|
|
|
|
tk.Label(dlg, text="Connection diagnostics", font=(None, 12, 'bold')).pack(padx=12, pady=(8,4))
|
|
|
|
# Determine likely UDP port (allow override via env)
|
|
try:
|
|
port = int(os.environ.get('PYBM_RX_PORT', '61553'))
|
|
except Exception:
|
|
port = 61553
|
|
|
|
# Simple user-facing message
|
|
msg_text = f"The UDP port {port} is likely already in use by another application.\nClose this application to free the port."
|
|
lbl = tk.Label(dlg, text=msg_text, wraplength=380, justify=tk.LEFT)
|
|
lbl.pack(padx=12, pady=(4,8), expand=True, fill=tk.BOTH)
|
|
|
|
btn_fr = tk.Frame(dlg)
|
|
btn_fr.pack(pady=8)
|
|
close_btn = tk.Button(btn_fr, text="Close", command=lambda: self.master.destroy())
|
|
close_btn.pack(side=tk.LEFT, padx=6)
|
|
|
|
def _retry_and_update(self, dialog):
|
|
# Attempt to reload connection manager and clear import_error if successful
|
|
try:
|
|
import importlib, sys
|
|
# reload connection manager module to re-run import logic
|
|
if 'pybusmonitor1553.core.connection_manager' in sys.modules:
|
|
cm = sys.modules['pybusmonitor1553.core.connection_manager']
|
|
importlib.reload(cm)
|
|
else:
|
|
cm = importlib.import_module('pybusmonitor1553.core.connection_manager')
|
|
# try to obtain manager
|
|
get_manager = getattr(cm, 'get_manager', None)
|
|
if get_manager is None:
|
|
raise RuntimeError('connection_manager missing get_manager')
|
|
self.manager = get_manager()
|
|
self.import_error = None
|
|
try:
|
|
self.init_btn.config(state=tk.NORMAL)
|
|
self.start_btn.config(state=tk.NORMAL)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
messagebox.showinfo('Diagnostics', 'Connection modules reloaded successfully.')
|
|
except Exception:
|
|
pass
|
|
try:
|
|
dialog.destroy()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
# update dialog text with latest error
|
|
try:
|
|
for w in dialog.winfo_children():
|
|
if isinstance(w, tk.Text):
|
|
w.config(state=tk.NORMAL)
|
|
w.delete('1.0', tk.END)
|
|
w.insert(tk.END, f"Retry failed: {e}\n\nSuggested actions:\n- Close other app using UDP port 61553\n- Update PYBM_RX_PORT/PYBM_RX_IP if needed\n- Restart this app after freeing the port\n")
|
|
w.config(state=tk.DISABLED)
|
|
break
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
|
|
def refresh_messages(self):
|
|
# Incremental tree update: preserve selection and avoid full redraw
|
|
try:
|
|
mdb = self._get_message_db()
|
|
if mdb is None:
|
|
return
|
|
msgs = mdb.getAllMessages()
|
|
lbl_filter = self.filter_entry.get().strip().lower()
|
|
pmin = pmax = None
|
|
try:
|
|
if self.period_min.get().strip() != "":
|
|
pmin = float(self.period_min.get().strip())
|
|
if self.period_max.get().strip() != "":
|
|
pmax = float(self.period_max.get().strip())
|
|
except Exception:
|
|
pmin = pmax = None
|
|
|
|
# preserve current selection label (if any)
|
|
cur_sel = None
|
|
sel = self.tree.selection()
|
|
if sel:
|
|
try:
|
|
cur_item = sel[0]
|
|
cur_vals = self.tree.item(cur_item, 'values')
|
|
if cur_vals:
|
|
cur_sel = cur_vals[0]
|
|
except Exception:
|
|
cur_sel = None
|
|
|
|
# mark seen labels to remove stale rows afterwards
|
|
seen = set()
|
|
|
|
for k, v in msgs.items():
|
|
if lbl_filter and lbl_filter not in k.lower():
|
|
continue
|
|
|
|
# compute display fields
|
|
try:
|
|
cw_raw = getattr(v.head.cw, 'raw', None)
|
|
wc = getattr(v.head.cw.str, 'wc', '')
|
|
except Exception:
|
|
cw_raw = None
|
|
wc = ''
|
|
try:
|
|
sw_raw = getattr(v.head, 'sw', '')
|
|
except Exception:
|
|
sw_raw = ''
|
|
num = getattr(v, 'sent_count', getattr(v, 'size', ''))
|
|
try:
|
|
errs = getattr(v.head, 'errcode', '')
|
|
except Exception:
|
|
errs = ''
|
|
period = getattr(v, '_time_ms', None)
|
|
if period is None:
|
|
freq = getattr(v, 'freq', None)
|
|
if freq:
|
|
period = 1000.0 / freq
|
|
if period is not None and pmin is not None and period < pmin:
|
|
continue
|
|
if period is not None and pmax is not None and period > pmax:
|
|
continue
|
|
try:
|
|
mc = getattr(v, 'recv_count', '')
|
|
except Exception:
|
|
mc = ''
|
|
|
|
values = (k, hex(cw_raw) if cw_raw is not None else '', sw_raw, num, errs, f"{period:.3f}" if period else '', wc, mc)
|
|
|
|
# update existing row or insert new one
|
|
if k in self._tree_items and self._tree_items[k] in self.tree.get_children(''):
|
|
try:
|
|
self.tree.item(self._tree_items[k], values=values)
|
|
except Exception:
|
|
# fallback: recreate item
|
|
try:
|
|
self.tree.delete(self._tree_items[k])
|
|
except Exception:
|
|
pass
|
|
iid = self.tree.insert('', tk.END, values=values)
|
|
self._tree_items[k] = iid
|
|
else:
|
|
iid = self.tree.insert('', tk.END, values=values)
|
|
self._tree_items[k] = iid
|
|
|
|
seen.add(k)
|
|
|
|
# remove stale items not present anymore
|
|
stale = [lbl for lbl in list(self._tree_items.keys()) if lbl not in seen]
|
|
for lbl in stale:
|
|
try:
|
|
self.tree.delete(self._tree_items[lbl])
|
|
except Exception:
|
|
pass
|
|
del self._tree_items[lbl]
|
|
|
|
# restore selection if possible
|
|
if cur_sel and cur_sel in self._tree_items:
|
|
try:
|
|
self.tree.selection_set(self._tree_items[cur_sel])
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
def on_tree_select(self, event):
|
|
sel = self.tree.selection()
|
|
if not sel:
|
|
return
|
|
item = sel[0]
|
|
vals = self.tree.item(item, 'values')
|
|
if not vals:
|
|
return
|
|
label = vals[0]
|
|
self.show_message_detail(label)
|
|
|
|
def show_message_detail(self, label: str):
|
|
# build table for this label; detail_tree will be populated and updated
|
|
try:
|
|
self.detail_tree.delete(*self.detail_tree.get_children())
|
|
except Exception:
|
|
pass
|
|
self.detail_rows = {}
|
|
self.detail_selected_label = label
|
|
try:
|
|
mdb = self._get_message_db()
|
|
if mdb is None:
|
|
try:
|
|
self.detail_tree.insert('', tk.END, values=("Message DB not available", "Open Diagnostics for details."))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.log.insert(tk.END, "Message DB not available (import error). Open Diagnostics for details.\n")
|
|
except Exception:
|
|
pass
|
|
return
|
|
return
|
|
# Try direct lookup first
|
|
try:
|
|
msg_wrapper = mdb.getMessage(label)
|
|
except Exception:
|
|
# Try alternative access patterns: getAllMessages() dict lookup
|
|
try:
|
|
allm = mdb.getAllMessages()
|
|
# exact key
|
|
if label in allm:
|
|
msg_wrapper = allm[label]
|
|
else:
|
|
# case-insensitive match or startswith
|
|
found = None
|
|
for k in allm.keys():
|
|
if k.lower() == label.lower() or k.startswith(label):
|
|
found = allm[k]
|
|
break
|
|
msg_wrapper = found
|
|
except Exception:
|
|
msg_wrapper = None
|
|
except Exception as e:
|
|
# log diagnostic to the UI log and detail pane
|
|
try:
|
|
self.log.insert(tk.END, f"show_message_detail lookup error: {e}\n")
|
|
except Exception:
|
|
pass
|
|
self.detail_text.insert(tk.END, f"Message {label} not found in MessageDB (error).\n")
|
|
return
|
|
|
|
if not msg_wrapper:
|
|
try:
|
|
self.detail_tree.insert('', tk.END, values=("No data", f"{label}"))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.log.insert(tk.END, f"No wrapper found for {label} in MessageDB\n")
|
|
except Exception:
|
|
pass
|
|
return
|
|
try:
|
|
self.log.insert(tk.END, f"No wrapper found for {label} in MessageDB\n")
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# The actual ctypes message instance
|
|
msg = getattr(msg_wrapper, 'message', None)
|
|
if msg is None:
|
|
try:
|
|
self.detail_tree.insert('', tk.END, values=("No message", "<empty>"))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.log.insert(tk.END, f"Wrapper for {label} has no 'message' attribute\n")
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# Handle known tellbacks
|
|
try:
|
|
if hasattr(msg, 'rdr_mode_tellback') or hasattr(msg, 'settings_tellback'):
|
|
# B7 - status tellback
|
|
if hasattr(msg, 'rdr_mode_tellback'):
|
|
rb = msg.rdr_mode_tellback
|
|
# master mode and flags
|
|
try:
|
|
mm = rb.get_master_mode()
|
|
except Exception:
|
|
mm = getattr(rb, 'raw', rb)
|
|
try:
|
|
mm_str = self._fmt_enum(mm, ['RdrModes', 'RdrModes'])
|
|
except Exception:
|
|
mm_str = str(mm)
|
|
self._add_detail_row('master_mode', mm_str)
|
|
try:
|
|
des = rb.get_des_ctrl()
|
|
des_str = self._fmt_enum(des, ['DesControl', 'DesignationStatus'])
|
|
self._add_detail_row('designation_ctrl', des_str)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
ib = rb.get_ibit()
|
|
ib_str = self._fmt_enum(ib, ['IbitRequest', 'BITReportAvailable'])
|
|
self._add_detail_row('ibit', ib_str)
|
|
except Exception:
|
|
pass
|
|
|
|
# B6 - settings tellback
|
|
if hasattr(msg, 'settings_tellback'):
|
|
st = msg.settings_tellback
|
|
try:
|
|
hist = st.get_history_level()
|
|
except Exception:
|
|
hist = getattr(st, 'raw', '')
|
|
try:
|
|
sym = st.get_sym_intensity()
|
|
except Exception:
|
|
sym = ''
|
|
self._add_detail_row('history_level', self._fmt_enum(hist, ['TargetHistory']))
|
|
self._add_detail_row('symbology_intensity', str(sym))
|
|
|
|
# param1/param2 fields (if present)
|
|
try:
|
|
if hasattr(msg, 'param1_tellback'):
|
|
p1 = msg.param1_tellback
|
|
for fn, enum_names in (
|
|
('get_rws_submode', ['RwsSubmode']),
|
|
('get_spot', ['SpotSelection','SpotSelection']),
|
|
('get_acm_submode', ['AcmSubmode']),
|
|
('get_gm_submode', ['GmSubmode','GmSubmode']),
|
|
('get_expand', ['Expand']),
|
|
('get_range_scale', ['RangeScale']),
|
|
('get_bars_num', ['BarsNum']),
|
|
('get_scan_width', ['ScanWidth','AzimuthScanWidth']),
|
|
):
|
|
if hasattr(p1, fn):
|
|
try:
|
|
v = getattr(p1, fn)()
|
|
v_str = self._fmt_enum(v, enum_names)
|
|
self._add_detail_row(fn, v_str)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
if hasattr(msg, 'param2_tellback'):
|
|
p2 = msg.param2_tellback
|
|
self._add_detail_row('param2_raw', str(getattr(p2,'raw', '')))
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# Default: dump raw fields of ctypes message
|
|
self.detail_tree.insert('', tk.END, values=(f"# {label}", ""))
|
|
# default: show top-level public attributes as rows
|
|
try:
|
|
for name in dir(msg):
|
|
if name.startswith('_'):
|
|
continue
|
|
try:
|
|
val = getattr(msg, name)
|
|
except Exception:
|
|
continue
|
|
if callable(val):
|
|
continue
|
|
# add row and remember item id
|
|
vstr = self._format_value_for_table(val)
|
|
iid = self.detail_tree.insert('', tk.END, values=(name, vstr))
|
|
self.detail_rows[name] = iid
|
|
except Exception as e:
|
|
# fallback: ensure at least one row
|
|
try:
|
|
self.detail_tree.insert('', tk.END, values=("<error>", str(e)))
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
try:
|
|
self.detail_tree.insert('', tk.END, values=("<error>", str(e)))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
self.log.insert(tk.END, f"Error while decoding: {e}\n")
|
|
except Exception:
|
|
pass
|
|
|
|
def periodic_update(self):
|
|
while self.manager.is_running():
|
|
try:
|
|
self.refresh_messages()
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.5)
|
|
self.update_loop_running = False
|
|
try:
|
|
self.update_status()
|
|
except Exception:
|
|
pass
|
|
|
|
def update_status(self):
|
|
try:
|
|
if getattr(self, 'import_error', None):
|
|
try:
|
|
port = int(os.environ.get('PYBM_RX_PORT', '61553'))
|
|
except Exception:
|
|
port = 61553
|
|
# left pane shows connection state
|
|
try:
|
|
self.status_left.config(text=f"Disconnected: UDP {port}", fg="red")
|
|
except Exception:
|
|
pass
|
|
return
|
|
if self.manager is not None:
|
|
# check for running state
|
|
try:
|
|
is_running = getattr(self.manager, 'is_running', None)
|
|
if callable(is_running):
|
|
running = is_running()
|
|
else:
|
|
running = bool(getattr(self.manager, 'running', False))
|
|
if running:
|
|
try:
|
|
self.status_left.config(text="Connected (running)", fg="green")
|
|
except Exception:
|
|
pass
|
|
return
|
|
except Exception:
|
|
pass
|
|
# default
|
|
try:
|
|
self.status_left.config(text="Ready (not running)", fg="orange")
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
try:
|
|
self.status_left.config(text="Status unknown", fg="black")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def main():
|
|
root = tk.Tk()
|
|
root.title("PyBusMonitor1553 - Monitor")
|
|
root.geometry("1024x768")
|
|
app = MonitorApp(master=root)
|
|
def on_closing():
|
|
try:
|
|
if getattr(app, 'logger_system', None):
|
|
app.logger_system.shutdown()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
if getattr(app, '_resource_monitor', None):
|
|
try:
|
|
app._resource_monitor.stop()
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
root.destroy()
|
|
|
|
root.protocol("WM_DELETE_WINDOW", on_closing)
|
|
app.mainloop()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|