"""Tkinter GUI monitor for PyBusMonitor1553. Minimal GUI with: - Treeview showing registered messages (label, CW.raw, size) - Text log for raw sent/received events - Buttons: `Initialize`, `Start`, `Stop`, `Refresh` This scaffold calls `pybusmonitor1553.core.connection_manager` for init/start/stop. All code, comments and UI text are English. """ import tkinter as tk from tkinter import ttk, scrolledtext, messagebox from pybusmonitor1553.core.connection_manager import get_manager from Grifo_E_1553lib.messages.messages import MessageDB import threading import time import logging import os import sys # Try to import the user's external tkinter logger. If not available, add externals path. try: from tkinter_logger import TkinterLogger, get_logger except Exception: try: externals_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-tkinter-logger')) if externals_path not in sys.path: sys.path.insert(0, externals_path) from tkinter_logger import TkinterLogger, get_logger except Exception: TkinterLogger = None get_logger = None class MonitorApp(tk.Frame): def __init__(self, master=None): super().__init__(master) self.master = master self.pack(fill=tk.BOTH, expand=True) self.manager = get_manager() self.create_widgets() self.update_loop_running = False def create_widgets(self): # Top Controls Frame controls = tk.LabelFrame(self, text="Controls") controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6) btn_fr = tk.Frame(controls) btn_fr.pack(side=tk.LEFT, padx=4, pady=4) self.init_btn = tk.Button(btn_fr, text="Initialize", command=self.on_init) self.init_btn.pack(side=tk.LEFT, padx=4) self.start_btn = tk.Button(btn_fr, text="Start", command=self.on_start) self.start_btn.pack(side=tk.LEFT, padx=4) self.stop_btn = tk.Button(btn_fr, text="Stop", command=self.on_stop) self.stop_btn.pack(side=tk.LEFT, padx=4) self.refresh_btn = tk.Button(btn_fr, text="Refresh", command=self.refresh_messages) self.refresh_btn.pack(side=tk.LEFT, padx=4) # Filters inside controls filter_fr = tk.Frame(controls) filter_fr.pack(side=tk.RIGHT, padx=4, pady=4) tk.Label(filter_fr, text="Label filter:").pack(side=tk.LEFT) self.filter_entry = tk.Entry(filter_fr, width=20) self.filter_entry.pack(side=tk.LEFT, padx=4) tk.Label(filter_fr, text="Period min (ms):").pack(side=tk.LEFT, padx=(8,0)) self.period_min = tk.Entry(filter_fr, width=8) self.period_min.pack(side=tk.LEFT, padx=4) tk.Label(filter_fr, text="max(ms):").pack(side=tk.LEFT) self.period_max = tk.Entry(filter_fr, width=8) self.period_max.pack(side=tk.LEFT, padx=4) tk.Button(filter_fr, text="Apply", command=self.refresh_messages).pack(side=tk.LEFT, padx=6) # Middle: Bus Monitor (left) and Details (right) middle = tk.Frame(self) middle.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2) bus_frame = tk.LabelFrame(middle, text="Bus Monitor") bus_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0,6)) # Message table (columns per screenshot) columns = ("label", "cw", "sw", "num", "errs", "period", "wc", "mc") self.tree = ttk.Treeview(bus_frame, columns=columns, show="headings", height=14) self.tree.heading("label", text="Name") self.tree.heading("cw", text="CW") self.tree.heading("sw", text="SW") self.tree.heading("num", text="Num") self.tree.heading("errs", text="Errs") self.tree.heading("period", text="period") self.tree.heading("wc", text="wc") self.tree.heading("mc", text="MC") self.tree.column("label", width=100) self.tree.column("cw", width=80) self.tree.column("sw", width=60) self.tree.column("num", width=60) self.tree.column("errs", width=60) self.tree.column("period", width=80) self.tree.column("wc", width=40) self.tree.column("mc", width=60) self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6) details_frame = tk.LabelFrame(middle, text="Details") details_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=False) tk.Label(details_frame, text="Message details:").pack(anchor=tk.W) self.detail_text = scrolledtext.ScrolledText(details_frame, width=40, height=18) self.detail_text.pack(fill=tk.BOTH, expand=True, padx=6, pady=6) # Bottom: Log area log_frame = tk.LabelFrame(self, text="Log") log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=False, padx=6, pady=6) self.log = scrolledtext.ScrolledText(log_frame, height=8) self.log.pack(fill=tk.BOTH, expand=True, padx=6, pady=6) # Initialize external TkinterLogger if available, attach text widget as handler self.logger_system = None try: if TkinterLogger is not None: self.logger_system = TkinterLogger(self.master) self.logger_system.setup(enable_console=True, enable_tkinter=True) self.logger_system.add_tkinter_handler(self.log) self.logger = logging.getLogger(__name__) else: logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) self.logger.info("TkinterLogger not available; using basic logging") except Exception as e: logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) try: self.log.insert(tk.END, f"Logger init error: {e}\n") except Exception: pass # Bind selection self.tree.bind('<>', self.on_tree_select) def on_init(self): try: self.manager.init_library() try: self.logger.info("Initialization completed.") except Exception: self.log.insert(tk.END, "Initialization completed.\n") self.refresh_messages() except Exception as e: messagebox.showerror("Error", str(e)) def on_start(self): try: self.manager.start() try: self.logger.info("Start send/receive") except Exception: self.log.insert(tk.END, "Start send/receive\n") if not self.update_loop_running: self.update_loop_running = True threading.Thread(target=self.periodic_update, daemon=True).start() except Exception as e: messagebox.showerror("Error", str(e)) def on_stop(self): self.manager.stop() try: self.logger.info("Stop requested") except Exception: self.log.insert(tk.END, "Stop requested\n") def refresh_messages(self): self.tree.delete(*self.tree.get_children()) try: msgs = MessageDB.getAllMessages() lbl_filter = self.filter_entry.get().strip().lower() pmin = None pmax = None try: if self.period_min.get().strip() != "": pmin = float(self.period_min.get().strip()) if self.period_max.get().strip() != "": pmax = float(self.period_max.get().strip()) except Exception: pmin = pmax = None for k, v in msgs.items(): if lbl_filter and lbl_filter not in k.lower(): continue # cw raw cw_raw = None wc = '' try: cw_raw = getattr(v.head.cw, 'raw', None) wc = getattr(v.head.cw.str, 'wc', '') except Exception: cw_raw = None # sw sw_raw = '' try: sw_raw = getattr(v.head, 'sw', '') except Exception: sw_raw = '' # num -> size (bytes) as fallback num = getattr(v, 'size', '') # errs errs = '' try: errs = getattr(v.head, 'errcode', '') except Exception: errs = '' # period (ms) period = getattr(v, '_time_ms', None) if period is None: # try compute from freq freq = getattr(v, 'freq', None) if freq: period = 1000.0 / freq if period is not None and pmin is not None and period < pmin: continue if period is not None and pmax is not None and period > pmax: continue mc = '' try: mc = getattr(v, 'mcounter', '') except Exception: mc = '' self.tree.insert('', tk.END, values=(k, hex(cw_raw) if cw_raw is not None else '', sw_raw, num, errs, f"{period:.3f}" if period else '', wc, mc)) except Exception: pass def on_tree_select(self, event): sel = self.tree.selection() if not sel: return item = sel[0] vals = self.tree.item(item, 'values') if not vals: return label = vals[0] self.show_message_detail(label) def show_message_detail(self, label: str): self.detail_text.delete('1.0', tk.END) try: msg_wrapper = MessageDB.getMessage(label) except Exception: self.detail_text.insert(tk.END, f"Message {label} not found in MessageDB\n") return if not msg_wrapper: self.detail_text.insert(tk.END, f"No data for {label}\n") return # The actual ctypes message instance msg = getattr(msg_wrapper, 'message', None) if msg is None: self.detail_text.insert(tk.END, "No message field\n") return # Handle known tellbacks try: if hasattr(msg, 'rdr_mode_tellback') or hasattr(msg, 'settings_tellback'): # B7 - status tellback if hasattr(msg, 'rdr_mode_tellback'): rb = msg.rdr_mode_tellback # master mode and flags try: mm = rb.get_master_mode() except Exception: mm = getattr(rb, 'raw', rb) self.detail_text.insert(tk.END, f"B7 - RdrStatusTellback:\n") self.detail_text.insert(tk.END, f" master_mode: {mm}\n") try: des = rb.get_des_ctrl() self.detail_text.insert(tk.END, f" designation_ctrl: {des}\n") except Exception: pass try: ib = rb.get_ibit() self.detail_text.insert(tk.END, f" ibit: {ib}\n") except Exception: pass # B6 - settings tellback if hasattr(msg, 'settings_tellback'): st = msg.settings_tellback try: hist = st.get_history_level() except Exception: hist = getattr(st, 'raw', '') try: sym = st.get_sym_intensity() except Exception: sym = '' self.detail_text.insert(tk.END, f"B6 - RdrSettingsTellback:\n") self.detail_text.insert(tk.END, f" history_level: {hist}\n") self.detail_text.insert(tk.END, f" symbology_intensity: {sym}\n") # param1/param2 fields (if present) try: if hasattr(msg, 'param1_tellback'): p1 = msg.param1_tellback # try common getters out = [] for fn in ('get_rws_submode', 'get_spot', 'get_acm_submode', 'get_gm_submode', 'get_expand', 'get_range_scale', 'get_bars_num', 'get_scan_width'): if hasattr(p1, fn): try: out.append(f" {fn}: {getattr(p1, fn)()}" ) except Exception: pass if out: self.detail_text.insert(tk.END, "param1:\n") self.detail_text.insert(tk.END, "\n".join(out) + "\n") except Exception: pass try: if hasattr(msg, 'param2_tellback'): p2 = msg.param2_tellback self.detail_text.insert(tk.END, f"param2 raw: {getattr(p2,'raw', '')}\n") except Exception: pass return # Default: dump raw fields of ctypes message self.detail_text.insert(tk.END, f"Raw message fields for {label}:\n") for name in dir(msg): if name.startswith('_'): continue try: val = getattr(msg, name) self.detail_text.insert(tk.END, f"{name}: {val}\n") except Exception: pass except Exception as e: self.detail_text.insert(tk.END, f"Error while decoding: {e}\n") def periodic_update(self): while self.manager.is_running(): try: self.refresh_messages() except Exception: pass time.sleep(0.5) self.update_loop_running = False def main(): root = tk.Tk() root.title("PyBusMonitor1553 - Monitor") root.geometry("800x600") app = MonitorApp(master=root) def on_closing(): try: if getattr(app, 'logger_system', None): app.logger_system.shutdown() except Exception: pass root.destroy() root.protocol("WM_DELETE_WINDOW", on_closing) app.mainloop() if __name__ == '__main__': main()