prima versione della gui

This commit is contained in:
VALLONGOL 2025-12-11 14:00:24 +01:00
parent 6a7eb45402
commit a78fba39cc
5 changed files with 790 additions and 9 deletions

View File

@ -1,11 +1,14 @@
import time
import sys
import os
import argparse
import tkinter as tk
from .core.network import UdpHandler
from .core.dispatcher import MessageDispatcher
from .core.scheduler import TrafficScheduler
from .core.controller import RadarController
from .utils.printer import dump_message
from .gui.main_window import BusMonitorApp
#Configuration
RX_IP = os.getenv("PYBM_RX_IP", "127.0.0.1")
@ -13,9 +16,11 @@ RX_PORT = int(os.getenv("PYBM_RX_PORT", str(61553)))
TARGET_IP = os.getenv("PYBM_TARGET_IP", "127.0.0.1")
TARGET_PORT = int(os.getenv("PYBM_TARGET_PORT", "51553"))
def main():
def main_cli():
"""Command-line interface mode (original behavior)"""
print("--------------------------------------------------")
print(" PyBusMonitor1553 - Active Controller")
print(" PyBusMonitor1553 - Active Controller (CLI Mode)")
print("--------------------------------------------------")
# 1. Initialize Components
@ -23,7 +28,6 @@ def main():
network = UdpHandler(rx_ip=RX_IP, rx_port=RX_PORT)
# 2. Initialize Radar Logic Controller
# This sets up the default A1, A2, etc. messages
radar_ctrl = RadarController()
# 3. Initialize Scheduler with the Controller
@ -34,7 +38,6 @@ def main():
header, messages = dispatcher.parse_packet(data)
if messages:
# Filter for B-messages (RT -> BC, IS_TRANSMIT=True)
for msg in messages:
if msg.IS_TRANSMIT:
print(f"\n[RX] {msg.__class__.__name__} from RT{header.ta if header else '?'}")
@ -45,8 +48,6 @@ def main():
# 5. Start everything
network.register_callback(on_packet)
network.start()
# Send defaults immediately via scheduler loop
scheduler.start()
print(f"System Running.")
@ -55,7 +56,6 @@ def main():
try:
while True:
# Here we could accept user input to call radar_ctrl.set_master_mode(...)
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping...")
@ -64,5 +64,144 @@ def main():
network.stop()
sys.exit(0)
def main_gui():
"""Graphical user interface mode"""
# Enable quiet mode for scheduler
from .core import scheduler as sched_module
sched_module.QUIET_MODE = True
print("--------------------------------------------------")
print(" PyBusMonitor1553 - Active Controller (GUI Mode)")
print("--------------------------------------------------")
# Initialize Tkinter
root = tk.Tk()
app = BusMonitorApp(root)
# Initialize Components
dispatcher = MessageDispatcher()
network = UdpHandler(rx_ip=RX_IP, rx_port=RX_PORT)
radar_ctrl = RadarController()
scheduler = TrafficScheduler(network, radar_ctrl, TARGET_IP, TARGET_PORT)
# Counters for status bar
tx_count = [0]
rx_count = [0]
# Define the callback for received messages
def on_packet(data, addr):
header, messages = dispatcher.parse_packet(data)
rx_count[0] += 1
if messages:
for msg in messages:
if msg.IS_TRANSMIT:
msg_name = msg.__class__.__name__
# Convert MsgB6 -> B6, MsgA1 -> A1, etc.
short_name = msg_name.replace("Msg", "")
# Extract raw words if available
raw_words = []
if hasattr(msg, '_raw_data') and msg._raw_data:
import struct
data_bytes = msg._raw_data
for i in range(0, len(data_bytes), 2):
if i + 1 < len(data_bytes):
word = struct.unpack('<H', data_bytes[i:i+2])[0]
raw_words.append(word)
# Update GUI (thread-safe via after())
root.after(0, lambda n=short_name, m=msg, r=raw_words:
app.update_message_stats(n, m, r))
# Update status bar
root.after(0, lambda: app.update_connection_status(
True, tx_count[0], rx_count[0], ""
))
# Wrap scheduler send to count TX
original_send = network.send
def counted_send(data, ip, port):
tx_count[0] += 1
return original_send(data, ip, port)
network.send = counted_send
# Wire up menu actions
def on_connect():
network.start()
scheduler.start()
app.update_connection_status(True, tx_count[0], rx_count[0], "")
def on_disconnect():
scheduler.stop()
network.stop()
app.update_connection_status(False, tx_count[0], rx_count[0], "")
app._on_connect = on_connect
app._on_disconnect = on_disconnect
# Start networking
network.register_callback(on_packet)
network.start()
scheduler.start()
# Update initial status
app.update_connection_status(True, 0, 0, "")
print(f"GUI Running.")
print(f"RX: {RX_IP}:{RX_PORT} | TX: {TARGET_IP}:{TARGET_PORT}")
# Handle window close
def on_closing():
scheduler.stop()
network.stop()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
# Run the GUI main loop
try:
root.mainloop()
except KeyboardInterrupt:
on_closing()
def main():
"""Main entry point with mode selection"""
global TARGET_IP, TARGET_PORT
parser = argparse.ArgumentParser(
description='PyBusMonitor1553 - MIL-STD-1553 Bus Monitor',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
python -m pybusmonitor1553 # Launch GUI mode (default)
python -m pybusmonitor1553 --cli # Launch CLI mode
python -m pybusmonitor1553 --gui # Launch GUI mode explicitly
'''
)
parser.add_argument('--cli', action='store_true',
help='Run in command-line interface mode')
parser.add_argument('--gui', action='store_true',
help='Run in graphical user interface mode (default)')
parser.add_argument('--target', type=str, default=None,
help=f'Target IP address (default: {TARGET_IP})')
parser.add_argument('--port', type=int, default=None,
help=f'Target port (default: {TARGET_PORT})')
args = parser.parse_args()
# Update globals if specified
if args.target:
TARGET_IP = args.target
if args.port:
TARGET_PORT = args.port
if args.cli:
main_cli()
else:
main_gui()
if __name__ == "__main__":
main()

View File

@ -7,6 +7,9 @@ from .packet_builder_simple import SimplePacketBuilder
# Debug flag - set to True for detailed packet logging
DEBUG_PACKETS = False
# Quiet mode - set to True to suppress scheduler output (for GUI mode)
QUIET_MODE = False
# Use simple format (like qg1553overudp.cpp) instead of full UDP1553 protocol
# Set to False to use the full UDP1553 protocol (like avddriverudp.cpp)
USE_SIMPLE_FORMAT = False # Server expects 0x1553 marker, so use full format
@ -79,6 +82,7 @@ class TrafficScheduler:
"""Sends all messages in a single frame using full UDP1553 format."""
pkt = self.builder.build_frame(messages)
self.udp.send(pkt, self.target_ip, self.target_port)
if not QUIET_MODE:
msg_names = [m.__class__.__name__ for m in messages]
print(f"[Scheduler] Sent UDP1553 FRAME [{', '.join(msg_names)}] -> {self.target_ip}:{self.target_port}")
if DEBUG_PACKETS:

View File

@ -0,0 +1,6 @@
"""
PyBusMonitor1553 GUI Module
"""
from .main_window import BusMonitorApp, run_gui
__all__ = ['BusMonitorApp', 'run_gui']

View File

@ -0,0 +1,626 @@
"""
PyBusMonitor1553 - Main GUI Window
Tkinter-based interface for MIL-STD-1553 Bus Monitoring
"""
import tkinter as tk
from tkinter import ttk
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from collections import OrderedDict
from ..lib1553.fields import Field
@dataclass
class MessageStats:
"""Statistics for a single message type"""
name: str
cw: str # Command Word formatted string
sw: str # Status Word formatted string
count: int = 0
errors: int = 0
last_update: float = 0.0
period_ms: float = 0.0
word_count: int = 0
is_transmit: bool = False # True = RT->BC (B-msg), False = BC->RT (A-msg)
last_data: Optional[Any] = None
raw_words: List[int] = field(default_factory=list)
class BusMonitorApp:
"""Main application window for PyBusMonitor1553"""
def __init__(self, root: tk.Tk):
self.root = root
self.root.title("PyBusMonitor1553 - MIL-STD-1553 Bus Monitor")
self.root.geometry("1200x800")
self.root.minsize(800, 600)
# Message statistics storage
self.message_stats: Dict[str, MessageStats] = OrderedDict()
self._init_message_stats()
# Selected message for detail view
self.selected_message: Optional[str] = None
# Connection status
self.is_connected = False
self.tx_count = 0
self.rx_count = 0
self.last_error = ""
# Build UI
self._create_menu()
self._create_main_layout()
self._create_status_bar()
# Update timer
self._update_id = None
def _init_message_stats(self):
"""Initialize message statistics for all known message types"""
# A-messages (BC -> RT, Receive by RT)
a_messages = [
("A1", 1, False, 10), # Settings
("A2", 2, False, 3), # Operation Command
("A3", 3, False, 32), # Graphics Command
("A4", 4, False, 31), # Navigation Data
("A5", 5, False, 23), # INU/GPS Data
("A6", 6, False, 31), # Weapon Aiming
("A7", 7, False, 31), # Reserved
("A8", 8, False, 30), # Reserved
]
# B-messages (RT -> BC, Transmit by RT)
b_messages = [
("B1", 11, True, 29), # Target Report #1
("B2", 12, True, 27), # Target Report #2
("B3", 13, True, 27), # Target Report #3
("B4", 14, True, 27), # Terrain Avoidance
("B5", 15, True, 27), # Reserved
("B6", 16, True, 23), # Settings Tell-Back
("B7", 17, True, 3), # Status Tell-Back
("B8", 18, True, 32), # Reserved
]
for name, sa, is_tx, wc in a_messages + b_messages:
direction = "T" if is_tx else "R"
cw_str = f"20-{direction}-{sa}-{wc}"
self.message_stats[name] = MessageStats(
name=name,
cw=cw_str,
sw="0-0-0",
word_count=wc,
is_transmit=is_tx
)
def _create_menu(self):
"""Create application menu bar"""
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# File menu
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="File", menu=file_menu)
file_menu.add_command(label="Connect", command=self._on_connect)
file_menu.add_command(label="Disconnect", command=self._on_disconnect)
file_menu.add_separator()
file_menu.add_command(label="Exit", command=self.root.quit)
# View menu
view_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="View", menu=view_menu)
view_menu.add_command(label="Reset Counters", command=self._reset_counters)
# Help menu
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Help", menu=help_menu)
help_menu.add_command(label="About", command=self._show_about)
def _create_main_layout(self):
"""Create the main application layout with notebook tabs"""
# Main container with PanedWindow for resizable split
self.main_paned = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
self.main_paned.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Left panel - Message list
left_frame = ttk.Frame(self.main_paned)
self.main_paned.add(left_frame, weight=1)
# Right panel - Notebook with tabs
right_frame = ttk.Frame(self.main_paned)
self.main_paned.add(right_frame, weight=2)
# Create message list (left side)
self._create_message_list(left_frame)
# Create notebook tabs (right side)
self._create_notebook(right_frame)
def _create_message_list(self, parent):
"""Create the message list TreeView (like GrifoScope)"""
# Header
header = ttk.Label(parent, text="1553 Bus Messages", font=('Helvetica', 11, 'bold'))
header.pack(pady=(0, 5))
# TreeView with scrollbar
tree_frame = ttk.Frame(parent)
tree_frame.pack(fill=tk.BOTH, expand=True)
columns = ('name', 'cw', 'sw', 'count', 'errs', 'period', 'wc')
self.msg_tree = ttk.Treeview(tree_frame, columns=columns, show='headings',
selectmode='browse')
# Column headers
self.msg_tree.heading('name', text='Name')
self.msg_tree.heading('cw', text='CW')
self.msg_tree.heading('sw', text='SW')
self.msg_tree.heading('count', text='Num')
self.msg_tree.heading('errs', text='Errs')
self.msg_tree.heading('period', text='Period')
self.msg_tree.heading('wc', text='WC')
# Column widths
self.msg_tree.column('name', width=50, anchor='center')
self.msg_tree.column('cw', width=90, anchor='center')
self.msg_tree.column('sw', width=70, anchor='center')
self.msg_tree.column('count', width=60, anchor='center')
self.msg_tree.column('errs', width=50, anchor='center')
self.msg_tree.column('period', width=70, anchor='center')
self.msg_tree.column('wc', width=40, anchor='center')
# Scrollbar
scrollbar = ttk.Scrollbar(tree_frame, orient=tk.VERTICAL, command=self.msg_tree.yview)
self.msg_tree.configure(yscrollcommand=scrollbar.set)
self.msg_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Bind selection event
self.msg_tree.bind('<<TreeviewSelect>>', self._on_message_select)
# Configure row tags for coloring
self.msg_tree.tag_configure('a_msg', background='#E8F4E8') # Light green for A-msgs
self.msg_tree.tag_configure('b_msg', background='#E8E8F4') # Light blue for B-msgs
self.msg_tree.tag_configure('error', foreground='red')
# Populate initial data
self._populate_message_list()
def _populate_message_list(self):
"""Populate the message list with initial data"""
for name, stats in self.message_stats.items():
tag = 'b_msg' if stats.is_transmit else 'a_msg'
period_str = f"{stats.period_ms:.1f}" if stats.period_ms > 0 else "-"
self.msg_tree.insert('', 'end', iid=name, values=(
name,
stats.cw,
stats.sw,
stats.count,
stats.errors,
period_str,
stats.word_count
), tags=(tag,))
def _create_notebook(self, parent):
"""Create the right-side notebook with tabs"""
self.notebook = ttk.Notebook(parent)
self.notebook.pack(fill=tk.BOTH, expand=True)
# Tab 1: Message Detail
self.detail_frame = ttk.Frame(self.notebook)
self.notebook.add(self.detail_frame, text="Message Detail")
self._create_detail_tab(self.detail_frame)
# Tab 2: Radar Status Dashboard
self.dashboard_frame = ttk.Frame(self.notebook)
self.notebook.add(self.dashboard_frame, text="Radar Status")
self._create_dashboard_tab(self.dashboard_frame)
# Tab 3: Raw Data
self.raw_frame = ttk.Frame(self.notebook)
self.notebook.add(self.raw_frame, text="Raw Data")
self._create_raw_tab(self.raw_frame)
def _create_detail_tab(self, parent):
"""Create the message detail view tab"""
# Header with selected message name
self.detail_header = ttk.Label(parent, text="Select a message to view details",
font=('Helvetica', 12, 'bold'))
self.detail_header.pack(pady=10)
# Scrollable frame for fields
canvas = tk.Canvas(parent)
scrollbar = ttk.Scrollbar(parent, orient="vertical", command=canvas.yview)
self.detail_scroll_frame = ttk.Frame(canvas)
self.detail_scroll_frame.bind(
"<Configure>",
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
)
canvas.create_window((0, 0), window=self.detail_scroll_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
# Store reference to canvas for updates
self.detail_canvas = canvas
# Dictionary to hold field labels for updating
self.detail_field_labels: Dict[str, ttk.Label] = {}
def _create_dashboard_tab(self, parent):
"""Create the radar status dashboard tab"""
# Title
title = ttk.Label(parent, text="Radar Status Dashboard", font=('Helvetica', 14, 'bold'))
title.pack(pady=10)
# Main frame with grid
dashboard = ttk.Frame(parent)
dashboard.pack(fill=tk.BOTH, expand=True, padx=20, pady=10)
# Left column - Mode and Status
left_col = ttk.LabelFrame(dashboard, text="Radar Mode", padding=10)
left_col.grid(row=0, column=0, padx=10, pady=5, sticky='nsew')
self.dashboard_labels = {}
# Mode indicator
ttk.Label(left_col, text="Master Mode:").grid(row=0, column=0, sticky='e', padx=5)
self.dashboard_labels['mode'] = ttk.Label(left_col, text="---", font=('Helvetica', 11, 'bold'))
self.dashboard_labels['mode'].grid(row=0, column=1, sticky='w', padx=5)
ttk.Label(left_col, text="Standby:").grid(row=1, column=0, sticky='e', padx=5)
self.dashboard_labels['standby'] = ttk.Label(left_col, text="---")
self.dashboard_labels['standby'].grid(row=1, column=1, sticky='w', padx=5)
ttk.Label(left_col, text="RF Radiation:").grid(row=2, column=0, sticky='e', padx=5)
self.dashboard_labels['rf'] = ttk.Label(left_col, text="---")
self.dashboard_labels['rf'].grid(row=2, column=1, sticky='w', padx=5)
ttk.Label(left_col, text="Transition:").grid(row=3, column=0, sticky='e', padx=5)
self.dashboard_labels['transition'] = ttk.Label(left_col, text="---")
self.dashboard_labels['transition'].grid(row=3, column=1, sticky='w', padx=5)
# Right column - Health
right_col = ttk.LabelFrame(dashboard, text="Health Status", padding=10)
right_col.grid(row=0, column=1, padx=10, pady=5, sticky='nsew')
health_items = [
('radar_failed', 'Radar'),
('array_failed', 'Array'),
('transmitter_failed', 'Transmitter'),
('receiver_failed', 'Receiver'),
('processor_failed', 'Processor'),
('tx_overtemp', 'TX Overtemp'),
]
for i, (key, label) in enumerate(health_items):
ttk.Label(right_col, text=f"{label}:").grid(row=i, column=0, sticky='e', padx=5)
self.dashboard_labels[key] = ttk.Label(right_col, text="---", width=8)
self.dashboard_labels[key].grid(row=i, column=1, sticky='w', padx=5)
# Bottom - Scan Settings
bottom_col = ttk.LabelFrame(dashboard, text="Scan Parameters", padding=10)
bottom_col.grid(row=1, column=0, columnspan=2, padx=10, pady=5, sticky='nsew')
scan_items = [
('range_scale', 'Range Scale'),
('bar_scan', 'Bars'),
('azimuth_scan', 'Az Scan'),
]
for i, (key, label) in enumerate(scan_items):
ttk.Label(bottom_col, text=f"{label}:").grid(row=0, column=i*2, sticky='e', padx=5)
self.dashboard_labels[key] = ttk.Label(bottom_col, text="---", width=10)
self.dashboard_labels[key].grid(row=0, column=i*2+1, sticky='w', padx=5)
dashboard.columnconfigure(0, weight=1)
dashboard.columnconfigure(1, weight=1)
def _create_raw_tab(self, parent):
"""Create the raw data view tab"""
# Text widget for raw hex data
self.raw_text = tk.Text(parent, font=('Courier', 10), wrap=tk.WORD)
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.raw_text.yview)
self.raw_text.configure(yscrollcommand=scrollbar.set)
self.raw_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.raw_text.insert('1.0', "Select a message to view raw data...")
self.raw_text.configure(state='disabled')
def _create_status_bar(self):
"""Create the status bar at the bottom"""
status_frame = ttk.Frame(self.root)
status_frame.pack(side=tk.BOTTOM, fill=tk.X)
# Connection status
self.status_connection = ttk.Label(status_frame, text="● Disconnected", foreground='red')
self.status_connection.pack(side=tk.LEFT, padx=10)
# TX/RX counters
self.status_tx = ttk.Label(status_frame, text="TX: 0")
self.status_tx.pack(side=tk.LEFT, padx=10)
self.status_rx = ttk.Label(status_frame, text="RX: 0")
self.status_rx.pack(side=tk.LEFT, padx=10)
# Error display
self.status_error = ttk.Label(status_frame, text="", foreground='red')
self.status_error.pack(side=tk.RIGHT, padx=10)
# Separator
ttk.Separator(self.root, orient=tk.HORIZONTAL).pack(side=tk.BOTTOM, fill=tk.X)
def _on_message_select(self, event):
"""Handle message selection in the tree view"""
selection = self.msg_tree.selection()
if selection:
msg_name = selection[0]
self.selected_message = msg_name
self._update_detail_view(msg_name)
self._update_raw_view(msg_name)
def _update_detail_view(self, msg_name: str):
"""Update the detail view for the selected message"""
stats = self.message_stats.get(msg_name)
if not stats:
return
# Update header
direction = "RT → BC" if stats.is_transmit else "BC → RT"
self.detail_header.config(text=f"{msg_name} - {direction}")
# Clear existing fields
for widget in self.detail_scroll_frame.winfo_children():
widget.destroy()
self.detail_field_labels.clear()
# If we have message data, show its fields
if stats.last_data:
msg_obj = stats.last_data
cls = msg_obj.__class__
# Get all Field descriptors
fields = []
for name, obj in cls.__dict__.items():
if isinstance(obj, Field):
fields.append((name, obj))
# Sort by word_index then start_bit
fields.sort(key=lambda x: (x[1].word_index, x[1].start_bit))
# Create field displays
current_word = -1
for field_name, field_desc in fields:
# Add word separator
if field_desc.word_index != current_word:
current_word = field_desc.word_index
sep = ttk.Separator(self.detail_scroll_frame, orient=tk.HORIZONTAL)
sep.pack(fill=tk.X, pady=2)
word_label = ttk.Label(self.detail_scroll_frame,
text=f"Word {current_word:02d}",
font=('Helvetica', 9, 'bold'))
word_label.pack(anchor='w', padx=5)
# Field row
row_frame = ttk.Frame(self.detail_scroll_frame)
row_frame.pack(fill=tk.X, padx=10, pady=1)
name_label = ttk.Label(row_frame, text=f"{field_name}:", width=30, anchor='e')
name_label.pack(side=tk.LEFT)
try:
value = getattr(msg_obj, field_name)
if isinstance(value, float):
val_str = f"{value:.4f}"
elif hasattr(value, 'name'): # Enum
val_str = f"{value.name} ({value.value})"
else:
val_str = str(value)
except Exception as e:
val_str = f"<Error: {e}>"
value_label = ttk.Label(row_frame, text=val_str, width=30, anchor='w')
value_label.pack(side=tk.LEFT, padx=5)
self.detail_field_labels[field_name] = value_label
else:
no_data = ttk.Label(self.detail_scroll_frame,
text="No data received yet for this message",
font=('Helvetica', 10, 'italic'))
no_data.pack(pady=20)
def _update_raw_view(self, msg_name: str):
"""Update the raw data view for the selected message"""
stats = self.message_stats.get(msg_name)
if not stats:
return
self.raw_text.configure(state='normal')
self.raw_text.delete('1.0', tk.END)
text = f"Message: {msg_name}\n"
text += f"Command Word: {stats.cw}\n"
text += f"Status Word: {stats.sw}\n"
text += f"Word Count: {stats.word_count}\n"
text += f"Message Count: {stats.count}\n"
text += f"Errors: {stats.errors}\n\n"
if stats.raw_words:
text += "Raw Data Words (hex):\n"
text += "-" * 40 + "\n"
for i, word in enumerate(stats.raw_words):
text += f" Word {i:02d}: 0x{word:04X} ({word:5d})\n"
else:
text += "No raw data available.\n"
self.raw_text.insert('1.0', text)
self.raw_text.configure(state='disabled')
def _update_dashboard(self, b6_data, b7_data):
"""Update the radar dashboard with B6/B7 data"""
if b7_data:
# Mode
mode = getattr(b7_data, 'master_mode_tb', None)
if mode:
self.dashboard_labels['mode'].config(
text=mode.name if hasattr(mode, 'name') else str(mode)
)
# Standby
stby = getattr(b7_data, 'standby_status', None)
if stby is not None:
text = stby.name if hasattr(stby, 'name') else str(stby)
color = 'orange' if 'ON' in text.upper() else 'green'
self.dashboard_labels['standby'].config(text=text, foreground=color)
# RF Radiation
rf = getattr(b7_data, 'rf_radiation_status', None)
if rf is not None:
text = rf.name if hasattr(rf, 'name') else str(rf)
color = 'green' if 'ON' in text.upper() else 'gray'
self.dashboard_labels['rf'].config(text=text, foreground=color)
# Transition
trans = getattr(b7_data, 'transition_status', None)
if trans is not None:
text = trans.name if hasattr(trans, 'name') else str(trans)
self.dashboard_labels['transition'].config(text=text)
# Scan parameters
range_s = getattr(b7_data, 'range_scale_tb', None)
if range_s:
self.dashboard_labels['range_scale'].config(
text=range_s.name if hasattr(range_s, 'name') else str(range_s)
)
bar_s = getattr(b7_data, 'bar_scan_tb', None)
if bar_s:
self.dashboard_labels['bar_scan'].config(
text=bar_s.name if hasattr(bar_s, 'name') else str(bar_s)
)
az_s = getattr(b7_data, 'azimuth_scan_tb', None)
if az_s:
self.dashboard_labels['azimuth_scan'].config(
text=az_s.name if hasattr(az_s, 'name') else str(az_s)
)
if b6_data:
# Health status from B6
health_fields = [
'radar_failed', 'array_failed', 'transmitter_failed',
'receiver_failed', 'processor_failed', 'tx_overtemp'
]
for field in health_fields:
value = getattr(b6_data, field, None)
if value is not None:
if value == 0:
self.dashboard_labels[field].config(text="OK", foreground='green')
else:
self.dashboard_labels[field].config(text="FAIL", foreground='red')
def update_message_stats(self, msg_name: str, msg_obj, raw_words: List[int] = None):
"""Update statistics for a received message"""
if msg_name not in self.message_stats:
return
stats = self.message_stats[msg_name]
now = time.time()
# Calculate period
if stats.last_update > 0:
stats.period_ms = (now - stats.last_update) * 1000
stats.last_update = now
stats.count += 1
stats.last_data = msg_obj
if raw_words:
stats.raw_words = raw_words
# Update tree view
period_str = f"{stats.period_ms:.1f}" if stats.period_ms > 0 else "-"
self.msg_tree.item(msg_name, values=(
msg_name,
stats.cw,
stats.sw,
stats.count,
stats.errors,
period_str,
stats.word_count
))
# Update detail view if this message is selected
if self.selected_message == msg_name:
self._update_detail_view(msg_name)
self._update_raw_view(msg_name)
# Update dashboard for B6/B7
if msg_name == 'B6':
self._update_dashboard(msg_obj, None)
elif msg_name == 'B7':
self._update_dashboard(None, msg_obj)
def update_connection_status(self, connected: bool, tx: int = 0, rx: int = 0, error: str = ""):
"""Update the status bar"""
self.is_connected = connected
self.tx_count = tx
self.rx_count = rx
self.last_error = error
if connected:
self.status_connection.config(text="● Connected", foreground='green')
else:
self.status_connection.config(text="● Disconnected", foreground='red')
self.status_tx.config(text=f"TX: {tx}")
self.status_rx.config(text=f"RX: {rx}")
self.status_error.config(text=error)
def _on_connect(self):
"""Handle connect menu action"""
# This will be wired up to the actual network handler
pass
def _on_disconnect(self):
"""Handle disconnect menu action"""
pass
def _reset_counters(self):
"""Reset all message counters"""
for name, stats in self.message_stats.items():
stats.count = 0
stats.errors = 0
stats.period_ms = 0.0
stats.last_update = 0.0
self.msg_tree.item(name, values=(
name, stats.cw, stats.sw, 0, 0, "-", stats.word_count
))
def _show_about(self):
"""Show about dialog"""
from tkinter import messagebox
messagebox.showinfo(
"About PyBusMonitor1553",
"PyBusMonitor1553\n\n"
"MIL-STD-1553 Bus Monitor\n"
"Version 1.0\n\n"
"Python implementation for GRIFO-F radar interface"
)
def run_gui():
"""Launch the GUI application"""
root = tk.Tk()
app = BusMonitorApp(root)
return root, app

View File

@ -28,6 +28,9 @@ class MessageBase:
# Note: 1553 messages max length is 32 words.
self._data = [0] * 32
# Store raw bytes for GUI display
self._raw_data = None
if raw_bytes:
self.unpack(raw_bytes)
@ -60,6 +63,9 @@ class MessageBase:
if not raw_bytes:
return
# Store raw bytes for GUI display
self._raw_data = raw_bytes
# Calculate how many words we can read
num_bytes = len(raw_bytes)
num_words = num_bytes // 2