"""Tkinter GUI monitor for PyBusMonitor1553. Minimal GUI with: - Treeview showing registered messages (label, CW.raw, size) - Text log for raw sent/received events - Buttons: `Initialize`, `Start`, `Stop`, `Refresh` This scaffold calls `pybusmonitor1553.core.connection_manager` for init/start/stop. All code, comments and UI text are English. """ import tkinter as tk from tkinter import ttk, scrolledtext, messagebox # Import connection manager lazily to avoid import-time socket errors # get_manager will be imported at runtime inside the app instance MessageDB = None import threading import time import logging import os import sys # Try to import the user's external tkinter logger. If not available, add externals path. try: from tkinter_logger import TkinterLogger, get_logger except Exception: try: externals_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-tkinter-logger')) if externals_path not in sys.path: sys.path.insert(0, externals_path) from tkinter_logger import TkinterLogger, get_logger except Exception: TkinterLogger = None get_logger = None # Optional resource monitor (externals/python-resource-monitor) TkinterResourceMonitor = None try: externals_rm = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'externals', 'python-resource-monitor')) if externals_rm not in sys.path: sys.path.insert(0, externals_rm) # Attempt to import the Tkinter integration class try: from resource_monitor import TkinterResourceMonitor # module provides class at top-level except Exception: # fallback to package-like import try: from target_simulator.utils.resource_monitor import TkinterResourceMonitor except Exception: TkinterResourceMonitor = None except Exception: TkinterResourceMonitor = None class MonitorApp(tk.Frame): def __init__(self, master=None): super().__init__(master) self.master = master self.pack(fill=tk.BOTH, expand=True) # Do not import or create the connection manager at startup. 
# The application will remain idle until the user presses `Initialize`. self.manager = None self.import_error = None self.create_widgets() self.update_loop_running = False # cache tree item ids by message label for incremental updates self._tree_items = {} # parents we want to keep always expanded in detail tree self._always_open_parents = {} # No background bind or retry at startup; user must press Initialize. def create_widgets(self): # Top Controls Frame controls = tk.LabelFrame(self, text="Controls") controls.pack(side=tk.TOP, fill=tk.X, padx=6, pady=6) btn_fr = tk.Frame(controls) btn_fr.pack(side=tk.LEFT, padx=4, pady=4) self.init_btn = tk.Button(btn_fr, text="Initialize", command=self.on_init) self.init_btn.pack(side=tk.LEFT, padx=4) self.start_btn = tk.Button(btn_fr, text="Start", command=self.on_start) self.start_btn.pack(side=tk.LEFT, padx=4) self.stop_btn = tk.Button(btn_fr, text="Stop", command=self.on_stop) self.stop_btn.pack(side=tk.LEFT, padx=4) self.refresh_btn = tk.Button(btn_fr, text="Refresh", command=self.refresh_messages) self.refresh_btn.pack(side=tk.LEFT, padx=4) # Start/Stop are disabled until initialization is completed try: self.start_btn.config(state=tk.DISABLED) self.stop_btn.config(state=tk.DISABLED) except Exception: pass # Filters inside controls filter_fr = tk.Frame(controls) filter_fr.pack(side=tk.RIGHT, padx=4, pady=4) tk.Label(filter_fr, text="Label filter:").pack(side=tk.LEFT) self.filter_entry = tk.Entry(filter_fr, width=20) self.filter_entry.pack(side=tk.LEFT, padx=4) tk.Label(filter_fr, text="Period min (ms):").pack(side=tk.LEFT, padx=(8,0)) self.period_min = tk.Entry(filter_fr, width=8) self.period_min.pack(side=tk.LEFT, padx=4) tk.Label(filter_fr, text="max(ms):").pack(side=tk.LEFT) self.period_max = tk.Entry(filter_fr, width=8) self.period_max.pack(side=tk.LEFT, padx=4) tk.Button(filter_fr, text="Apply", command=self.refresh_messages).pack(side=tk.LEFT, padx=6) # Middle: Bus Monitor (left) and Details (right) middle = 
tk.Frame(self) middle.pack(side=tk.TOP, fill=tk.BOTH, expand=True, padx=6, pady=2) bus_frame = tk.LabelFrame(middle, text="Bus Monitor") bus_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0,6)) # Message table (columns per screenshot) columns = ("label", "cw", "sw", "num", "errs", "period", "wc", "mc") self.tree = ttk.Treeview(bus_frame, columns=columns, show="headings", height=14) self.tree.heading("label", text="Name") self.tree.heading("cw", text="CW") self.tree.heading("sw", text="SW") self.tree.heading("num", text="Num") self.tree.heading("errs", text="Errs") self.tree.heading("period", text="period") self.tree.heading("wc", text="wc") self.tree.heading("mc", text="MC") self.tree.column("label", width=100) self.tree.column("cw", width=80) self.tree.column("sw", width=60) self.tree.column("num", width=60) self.tree.column("errs", width=60) self.tree.column("period", width=80) self.tree.column("wc", width=40) self.tree.column("mc", width=60) self.tree.pack(fill=tk.BOTH, expand=True, padx=6, pady=6) # details pane moved to dedicated submodule try: from .details_pane import DetailsPane DETAILS_WIDTH = 520 DETAILS_HEIGHT = 540 self._details = DetailsPane(middle, app=self, width=DETAILS_WIDTH, height=DETAILS_HEIGHT) # ensure attributes expected by existing methods exist self.form_widgets = getattr(self, 'form_widgets', {}) self.current_form_label = getattr(self, 'current_form_label', None) self.current_msg_wrapper = getattr(self, 'current_msg_wrapper', None) self.detail_rows = getattr(self, 'detail_rows', {}) self.detail_selected_label = getattr(self, 'detail_selected_label', None) self.detail_accessors = getattr(self, 'detail_accessors', {}) self._flash_items = getattr(self, '_flash_items', {}) self._last_detail_update = 0.0 try: self.detail_tree.tag_configure('changed', background='lightyellow') except Exception: pass except Exception: # fallback: create minimal placeholder frame if import fails details_frame = tk.LabelFrame(middle, text="Details") 
details_frame.pack(side=tk.RIGHT, fill=tk.Y, expand=False, padx=6) tk.Label(details_frame, text="Message details:").pack(anchor=tk.W) self.detail_tree = ttk.Treeview(details_frame, columns=("param","value"), show="headings") self.detail_tree.heading('param', text='Parameter') self.detail_tree.heading('value', text='Value') self.detail_tree.pack(fill=tk.BOTH, expand=True) # Bottom: Log area log_frame = tk.LabelFrame(self, text="Log") log_frame.pack(side=tk.BOTTOM, fill=tk.BOTH, expand=False, padx=6, pady=6) self.log = scrolledtext.ScrolledText(log_frame, height=8) self.log.pack(fill=tk.BOTH, expand=True, padx=6, pady=6) # Status bar will be added below the whole UI (outside all frames) # Initialize external TkinterLogger if available, attach text widget as handler self.logger_system = None try: if TkinterLogger is not None: self.logger_system = TkinterLogger(self.master) self.logger_system.setup(enable_console=True, enable_tkinter=True) self.logger_system.add_tkinter_handler(self.log) self.logger = logging.getLogger(__name__) else: logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) self.logger.info("TkinterLogger not available; using basic logging") except Exception as e: logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) try: self.log.insert(tk.END, f"Logger init error: {e}\n") except Exception: pass # Bind selection self.tree.bind('<>', self.on_tree_select) # If import of connection manager failed, disable init/start if getattr(self, 'import_error', None): try: self.init_btn.config(state=tk.DISABLED) self.start_btn.config(state=tk.DISABLED) except Exception: pass # Add a global status bar below everything self.global_status = tk.Frame(self.master, relief=tk.SUNKEN, bd=1) # left / center / right panes self.status_left = tk.Label(self.global_status, text="", anchor=tk.W) self.status_center = tk.Label(self.global_status, text="", anchor=tk.CENTER) # right pane: may use a StringVar updated by external 
resource monitor self.status_resource_var = tk.StringVar() self.status_right = tk.Label(self.global_status, textvariable=self.status_resource_var, anchor=tk.E) self.status_left.pack(side=tk.LEFT, padx=6) self.status_center.pack(side=tk.LEFT, expand=True) self.status_right.pack(side=tk.RIGHT, padx=6) # pack at the bottom of the main window (below this frame) self.global_status.pack(side=tk.BOTTOM, fill=tk.X) # If there was an import error, auto-open diagnostics and set status if getattr(self, 'import_error', None): self.open_diagnostics(auto=True) try: self.update_status() except Exception: pass else: self.update_status() # Start resource monitor if available self._resource_monitor = None try: if TkinterResourceMonitor is not None: # create and start monitor with a 1s poll interval try: self._resource_monitor = TkinterResourceMonitor(self.master, self.status_resource_var, poll_interval=1.0) # start() returns True if psutil available and monitor started try: self._resource_monitor.start() except Exception: pass except Exception: self._resource_monitor = None except Exception: self._resource_monitor = None def _get_message_db(self): """Lazy import of MessageDB so module import-time socket binds won't crash the GUI.""" global MessageDB if MessageDB is not None: return MessageDB try: import importlib try: mod = importlib.import_module('Grifo_E_1553lib.messages.messages') except Exception: mod = importlib.import_module('pybusmonitor1553.Grifo_E_1553lib.messages.messages') MessageDB = mod.MessageDB return MessageDB except Exception as e: # record import error so UI can show diagnostics self.import_error = str(e) return None def _fmt_enum(self, val, enum_names): """Format a numeric `val` with possible enum name(s). `enum_names` is a list of candidate enum class names to try in the `Grifo_E_1553lib.data_types.enums` module. Returns a string like "3 (RWS)" or the original value if no mapping found. 
""" # delegate to helper to keep monitor.py small try: from .monitor_helpers import fmt_enum return fmt_enum(val, enum_names) except Exception: return str(val) def _format_ctypes_obj(self, obj, indent=0, max_depth=3): """Recursively format ctypes-backed message fields into readable text. - Prints simple ctypes numbers as integers - For nested objects, recurses up to `max_depth` - Uses `_fmt_enum` to annotate enum-like values where possible """ try: from .monitor_helpers import format_ctypes_obj return format_ctypes_obj(obj, indent=indent, max_depth=max_depth) except Exception: pad = ' ' * indent return f"{pad}{repr(obj)}\n" def _format_value_for_table(self, val): """Return a short string suitable for the details table's value column.""" try: from .monitor_helpers import format_value_for_table return format_value_for_table(val) except Exception: try: if val is None: return '' return str(val) except Exception: return '' def _get_enum_items(self, class_name): """Return a list of (name, value) tuples for an enum class name, or None.""" try: from .monitor_helpers import get_enum_items return get_enum_items(class_name) except Exception: return None def show_message_form(self, label: str, editable: bool = True): """Build an editable form for message `label` (A messages). The form is placed into `self.detail_form_container`. Existing form widgets are destroyed and rebuilt for the new message. """ try: # Use the form container for showing message details; do not toggle the # tree visibility here. The DetailsPane places the form inside a # scrollable canvas so it remains visible while updating. 
# if same form is already shown, refresh widget values safely if getattr(self, 'current_form_label', None) == label and self.form_widgets: try: # use the existing helper which fetches fresh wrapper and # updates widgets without rebuilding the form self._refresh_and_update_form() except Exception: pass return mdb = self._get_message_db() if mdb is None: return try: msg_wrapper = mdb.getMessage(label) except Exception: allm = mdb.getAllMessages() msg_wrapper = allm.get(label) if not msg_wrapper: return msg = getattr(msg_wrapper, 'message', None) if msg is None: return # clear previous form only when we know we're going to build a new one for w in self.detail_form_container.winfo_children(): try: w.destroy() except Exception: pass self.form_widgets = {} self.current_form_label = label # header with Apply button at top hdr = tk.Frame(self.detail_form_container) hdr.pack(fill=tk.X, padx=6, pady=(6,0)) lbl_hdr = tk.Label(hdr, text=f"Edit message {label}", anchor=tk.W) lbl_hdr.pack(side=tk.LEFT) # For editable forms show Apply button, otherwise present a label if editable: apply_btn = tk.Button(hdr, text='Apply', command=lambda: self._apply_form_values(msg_wrapper)) apply_btn.pack(side=tk.RIGHT, padx=6) else: tk.Label(hdr, text='Read-only', anchor=tk.E).pack(side=tk.RIGHT, padx=6) # recursive builder for fields frm = tk.Frame(self.detail_form_container) frm.pack(fill=tk.BOTH, expand=True, padx=6, pady=6) self._build_form_fields(frm, msg, prefix='', editable=editable) # show form container try: self.detail_form_container.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) except Exception: pass except Exception as e: try: self.log.insert(tk.END, f"Error building form: {e}\n") except Exception: pass def _build_form_fields(self, parent, obj, prefix='', max_depth=3, _depth=0, editable=True): """Recursively create labeled widgets for public fields of obj.""" if _depth > max_depth: return row = 0 # helper to detect struct-like ctypes objects def _is_struct_like(v): if v is None: 
return False if isinstance(v, (int, float, str, bytes)): return False if callable(v): return False # ctypes.Structure expose _fields_ if hasattr(v, '_fields_'): return True # objects with many public attributes are likely struct-like public = [n for n in dir(v) if not n.startswith('_')] if len(public) > 2: return True return False for name in [n for n in dir(obj) if not n.startswith('_')]: try: val = getattr(obj, name) except Exception: continue if callable(val): continue full = f"{prefix}.{name}" if prefix else name # Special-case: ctypes Union with a '.str' structure (bitfields) if hasattr(val, 'str') and _is_struct_like(getattr(val, 'str')): subfrm = ttk.LabelFrame(parent, text=name) subfrm.pack(fill=tk.X, padx=6, pady=4) # recurse into the .str sub-structure so fields become e.g. settings.spare try: self._build_form_fields(subfrm, getattr(val, 'str'), prefix=full, max_depth=max_depth, _depth=_depth+1) except Exception: pass continue # group nested structs (ctypes structs or objects with many public attrs) if _is_struct_like(val) and not (hasattr(val, 'raw') or hasattr(val, 'value')): subfrm = ttk.LabelFrame(parent, text=name) subfrm.pack(fill=tk.X, padx=6, pady=4) self._build_form_fields(subfrm, val, prefix=full, max_depth=max_depth, _depth=_depth+1) continue # create label + widget row_fr = tk.Frame(parent) row_fr.pack(fill=tk.X, padx=2, pady=1) lbl = tk.Label(row_fr, text=name, width=28, anchor=tk.W) lbl.pack(side=tk.LEFT) # enum choices? 
enum_items = None try: enum_items = self._get_enum_items(val.__class__.__name__) except Exception: enum_items = None widget = None if enum_items: # build combobox with names # show human-friendly label first, then numeric value choices = [f"{n} ({v})" for (n, v) in enum_items] var = tk.StringVar() # combobox is selectable for editable forms, otherwise disabled cb_state = 'readonly' if editable else 'disabled' cb = ttk.Combobox(row_fr, values=choices, textvariable=var, state=cb_state) # set current value try: raw = int(getattr(val, 'raw', getattr(val, 'value', val))) # find matching choice sel = None for (n, v) in enum_items: if int(v) == raw: sel = f"{n} ({v})" break if sel is not None: var.set(sel) except Exception: pass cb.pack(side=tk.RIGHT, fill=tk.X, expand=True) widget = ('combobox', cb, enum_items) else: # numeric or text entry var = tk.StringVar() try: cur = getattr(val, 'raw', getattr(val, 'value', val)) if cur is None: var.set('') else: var.set(str(cur)) except Exception: var.set('') ent = tk.Entry(row_fr, textvariable=var) # disable entry if not editable if not editable: try: ent.config(state='disabled') except Exception: pass ent.pack(side=tk.RIGHT, fill=tk.X, expand=True) widget = ('entry', ent) # register created widget for later updates; values will be filled # by `_update_form_values` when a message wrapper is available. 
try: self.form_widgets[full] = widget except Exception: pass def _apply_form_values(self, msg_wrapper): """Read widgets from `self.form_widgets` and apply values to message.""" for full, widget in list(self.form_widgets.items()): try: if widget[0] == 'combobox': cb = widget[1] sel = cb.get() if not sel: continue # sel like 'NAME (3)'; try to extract numeric value inside parentheses try: start = sel.rfind('(') end = sel.rfind(')') if start != -1 and end != -1 and end > start: num = sel[start+1:end].strip() val = int(num) else: # fallback to first token num = sel.split()[0] val = int(num) except Exception: val = sel else: ent = widget[1] txt = ent.get() val = self._coerce_text_to_type(txt) # apply try: self._apply_edit_to_msg(full, str(val), msg_wrapper) except Exception as e: try: self.log.insert(tk.END, f"Failed to set {full}: {e}\n") except Exception: pass except Exception: pass try: self.log.insert(tk.END, "Applied form values\n") except Exception: pass def _add_detail_row(self, name: str, value: str, accessor=None): """Insert or update a row in the details table (param -> value).""" try: # support hierarchical names separated by '.' 
or '/' def _ensure_hierarchy(full_name): parts = [p for p in full_name.replace('/', '.').split('.') if p != ''] parent = '' acc = [] for p in parts[:-1]: acc.append(p) key = '.'.join(acc) if key in self.detail_rows: parent = self.detail_rows[key] continue # create parent node with empty value try: iid = self.detail_tree.insert(parent, tk.END, values=(p, '')) # ensure parent is expanded so nested fields remain visible try: self.detail_tree.item(iid, open=True) self._always_open_parents[key] = True except Exception: pass self.detail_rows[key] = iid parent = iid except Exception: # fallback: try to insert at root try: iid = self.detail_tree.insert('', tk.END, values=(p, '')) self.detail_rows[key] = iid parent = iid except Exception: parent = '' leaf = parts[-1] if parts else full_name leaf_parent_key = '.'.join(parts[:-1]) if parts[:-1] else '' leaf_parent = self.detail_rows.get(leaf_parent_key, '') return leaf_parent, leaf if name in self.detail_rows: iid = self.detail_rows[name] try: # keep the param column stable; show only the leaf label leaf = name.split('.')[-1] parent_key = '.'.join(name.replace('/', '.').split('.')[:-1]) if parent_key and parent_key in self.detail_rows: disp = f"-> {leaf}" else: disp = leaf self.detail_tree.item(iid, values=(disp, value)) # if this row represents a parent that we marked to remain open, # ensure it's still expanded after value updates try: parent_key = name if parent_key in self._always_open_parents: self.detail_tree.item(iid, open=True) except Exception: pass if accessor is not None: self.detail_accessors[name] = accessor return iid except Exception: pass # insert hierarchical node parent_iid, leaf_label = _ensure_hierarchy(name) try: leaf_parent_key = '.'.join(name.replace('/', '.').split('.')[:-1]) if leaf_parent_key: disp_label = f"-> {leaf_label}" else: disp_label = leaf_label iid = self.detail_tree.insert(parent_iid, tk.END, values=(disp_label, value)) self.detail_rows[name] = iid if accessor is not None: 
self.detail_accessors[name] = accessor return iid except Exception: try: leaf_parent_key = '.'.join(name.replace('/', '.').split('.')[:-1]) if leaf_parent_key: disp_label = f"-> {leaf_label}" else: disp_label = leaf_label iid = self.detail_tree.insert('', tk.END, values=(disp_label, value)) self.detail_rows[name] = iid if accessor is not None: self.detail_accessors[name] = accessor return iid except Exception: return None except Exception: try: # best-effort: insert without tracking # best-effort hierarchical insert parent_iid, leaf_label = (lambda n: ('' , n))(name) try: leaf_parent_key = '.'.join(name.replace('/', '.').split('.')[:-1]) if leaf_parent_key: disp_label = f"-> {leaf_label}" else: disp_label = leaf_label iid = self.detail_tree.insert(parent_iid, tk.END, values=(disp_label, str(value))) if accessor is not None: self.detail_accessors[name] = accessor return iid except Exception: return None except Exception: return None def _add_detail_rows_from_obj(self, prefix: str, obj, max_depth: int = 4, _depth: int = 0): """Recursively add detail rows for public fields of `obj`. - `prefix` is a string prepended to field names (e.g. 'param1'). - Stops recursing after `max_depth` to avoid noisy dumps. - Uses `_format_value_for_table` and `_fmt_enum` to annotate enum-like values. 
""" if _depth > max_depth: try: self._add_detail_row(prefix or '', self._format_value_for_table(obj)) except Exception: pass return if obj is None: try: self._add_detail_row(prefix or '', '') except Exception: pass return # primitive types if isinstance(obj, (int, float, str)): try: self._add_detail_row(prefix or '', str(obj)) except Exception: pass return # Enum-like or ctypes scalar handling def _is_struct_like_local(v): # heuristics similar to form builder: ctypes.Structure or many public attrs if v is None: return False if isinstance(v, (int, float, str, bytes)): return False if hasattr(v, '_fields_'): return True public = [n for n in dir(v) if not n.startswith('_')] if len(public) > 2: return True return False if hasattr(obj, 'raw') or hasattr(obj, 'value'): # If the object also looks like a struct (has nested fields), recurse if _is_struct_like_local(obj): # fall through to attribute iteration to expand nested fields pass else: try: s = None try: raw = int(getattr(obj, 'raw')) if hasattr(obj, 'raw') else int(getattr(obj, 'value')) s = self._fmt_enum(raw, [obj.__class__.__name__]) except Exception: try: s = str(getattr(obj, 'raw', getattr(obj, 'value', obj))) except Exception: s = repr(obj) self._add_detail_row(prefix or obj.__class__.__name__, s) except Exception: try: self._add_detail_row(prefix or '', repr(obj)) except Exception: pass return # dict-like try: if isinstance(obj, dict): for k, v in obj.items(): name = f"{prefix}.{k}" if prefix else str(k) self._add_detail_rows_from_obj(name, v, max_depth=max_depth, _depth=_depth+1) return except Exception: pass # sequence try: if isinstance(obj, (list, tuple)): for i, v in enumerate(obj): name = f"{prefix}[{i}]" if prefix else f"[{i}]" self._add_detail_rows_from_obj(name, v, max_depth=max_depth, _depth=_depth+1) return except Exception: pass # otherwise iterate public attributes import inspect names = [n for n in dir(obj) if not n.startswith('_')] for n in names: try: v = getattr(obj, n) except Exception: 
continue field_name = f"{prefix}.{n}" if prefix else n # If attribute is a ctypes Union with a '.str' bitfield, expand it try: if hasattr(v, 'str') and (hasattr(getattr(v, 'str'), '__dict__') or hasattr(getattr(v, 'str'), '_fields_')): try: self._add_detail_rows_from_obj(field_name, getattr(v, 'str'), max_depth=max_depth, _depth=_depth+1) continue except Exception: pass except Exception: pass # If callable, try to call zero-arg getters (get_*/is_*) to obtain a value if callable(v): called = False try: sig = inspect.signature(v) if len(sig.parameters) == 0: try: val = v() self._add_detail_row(field_name, self._format_value_for_table(val)) called = True except Exception: called = False except Exception: # fallback: attempt to call and ignore errors try: val = v() self._add_detail_row(field_name, self._format_value_for_table(val)) called = True except Exception: called = False if called: continue # otherwise skip arbitrary callables continue # Primitive or scalar-like values if isinstance(v, (int, float, str)) or hasattr(v, 'raw') or hasattr(v, 'value'): try: self._add_detail_row(field_name, self._format_value_for_table(v)) except Exception: pass else: # Recurse into nested objects try: self._add_detail_rows_from_obj(field_name, v, max_depth=max_depth, _depth=_depth+1) except Exception: pass def on_init(self): # On-demand import/creation of the connection manager when the user requests initialization. 
if self.manager is None: try: from pybusmonitor1553.core.connection_manager import get_manager self.manager = get_manager() self.import_error = None except Exception as e: # record import/bind error and show diagnostics self.import_error = str(e) try: self.open_diagnostics(auto=False) except Exception: pass try: self.update_status() except Exception: pass return try: self.manager.init_library() try: self.logger.info("Initialization completed.") except Exception: try: self.log.insert(tk.END, "Initialization completed.\n") except Exception: pass self.refresh_messages() try: self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) except Exception: pass try: self.update_status() except Exception: pass except Exception as e: messagebox.showerror("Error", str(e)) def on_start(self): if getattr(self, 'import_error', None): messagebox.showerror("Error", f"Connection library error: {self.import_error}") return try: self.manager.start() try: self.logger.info("Start send/receive") except Exception: self.log.insert(tk.END, "Start send/receive\n") if not self.update_loop_running: self.update_loop_running = True threading.Thread(target=self.periodic_update, daemon=True).start() try: self.update_status() except Exception: pass except Exception as e: messagebox.showerror("Error", str(e)) try: self.start_btn.config(state=tk.DISABLED) self.stop_btn.config(state=tk.NORMAL) except Exception: pass def on_stop(self): if getattr(self, 'import_error', None): # nothing to stop try: self.logger.info("Stop requested (no connection active)") except Exception: self.log.insert(tk.END, "Stop requested (no connection active)\n") return self.manager.stop() try: self.logger.info("Stop requested") except Exception: self.log.insert(tk.END, "Stop requested\n") try: self.update_status() except Exception: pass try: self.start_btn.config(state=tk.NORMAL) self.stop_btn.config(state=tk.DISABLED) except Exception: pass def open_diagnostics(self, auto: bool = False): # Diagnostics 
dialog with error details and Retry. # When auto=True (startup auto-open), do not force modal grab so # the GUI can finish initialization without blocking. dlg = tk.Toplevel(self.master) dlg.title("Diagnostics") dlg.transient(self.master) if not auto: try: dlg.grab_set() except Exception: pass try: dlg.focus_force() except Exception: pass # Center the dialog over the main window try: self.master.update_idletasks() mx = self.master.winfo_rootx() my = self.master.winfo_rooty() mw = self.master.winfo_width() mh = self.master.winfo_height() w = 420 h = 140 x = mx + max(0, (mw - w) // 2) y = my + max(0, (mh - h) // 2) dlg.geometry(f"{w}x{h}+{x}+{y}") except Exception: pass tk.Label(dlg, text="Connection diagnostics", font=(None, 12, 'bold')).pack(padx=12, pady=(8,4)) # Determine likely UDP port (allow override via env) try: port = int(os.environ.get('PYBM_RX_PORT', '61553')) except Exception: port = 61553 # Simple user-facing message msg_text = f"The UDP port {port} is likely already in use by another application.\nClose this application to free the port." 
lbl = tk.Label(dlg, text=msg_text, wraplength=380, justify=tk.LEFT) lbl.pack(padx=12, pady=(4,8), expand=True, fill=tk.BOTH) btn_fr = tk.Frame(dlg) btn_fr.pack(pady=8) close_btn = tk.Button(btn_fr, text="Close", command=lambda: self.master.destroy()) close_btn.pack(side=tk.LEFT, padx=6) def _retry_and_update(self, dialog): # Attempt to reload connection manager and clear import_error if successful try: import importlib, sys # reload connection manager module to re-run import logic if 'pybusmonitor1553.core.connection_manager' in sys.modules: cm = sys.modules['pybusmonitor1553.core.connection_manager'] importlib.reload(cm) else: cm = importlib.import_module('pybusmonitor1553.core.connection_manager') # try to obtain manager get_manager = getattr(cm, 'get_manager', None) if get_manager is None: raise RuntimeError('connection_manager missing get_manager') self.manager = get_manager() self.import_error = None try: self.init_btn.config(state=tk.NORMAL) self.start_btn.config(state=tk.NORMAL) except Exception: pass try: messagebox.showinfo('Diagnostics', 'Connection modules reloaded successfully.') except Exception: pass try: dialog.destroy() except Exception: pass try: self.update_status() except Exception: pass except Exception as e: # update dialog text with latest error try: for w in dialog.winfo_children(): if isinstance(w, tk.Text): w.config(state=tk.NORMAL) w.delete('1.0', tk.END) w.insert(tk.END, f"Retry failed: {e}\n\nSuggested actions:\n- Close other app using UDP port 61553\n- Update PYBM_RX_PORT/PYBM_RX_IP if needed\n- Restart this app after freeing the port\n") w.config(state=tk.DISABLED) break except Exception: pass try: self.update_status() except Exception: pass def refresh_messages(self): # Incremental tree update: preserve selection and avoid full redraw try: mdb = self._get_message_db() if mdb is None: return msgs = mdb.getAllMessages() lbl_filter = self.filter_entry.get().strip().lower() pmin = pmax = None try: if self.period_min.get().strip() != "": 
pmin = float(self.period_min.get().strip()) if self.period_max.get().strip() != "": pmax = float(self.period_max.get().strip()) except Exception: pmin = pmax = None # preserve current selection label (if any) cur_sel = None sel = self.tree.selection() if sel: try: cur_item = sel[0] cur_vals = self.tree.item(cur_item, 'values') if cur_vals: cur_sel = cur_vals[0] except Exception: cur_sel = None # mark seen labels to remove stale rows afterwards seen = set() for k, v in msgs.items(): if lbl_filter and lbl_filter not in k.lower(): continue # compute display fields try: cw_raw = getattr(v.head.cw, 'raw', None) wc = getattr(v.head.cw.str, 'wc', '') except Exception: cw_raw = None wc = '' try: sw_raw = getattr(v.head, 'sw', '') except Exception: sw_raw = '' num = getattr(v, 'sent_count', getattr(v, 'size', '')) try: errs = getattr(v.head, 'errcode', '') except Exception: errs = '' period = getattr(v, '_time_ms', None) if period is None: freq = getattr(v, 'freq', None) if freq: period = 1000.0 / freq if period is not None and pmin is not None and period < pmin: continue if period is not None and pmax is not None and period > pmax: continue try: mc = getattr(v, 'recv_count', '') except Exception: mc = '' values = (k, hex(cw_raw) if cw_raw is not None else '', sw_raw, num, errs, f"{period:.3f}" if period else '', wc, mc) # update existing row or insert new one if k in self._tree_items and self._tree_items[k] in self.tree.get_children(''): try: self.tree.item(self._tree_items[k], values=values) except Exception: # fallback: recreate item try: self.tree.delete(self._tree_items[k]) except Exception: pass iid = self.tree.insert('', tk.END, values=values) self._tree_items[k] = iid else: iid = self.tree.insert('', tk.END, values=values) self._tree_items[k] = iid seen.add(k) # remove stale items not present anymore stale = [lbl for lbl in list(self._tree_items.keys()) if lbl not in seen] for lbl in stale: try: self.tree.delete(self._tree_items[lbl]) except Exception: pass del 
self._tree_items[lbl] # restore selection if possible if cur_sel and cur_sel in self._tree_items: try: self.tree.selection_set(self._tree_items[cur_sel]) except Exception: pass except Exception: pass def on_tree_select(self, event): sel = self.tree.selection() if not sel: return item = sel[0] vals = self.tree.item(item, 'values') if not vals: return label = vals[0] self.show_message_detail(label) def show_message_detail(self, label: str): # build table for this label; detail_tree will be populated and updated # Do not clear the form here — only clear when we know we'll show # an error message or rebuild the details. Clearing before the # MessageDB lookup causes the UI to briefly show then disappear # if the lookup fails during a refresh. self.detail_rows = {} self.detail_selected_label = label try: mdb = self._get_message_db() if mdb is None: # clear any previous form/tree content before showing diagnostic try: for w in getattr(self, 'detail_form_container', []).winfo_children(): try: w.destroy() except Exception: pass except Exception: pass try: self.detail_tree.insert('', tk.END, values=("Message DB not available", "Open Diagnostics for details.")) except Exception: pass try: self.log.insert(tk.END, "Message DB not available (import error). 
Open Diagnostics for details.\n") except Exception: pass return # Try direct lookup first try: msg_wrapper = mdb.getMessage(label) except Exception: # Try alternative access patterns: getAllMessages() dict lookup try: allm = mdb.getAllMessages() # exact key if label in allm: msg_wrapper = allm[label] else: # case-insensitive match or startswith found = None for k in allm.keys(): if k.lower() == label.lower() or k.startswith(label): found = allm[k] break msg_wrapper = found except Exception: msg_wrapper = None except Exception as e: # log diagnostic to the UI log and detail pane try: self.log.insert(tk.END, f"show_message_detail lookup error: {e}\n") except Exception: pass self.detail_text.insert(tk.END, f"Message {label} not found in MessageDB (error).\n") return if not msg_wrapper: # clear any existing form before showing 'No data' try: for w in getattr(self, 'detail_form_container', []).winfo_children(): try: w.destroy() except Exception: pass except Exception: pass try: self.detail_tree.insert('', tk.END, values=("No data", f"{label}")) except Exception: pass try: self.log.insert(tk.END, f"No wrapper found for {label} in MessageDB\n") except Exception: pass return try: self.log.insert(tk.END, f"No wrapper found for {label} in MessageDB\n") except Exception: pass return # The actual ctypes message instance msg = getattr(msg_wrapper, 'message', None) # store current wrapper for edit/apply try: self.current_msg_wrapper = msg_wrapper except Exception: self.current_msg_wrapper = None # If this is an A message, show editable form instead of tree try: if label and str(label).upper().startswith('A'): try: self.show_message_form(label) return except Exception: pass except Exception: pass # store current wrapper for edit/apply try: self.current_msg_wrapper = msg_wrapper except Exception: self.current_msg_wrapper = None if msg is None: # Clear any existing form before showing 'No message' try: for w in getattr(self, 'detail_form_container', []).winfo_children(): try: 
def periodic_update(self):
    """Poll the connection manager and keep the views fresh while it runs.

    Every 0.5 s the message list is refreshed; at most once per second the
    detail values and the message form are refreshed as well. The loop ends
    when the manager is missing, reports that it is not running, or fails to
    answer. On exit `update_loop_running` is always reset to False and the
    status display is refreshed, even if the loop ended with an error.

    NOTE(review): this blocks with `time.sleep`, so it is presumably run on
    a worker thread. For that reason every widget-touching call is queued
    on the Tk event loop with `after(0, ...)` instead of being called
    directly.
    """
    log = logging.getLogger(__name__)

    def run_on_ui_thread(method_name):
        # Queue the bound method on the Tk event loop. If scheduling is not
        # possible, fall back to a best-effort direct call, which is what
        # the original did for the detail/form updates.
        try:
            callback = getattr(self, method_name)
            try:
                self.after(0, callback)
            except Exception:
                callback()
        except Exception:
            log.debug("periodic_update: %s failed", method_name, exc_info=True)

    def manager_running():
        # A missing or failing manager means "not running": the loop must
        # terminate cleanly instead of raising.
        manager = getattr(self, 'manager', None)
        if manager is None:
            return False
        try:
            return bool(manager.is_running())
        except Exception:
            log.debug("periodic_update: is_running() failed", exc_info=True)
            return False

    try:
        while manager_running():
            run_on_ui_thread('refresh_messages')
            now = time.time()
            # throttle the more expensive detail/form refresh to 1 Hz
            if now - getattr(self, '_last_detail_update', 0) >= 1.0:
                self._last_detail_update = now
                run_on_ui_thread('update_detail_values')
                run_on_ui_thread('_refresh_and_update_form')
            time.sleep(0.5)
    finally:
        # always release the "loop running" flag so the loop can be restarted
        self.update_loop_running = False
        run_on_ui_thread('update_status')
"""Refresh only the values for the currently selected detail rows (throttled).""" label = getattr(self, 'detail_selected_label', None) if not label: return # diagnostic logging toggle DEBUG_DETAIL_UPDATES = True try: mdb = self._get_message_db() if mdb is None: return try: msg_wrapper = mdb.getMessage(label) except Exception: allm = mdb.getAllMessages() msg_wrapper = allm.get(label) or None if not msg_wrapper: return msg = getattr(msg_wrapper, 'message', None) if msg is None: return except Exception: return # iterate rows and update via accessor if provided for name, iid in list(self.detail_rows.items()): try: # skip header rows if str(name).startswith('#'): continue accessor = self.detail_accessors.get(name) if callable(accessor): try: val = accessor() except Exception: # fallback: try attribute path resolution against msg val = None else: # try to resolve dotted attribute path against msg val = None try: if '.' in name: parts = name.split('.') cur = msg for p in parts: if '[' in p and p.endswith(']'): # index access key, idx = p[:-1].split('[') if key: cur = getattr(cur, key) cur = cur[int(idx)] else: cur = getattr(cur, p) val = cur else: val = getattr(msg, name) except Exception: val = None new_str = self._format_value_for_table(val) cur_vals = self.detail_tree.item(iid, 'values') cur_val = '' try: cur_val = cur_vals[1] except Exception: cur_val = '' if DEBUG_DETAIL_UPDATES: try: self.log.insert(tk.END, f"Detail update check {label}:{name} cur='{cur_val}' new='{new_str}'\n") except Exception: pass if new_str != cur_val: try: # update display value keeping only the leaf name in the param column try: leaf_label = name.split('.')[-1] except Exception: leaf_label = name parent_key = '.'.join(name.replace('/', '.').split('.')[:-1]) if parent_key and parent_key in self.detail_rows: disp = f"-> {leaf_label}" else: disp = leaf_label self.detail_tree.item(iid, values=(disp, new_str)) # flash the changed row self._flash_item(iid) if DEBUG_DETAIL_UPDATES: try: 
def on_detail_double_click(self, event):
    """Start an inline edit of the value cell that was double-clicked.

    Only cells in the second column ('#2') of `detail_tree` are editable.
    A transient `tk.Entry` is placed over the cell as a child of
    `self.master`; the edited row/column is remembered in
    `_editing_target`. Return commits the edit via `_commit_edit`;
    Escape or losing focus cancels it via `_cancel_edit`.

    Failures are logged at debug level and leave no editor behind.
    """
    try:
        tree = self.detail_tree
        if tree.identify('region', event.x, event.y) != 'cell':
            return
        rowid = tree.identify_row(event.y)
        col = tree.identify_column(event.x)
        if not rowid or col != '#2':
            return
        bbox = tree.bbox(rowid, col)
        if not bbox:
            # cell is not currently visible
            return
        x, y, w, h = bbox

        # dispose of an editor left over from a previous double-click
        self._cancel_edit()

        cur_vals = tree.item(rowid, 'values')
        try:
            cur_text = cur_vals[1]
        except Exception:
            cur_text = ''

        self._edit_var = tk.StringVar(value=str(cur_text))
        entry = tk.Entry(self.master, textvariable=self._edit_var)
        self._edit_entry = entry
        # bbox is relative to the tree; translate it into master coordinates
        rx = tree.winfo_rootx() - self.master.winfo_rootx() + x
        ry = tree.winfo_rooty() - self.master.winfo_rooty() + y
        entry.place(x=rx, y=ry, width=w, height=h)
        entry.focus_set()
        self._editing_target = (rowid, col)
        # The original passed empty event sequences here, so no key was
        # ever bound and the editor could be neither committed nor closed.
        entry.bind('<Return>', lambda e: self._commit_edit())
        entry.bind('<Escape>', lambda e: self._cancel_edit())
        entry.bind('<FocusOut>', lambda e: self._cancel_edit())
    except Exception:
        logging.getLogger(__name__).debug(
            "on_detail_double_click failed", exc_info=True)
        # do not leave a half-initialised editor on screen
        self._cancel_edit()
self.log.insert(tk.END, f"Failed to apply edit: {e}\n") except Exception: pass try: self._edit_entry.destroy() except Exception: pass self._editing_target = None self._edit_entry = None self._edit_var = None except Exception: pass def _cancel_edit(self): try: if getattr(self, '_edit_entry', None): try: self._edit_entry.destroy() except Exception: pass self._editing_target = None self._edit_entry = None self._edit_var = None except Exception: pass def _apply_edit_to_msg(self, full_name: str, text_value: str, msg_wrapper): """Resolve `full_name` against `msg_wrapper.message` and set the value. Handles dotted names and simple indexed parts. For ctypes fields, tries to set `.raw` or `.value` if present; otherwise uses setattr. Attempts int then float conversion; falls back to string. """ msg = getattr(msg_wrapper, 'message', None) if msg is None: raise RuntimeError('No message object to apply to') # resolve path parts parts = [p for p in full_name.replace('/', '.').split('.') if p != ''] cur = msg for p in parts[:-1]: # handle index like foo[0] if '[' in p and p.endswith(']'): key, idx = p[:-1].split('[') if key: cur = getattr(cur, key) cur = cur[int(idx)] else: cur = getattr(cur, p) last = parts[-1] # handle array index on last target_parent = cur target_attr = last if '[' in last and last.endswith(']'): key, idx = last[:-1].split('[') if key: target_parent = getattr(cur, key) idx = int(idx) # assign into sequence # convert text_value val = self._coerce_text_to_type(text_value) try: target_parent[idx] = val return except Exception: # try setattr on element target_parent = target_parent[idx] target_attr = None # now set on target_parent.target_attr val = self._coerce_text_to_type(text_value) # prefer .raw or .value if target_attr and hasattr(target_parent, target_attr): targ = getattr(target_parent, target_attr) if hasattr(targ, 'raw'): try: targ.raw = val return except Exception: pass if hasattr(targ, 'value'): try: targ.value = val return except Exception: pass # 
fallback setattr try: setattr(target_parent, target_attr, val) return except Exception: raise else: # if no attribute, try setting on parent directly try: setattr(target_parent, target_attr, val) return except Exception: # last resort: raise raise def _coerce_text_to_type(self, text: str): # try int, then float, then strip try: from .monitor_helpers import coerce_text_to_type return coerce_text_to_type(text) except Exception: try: if text.startswith('0x'): return int(text, 16) except Exception: pass try: return int(text) except Exception: try: return float(text) except Exception: return text def on_send_now(self): """Attempt to send the currently selected message immediately. Tries several manager APIs (`send_now`, `send_message`, `send`) and falls back to logging if none available. """ try: label = getattr(self, 'detail_selected_label', None) if not label: try: self.log.insert(tk.END, "No message selected to send\n") except Exception: pass return mgr = getattr(self, 'manager', None) if mgr is None: try: self.log.insert(tk.END, "Manager not available to send message\n") except Exception: pass return # try known method names for name in ('send_now', 'send_message', 'send'): fn = getattr(mgr, name, None) if callable(fn): try: fn(label) try: self.log.insert(tk.END, f"Sent message {label} via {name}\n") except Exception: pass return except Exception: pass try: self.log.insert(tk.END, f"Manager does not expose send API; modified message saved for scheduled send: {label}\n") except Exception: pass except Exception: pass def update_status(self): try: if getattr(self, 'import_error', None): try: port = int(os.environ.get('PYBM_RX_PORT', '61553')) except Exception: port = 61553 # left pane shows connection state try: self.status_left.config(text=f"Disconnected: UDP {port}", fg="red") except Exception: pass return if self.manager is not None: # check for running state try: is_running = getattr(self.manager, 'is_running', None) if callable(is_running): running = is_running() 
def main():
    """Create the root window, build the monitor UI and run the Tk main loop.

    Closing the window shuts down the optional logger system and stops the
    optional resource monitor (both best-effort) before the root window is
    destroyed.
    """
    root = tk.Tk()
    root.title("PyBusMonitor1553 - Monitor")
    root.geometry("1024x768")
    app = MonitorApp(master=root)

    # (attribute on the app, method to call on it) for each optional component
    shutdown_hooks = (
        ('logger_system', 'shutdown'),
        ('_resource_monitor', 'stop'),
    )

    def on_closing():
        for attr_name, method_name in shutdown_hooks:
            try:
                component = getattr(app, attr_name, None)
                if component:
                    getattr(component, method_name)()
            except Exception:
                # a failing component must not prevent the window from closing
                pass
        root.destroy()

    root.protocol("WM_DELETE_WINDOW", on_closing)
    app.mainloop()