# SXXXXXXX_PyBusMonitor1553/pybusmonitor1553/gui/monitor.py
# 2025-12-17 10:06:36 +01:00
#
# 645 lines
# 25 KiB
# Python

"""Tkinter GUI monitor for PyBusMonitor1553.
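
Minimal GUI with:
- Treeview showing registered messages (name, CW, SW, sent count, error code,
  period in ms, word count, received count)
- Details pane decoding the message selected in the Treeview
- Text log for raw sent/received events
- Buttons: `Initialize`, `Start`, `Stop`, `Refresh`
- Filters on the message label and on the period range (ms)

This scaffold calls `pybusmonitor1553.core.connection_manager` for init/start/stop.
All code, comments and UI text are English.
"""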
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
# Import connection manager lazily to avoid import-time socket errors
# get_manager will be imported at runtime inside the app instance
MessageDB = None
import threading
import time
import logging
import os
import sys
# Optional dependency: the external `tkinter_logger` package.
# First attempt a normal import; if that fails for any reason, put the bundled
# copy (<package>/../../externals/python-tkinter-logger) on sys.path and retry.
# When both attempts fail the two names are bound to None so the GUI can fall
# back to plain `logging`.
try:
    from tkinter_logger import TkinterLogger, get_logger
except Exception:
    try:
        externals_path = os.path.abspath(
            os.path.join(
                os.path.dirname(__file__),
                os.pardir,
                os.pardir,
                'externals',
                'python-tkinter-logger',
            )
        )
        if externals_path not in sys.path:
            sys.path.insert(0, externals_path)
        from tkinter_logger import TkinterLogger, get_logger
    except Exception:
        TkinterLogger = None
        get_logger = None
class MonitorApp(tk.Frame):
    """Main frame of the bus monitor window.

    The frame holds a controls row (Initialize / Start / Stop / Refresh and the
    label/period filters), a message table, a details pane, a log pane and a
    status bar that is packed into the master window below the frame.

    Nothing is imported from the connection layer at construction time: the
    connection manager is obtained when the user presses `Initialize`
    (see `on_init`) and the message database class is imported on first use
    (see `_get_message_db`). Failures of either step are stored as text in
    `import_error`, which drives the status bar and blocks `Start`.

    All widget access happens on the Tk event loop: the periodic table refresh
    is scheduled with `after()` rather than run from a worker thread.
    """

    # Delay between two automatic table refreshes while the manager is running.
    REFRESH_INTERVAL_MS = 500
    # Port reported to the user when PYBM_RX_PORT is unset or not an integer.
    DEFAULT_RX_PORT = 61553
    # (column id, heading text, width in pixels) of the message table.
    _COLUMNS = (
        ("label", "Name", 100),
        ("cw", "CW", 80),
        ("sw", "SW", 60),
        ("num", "Num", 60),
        ("errs", "Errs", 60),
        ("period", "period", 80),
        ("wc", "wc", 40),
        ("mc", "MC", 60),
    )

    def __init__(self, master=None):
        """Create the frame inside `master` and build all widgets."""
        super().__init__(master)
        self.master = master
        self.pack(fill=tk.BOTH, expand=True)
        # The application stays idle until the user presses `Initialize`.
        self.manager = None
        self.import_error = None
        self.update_loop_running = False
        # Keys of problems already written to the log (see _log_problem_once).
        self._reported_problems = set()
        self.create_widgets()

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Build every widget of the window and show the initial status."""
        self._build_controls()
        self._build_monitor_area()
        self._build_log_area()
        self._setup_logging()
        self.tree.bind('<<TreeviewSelect>>', self.on_tree_select)
        self._build_status_bar()
        if getattr(self, 'import_error', None):
            # An error recorded before the widgets existed: block the
            # connection buttons and show the diagnostics without a modal grab.
            self.init_btn.config(state=tk.DISABLED)
            self.start_btn.config(state=tk.DISABLED)
            self.open_diagnostics(auto=True)
        self.update_status()

    @staticmethod
    def _add_button(parent, text, command):
        """Create a left-packed button in `parent` and return it."""
        button = tk.Button(parent, text=text, command=command)
        button.pack(side=tk.LEFT, padx=4)
        return button

    def _build_controls(self):
        """Build the top row: action buttons on the left, filters on the right."""
        controls = tk.LabelFrame(self, text="Controls")
        controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6)

        btn_fr = tk.Frame(controls)
        btn_fr.pack(side=tk.LEFT, padx=4, pady=4)
        self.init_btn = self._add_button(btn_fr, "Initialize", self.on_init)
        self.start_btn = self._add_button(btn_fr, "Start", self.on_start)
        self.stop_btn = self._add_button(btn_fr, "Stop", self.on_stop)
        self.refresh_btn = self._add_button(btn_fr, "Refresh", self.refresh_messages)
        # Start/Stop are disabled until initialization is completed.
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.DISABLED)

        filter_fr = tk.Frame(controls)
        filter_fr.pack(side=tk.RIGHT, padx=4, pady=4)
        tk.Label(filter_fr, text="Label filter:").pack(side=tk.LEFT)
        self.filter_entry = tk.Entry(filter_fr, width=20)
        self.filter_entry.pack(side=tk.LEFT, padx=4)
        tk.Label(filter_fr, text="Period min (ms):").pack(side=tk.LEFT, padx=(8, 0))
        self.period_min = tk.Entry(filter_fr, width=8)
        self.period_min.pack(side=tk.LEFT, padx=4)
        tk.Label(filter_fr, text="max(ms):").pack(side=tk.LEFT)
        self.period_max = tk.Entry(filter_fr, width=8)
        self.period_max.pack(side=tk.LEFT, padx=4)
        tk.Button(filter_fr, text="Apply", command=self.refresh_messages).pack(side=tk.LEFT, padx=6)

    def _build_monitor_area(self):
        """Build the message table (left) and the details pane (right)."""
        middle = tk.Frame(self)
        middle.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2)

        bus_frame = tk.LabelFrame(middle, text="Bus Monitor")
        bus_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 6))
        column_ids = tuple(col_id for col_id, _, _ in self._COLUMNS)
        self.tree = ttk.Treeview(bus_frame, columns=column_ids, show="headings", height=14)
        for col_id, heading, width in self._COLUMNS:
            self.tree.heading(col_id, text=heading)
            self.tree.column(col_id, width=width)
        self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

        details_frame = tk.LabelFrame(middle, text="Details")
        details_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=False)
        tk.Label(details_frame, text="Message details:").pack(anchor=tk.W)
        self.detail_text = scrolledtext.ScrolledText(details_frame, width=40, height=18)
        self.detail_text.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

    def _build_log_area(self):
        """Build the scrolled text widget that receives log output."""
        log_frame = tk.LabelFrame(self, text="Log")
        log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=False, padx=6, pady=6)
        self.log = scrolledtext.ScrolledText(log_frame, height=8)
        self.log.pack(fill=tk.BOTH, expand=True, padx=6, pady=6)

    def _setup_logging(self):
        """Attach the external TkinterLogger to the log pane when available.

        `self.logger` is always set. Without TkinterLogger, or when its setup
        fails, the standard `logging.basicConfig` is used instead.
        """
        self.logger_system = None
        self.logger = logging.getLogger(__name__)
        if TkinterLogger is None:
            logging.basicConfig(level=logging.INFO)
            self.logger.info("TkinterLogger not available; using basic logging")
            return
        try:
            self.logger_system = TkinterLogger(self.master)
            self.logger_system.setup(enable_console=True, enable_tkinter=True)
            self.logger_system.add_tkinter_handler(self.log)
        except Exception as e:
            logging.basicConfig(level=logging.INFO)
            self._append_log(f"Logger init error: {e}")

    def _build_status_bar(self):
        """Build the status bar in the master window, below this frame."""
        self.global_status = tk.Frame(self.master, relief=tk.SUNKEN, bd=1)
        # left / center / right panes
        self.status_left = tk.Label(self.global_status, text="", anchor=tk.W)
        self.status_center = tk.Label(self.global_status, text="", anchor=tk.CENTER)
        self.status_right = tk.Label(self.global_status, text="", anchor=tk.E)
        self.status_left.pack(side=tk.LEFT, padx=6)
        self.status_center.pack(side=tk.LEFT, expand=True)
        self.status_right.pack(side=tk.RIGHT, padx=6)
        self.global_status.pack(side=tk.BOTTOM, fill=tk.X)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _append_log(self, message):
        """Write `message` straight into the log pane; never raises."""
        try:
            self.log.insert(tk.END, message + "\n")
        except Exception:
            pass

    def _log_info(self, message):
        """Log `message` at INFO level, falling back to the log pane."""
        try:
            self.logger.info(message)
        except Exception:
            self._append_log(message)

    def _log_exception(self, message):
        """Log `message` with the active traceback; call from an except block."""
        try:
            self.logger.exception(message)
        except Exception:
            self._append_log(message)

    def _log_problem_once(self, key, message):
        """Log a recurring problem the first time `key` is seen only.

        The table is refreshed several times per second while running, so a
        persistent failure must not flood the log.
        """
        seen = getattr(self, '_reported_problems', None)
        if seen is None:
            seen = self._reported_problems = set()
        if key in seen:
            return
        seen.add(key)
        self._log_exception(message)

    def _rx_port(self):
        """Return the UDP port from PYBM_RX_PORT, or the default if invalid."""
        try:
            return int(os.environ.get('PYBM_RX_PORT', '61553'))
        except Exception:
            return self.DEFAULT_RX_PORT

    def _manager_running(self):
        """Return True when a manager exists and reports that it is running.

        Uses `manager.is_running()` when callable, otherwise the truthiness of
        a `running` attribute. Any error is treated as "not running".
        """
        manager = getattr(self, 'manager', None)
        if manager is None:
            return False
        try:
            probe = getattr(manager, 'is_running', None)
            if callable(probe):
                return bool(probe())
            return bool(getattr(manager, 'running', False))
        except Exception:
            return False

    def _set_status(self, text, color):
        """Show `text` in the left status pane using foreground `color`."""
        try:
            self.status_left.config(text=text, fg=color)
        except Exception:
            pass

    def _get_message_db(self):
        """Lazy import of MessageDB so module import-time socket binds won't crash the GUI.

        Returns the `MessageDB` class, cached in the module-level `MessageDB`
        name, or None when neither import location works. On failure the error
        text is stored in `self.import_error`.
        """
        global MessageDB
        if MessageDB is not None:
            return MessageDB
        try:
            import importlib
            try:
                mod = importlib.import_module('Grifo_E_1553lib.messages.messages')
            except Exception:
                mod = importlib.import_module('pybusmonitor1553.Grifo_E_1553lib.messages.messages')
            MessageDB = mod.MessageDB
        except Exception as e:
            # record import error so UI can show diagnostics
            self.import_error = str(e)
            return None
        return MessageDB

    # ------------------------------------------------------------------
    # Button handlers
    # ------------------------------------------------------------------
    def on_init(self):
        """Obtain the connection manager on demand and initialize the library.

        A failure to import/create the manager is stored in `import_error` and
        shown in the diagnostics dialog. A failure of `init_library()` is shown
        in an error box. On success the table is refreshed and `Start` enabled.
        """
        if self.manager is None:
            try:
                from pybusmonitor1553.core.connection_manager import get_manager
                self.manager = get_manager()
            except Exception as e:
                self.import_error = str(e)
                self._log_exception("Connection manager not available")
                try:
                    self.open_diagnostics(auto=False)
                except Exception:
                    self._log_exception("Could not open the diagnostics dialog")
                self.update_status()
                return
            self.import_error = None

        try:
            self.manager.init_library()
        except Exception as e:
            self._log_exception("Initialization failed")
            messagebox.showerror("Error", str(e))
            return

        self._log_info("Initialization completed.")
        self.refresh_messages()
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)
        self.update_status()

    def on_start(self):
        """Start send/receive and the periodic table refresh.

        The Start/Stop buttons change state only when `manager.start()`
        succeeded, so a failed start leaves `Start` available for a retry.
        """
        if getattr(self, 'import_error', None):
            messagebox.showerror("Error", f"Connection library error: {self.import_error}")
            return
        if self.manager is None:
            messagebox.showerror("Error", "Connection not initialized. Press Initialize first.")
            return
        try:
            self.manager.start()
        except Exception as e:
            self._log_exception("Start failed")
            messagebox.showerror("Error", str(e))
            self.update_status()
            return

        self._log_info("Start send/receive")
        self.start_btn.config(state=tk.DISABLED)
        self.stop_btn.config(state=tk.NORMAL)
        if not self.update_loop_running:
            self.update_loop_running = True
            # Poll from the Tk event loop: widgets must not be touched from a
            # worker thread.
            self.after(0, self.periodic_update)
        self.update_status()

    def on_stop(self):
        """Stop send/receive and re-enable `Start`."""
        if getattr(self, 'import_error', None) or self.manager is None:
            # nothing to stop
            self._log_info("Stop requested (no connection active)")
            return
        try:
            self.manager.stop()
        except Exception as e:
            self._log_exception("Stop failed")
            messagebox.showerror("Error", str(e))
            self.update_status()
            return

        self._log_info("Stop requested")
        self.update_status()
        self.start_btn.config(state=tk.NORMAL)
        self.stop_btn.config(state=tk.DISABLED)

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    def open_diagnostics(self, auto: bool = False):
        """Open the connection diagnostics dialog centered over the main window.

        Args:
            auto: when True (automatic opening during startup) the dialog does
                not take a modal grab, so the GUI can finish initialization
                without blocking.
        """
        details = getattr(self, 'import_error', None)

        dlg = tk.Toplevel(self.master)
        dlg.title("Diagnostics")
        dlg.transient(self.master)
        if not auto:
            try:
                dlg.grab_set()
            except Exception:
                pass
        try:
            dlg.focus_force()
        except Exception:
            pass

        # Center the dialog over the main window; leave room for the error
        # text when there is one.
        try:
            self.master.update_idletasks()
            w = 420
            h = 180 if details else 140
            x = self.master.winfo_rootx() + max(0, (self.master.winfo_width() - w) // 2)
            y = self.master.winfo_rooty() + max(0, (self.master.winfo_height() - h) // 2)
            dlg.geometry(f"{w}x{h}+{x}+{y}")
        except Exception:
            pass

        tk.Label(dlg, text="Connection diagnostics", font=(None, 12, 'bold')).pack(padx=12, pady=(8, 4))

        port = self._rx_port()
        msg_text = f"The UDP port {port} is likely already in use by another application.\nClose this application to free the port."
        if details:
            # Show the recorded error as well: the port is only the most
            # likely cause, not the only possible one.
            msg_text += f"\n\nDetails: {details}"
        lbl = tk.Label(dlg, text=msg_text, wraplength=380, justify=tk.LEFT)
        lbl.pack(padx=12, pady=(4, 8), expand=True, fill=tk.BOTH)

        btn_fr = tk.Frame(dlg)
        btn_fr.pack(pady=8)
        # NOTE(review): "Close" destroys the whole application window, not just
        # this dialog. Kept as in the original; confirm that this is intended.
        close_btn = tk.Button(btn_fr, text="Close", command=lambda: self.master.destroy())
        close_btn.pack(side=tk.LEFT, padx=6)

    def _retry_and_update(self, dialog):
        """Reload the connection manager module and try to obtain a manager.

        On success `import_error` is cleared, Initialize/Start are enabled and
        `dialog` is destroyed. On failure the first `tk.Text` child of `dialog`
        (if any) is filled with the error and suggested actions.

        NOTE(review): no widget visible in this class calls this method, and
        the dialog built by `open_diagnostics` contains no `tk.Text` child.
        """
        module_name = 'pybusmonitor1553.core.connection_manager'
        try:
            import importlib
            # reload connection manager module to re-run import logic
            if module_name in sys.modules:
                cm = sys.modules[module_name]
                importlib.reload(cm)
            else:
                cm = importlib.import_module(module_name)
            get_manager = getattr(cm, 'get_manager', None)
            if get_manager is None:
                raise RuntimeError('connection_manager missing get_manager')
            self.manager = get_manager()
            self.import_error = None
        except Exception as e:
            # update dialog text with latest error
            try:
                for w in dialog.winfo_children():
                    if isinstance(w, tk.Text):
                        w.config(state=tk.NORMAL)
                        w.delete('1.0', tk.END)
                        w.insert(tk.END, f"Retry failed: {e}\n\nSuggested actions:\n- Close other app using UDP port 61553\n- Update PYBM_RX_PORT/PYBM_RX_IP if needed\n- Restart this app after freeing the port\n")
                        w.config(state=tk.DISABLED)
                        break
            except Exception:
                pass
            self.update_status()
            return

        try:
            self.init_btn.config(state=tk.NORMAL)
            self.start_btn.config(state=tk.NORMAL)
        except Exception:
            pass
        try:
            messagebox.showinfo('Diagnostics', 'Connection modules reloaded successfully.')
        except Exception:
            pass
        try:
            dialog.destroy()
        except Exception:
            pass
        self.update_status()

    # ------------------------------------------------------------------
    # Message table
    # ------------------------------------------------------------------
    def _period_bounds(self):
        """Return `(min_ms, max_ms)` from the filter entries.

        An empty entry yields None for that bound. If either entry cannot be
        parsed as a float, both bounds are None (no period filtering).
        """
        pmin = pmax = None
        try:
            low = self.period_min.get().strip()
            high = self.period_max.get().strip()
            if low != "":
                pmin = float(low)
            if high != "":
                pmax = float(high)
        except Exception:
            pmin = pmax = None
        return pmin, pmax

    @staticmethod
    def _message_period(msg):
        """Return the period in ms: `_time_ms`, else `1000 / freq`, else None."""
        period = getattr(msg, '_time_ms', None)
        if period is None:
            freq = getattr(msg, 'freq', None)
            if freq:
                period = 1000.0 / freq
        return period

    @staticmethod
    def _message_row(label, msg, period):
        """Return the table row tuple for one message wrapper.

        Every field is read defensively: a missing attribute yields an empty
        cell instead of an exception.
        """
        try:
            cw = msg.head.cw
            cw_raw = getattr(cw, 'raw', None)
            wc = getattr(cw.str, 'wc', '')
        except Exception:
            cw_raw, wc = None, ''
        if cw_raw is None:
            cw_text = ''
        else:
            try:
                cw_text = hex(cw_raw)
            except TypeError:
                # Not an integer: show it as-is rather than dropping the row.
                cw_text = str(cw_raw)

        try:
            sw_raw = getattr(msg.head, 'sw', '')
        except Exception:
            sw_raw = ''
        try:
            errs = getattr(msg.head, 'errcode', '')
        except Exception:
            errs = ''

        # num -> sent_count (falls back to `size`); mc -> recv_count
        num = getattr(msg, 'sent_count', getattr(msg, 'size', ''))
        mc = getattr(msg, 'recv_count', '')

        if period:
            try:
                period_text = f"{period:.3f}"
            except (TypeError, ValueError):
                period_text = str(period)
        else:
            period_text = ''
        return (label, cw_text, sw_raw, num, errs, period_text, wc, mc)

    def refresh_messages(self):
        """Rebuild the message table from MessageDB, applying the filters.

        The label filter is a case-insensitive substring match. The period
        bounds exclude messages whose known period falls outside the range;
        messages without a period are always shown. A message that cannot be
        displayed is logged once and skipped; the remaining rows are still shown.
        """
        self.tree.delete(*self.tree.get_children())
        try:
            mdb = self._get_message_db()
            if mdb is None:
                # no message DB available (import error) -> nothing to show
                return
            messages = mdb.getAllMessages()
        except Exception:
            self._log_problem_once('message-db', "Could not read the messages from MessageDB")
            return

        label_filter = self.filter_entry.get().strip().lower()
        pmin, pmax = self._period_bounds()
        for label, msg in messages.items():
            try:
                if label_filter and label_filter not in label.lower():
                    continue
                period = self._message_period(msg)
                if period is not None:
                    if pmin is not None and period < pmin:
                        continue
                    if pmax is not None and period > pmax:
                        continue
                self.tree.insert('', tk.END, values=self._message_row(label, msg, period))
            except Exception:
                self._log_problem_once(('row', label), f"Could not display message {label!r}")

    def on_tree_select(self, event):
        """Show the details of the first selected table row."""
        selection = self.tree.selection()
        if not selection:
            return
        values = self.tree.item(selection[0], 'values')
        if values:
            self.show_message_detail(values[0])

    def show_message_detail(self, label: str):
        """Fill the details pane with a decoded view of message `label`.

        Messages exposing `rdr_mode_tellback` and/or `settings_tellback` are
        decoded through their getter methods; any other message is dumped as
        `name: value` for every public attribute.
        """
        def emit(text):
            self.detail_text.insert(tk.END, text)

        self.detail_text.delete('1.0', tk.END)
        try:
            mdb = self._get_message_db()
            if mdb is None:
                emit("Message DB not available (import error). Open Diagnostics for details.\n")
                return
            msg_wrapper = mdb.getMessage(label)
        except Exception:
            emit(f"Message {label} not found in MessageDB\n")
            return
        if not msg_wrapper:
            emit(f"No data for {label}\n")
            return
        # The actual ctypes message instance
        msg = getattr(msg_wrapper, 'message', None)
        if msg is None:
            emit("No message field\n")
            return

        try:
            has_status = hasattr(msg, 'rdr_mode_tellback')
            has_settings = hasattr(msg, 'settings_tellback')
            if not (has_status or has_settings):
                # Default: dump raw fields of ctypes message
                emit(f"Raw message fields for {label}:\n")
                for name in dir(msg):
                    if name.startswith('_'):
                        continue
                    try:
                        emit(f"{name}: {getattr(msg, name)}\n")
                    except Exception:
                        pass
                return

            if has_status:
                # B7 - status tellback
                rb = msg.rdr_mode_tellback
                try:
                    mm = rb.get_master_mode()
                except Exception:
                    mm = getattr(rb, 'raw', rb)
                emit("B7 - RdrStatusTellback:\n")
                emit(f" master_mode: {mm}\n")
                try:
                    emit(f" designation_ctrl: {rb.get_des_ctrl()}\n")
                except Exception:
                    pass
                try:
                    emit(f" ibit: {rb.get_ibit()}\n")
                except Exception:
                    pass

            if has_settings:
                # B6 - settings tellback
                st = msg.settings_tellback
                try:
                    hist = st.get_history_level()
                except Exception:
                    hist = getattr(st, 'raw', '')
                try:
                    sym = st.get_sym_intensity()
                except Exception:
                    sym = ''
                emit("B6 - RdrSettingsTellback:\n")
                emit(f" history_level: {hist}\n")
                emit(f" symbology_intensity: {sym}\n")

            # param1/param2 fields (if present)
            try:
                if hasattr(msg, 'param1_tellback'):
                    p1 = msg.param1_tellback
                    out = []
                    for fn in ('get_rws_submode', 'get_spot', 'get_acm_submode', 'get_gm_submode',
                               'get_expand', 'get_range_scale', 'get_bars_num', 'get_scan_width'):
                        if hasattr(p1, fn):
                            try:
                                out.append(f" {fn}: {getattr(p1, fn)()}")
                            except Exception:
                                pass
                    if out:
                        emit("param1:\n")
                        emit("\n".join(out) + "\n")
            except Exception:
                pass
            try:
                if hasattr(msg, 'param2_tellback'):
                    p2 = msg.param2_tellback
                    emit(f"param2 raw: {getattr(p2, 'raw', '')}\n")
            except Exception:
                pass
        except Exception as e:
            emit(f"Error while decoding: {e}\n")

    # ------------------------------------------------------------------
    # Periodic refresh and status
    # ------------------------------------------------------------------
    def periodic_update(self):
        """Refresh the table once and reschedule while the manager is running.

        Runs on the Tk event loop (scheduled with `after`). When the manager is
        no longer running the loop ends, `update_loop_running` is cleared and
        the status bar is updated.
        """
        if self._manager_running():
            self.refresh_messages()
            self.after(self.REFRESH_INTERVAL_MS, self.periodic_update)
            return
        self.update_loop_running = False
        self.update_status()

    def update_status(self):
        """Show the connection state in the left pane of the status bar."""
        if getattr(self, 'import_error', None):
            self._set_status(f"Disconnected: UDP {self._rx_port()}", "red")
        elif self._manager_running():
            self._set_status("Connected (running)", "green")
        else:
            self._set_status("Ready (not running)", "orange")
def main():
    """Create the root window, show the monitor frame and run the Tk event loop.

    Closing the window first shuts down the external logger system (when the
    application created one) and then destroys the root window.
    """
    root = tk.Tk()
    root.title("PyBusMonitor1553 - Monitor")
    root.geometry("1024x768")
    app = MonitorApp(master=root)

    def _shutdown_logger():
        # Best effort: a failing logger shutdown must not keep the window open.
        try:
            tk_logger = getattr(app, 'logger_system', None)
            if tk_logger:
                tk_logger.shutdown()
        except Exception:
            pass

    def on_closing():
        _shutdown_logger()
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_closing)
    app.mainloop()


if __name__ == '__main__':
    main()