SXXXXXXX_PyBusMonitor1553/pybusmonitor1553/core/dispatcher.py
2025-12-09 15:52:18 +01:00

94 lines
3.5 KiB
Python

import ctypes
from ..lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion
from ..lib1553 import messages as msgs
class MessageDispatcher:
"""
Parses raw UDP packets into high-level 1553 Message objects.
"""
def __init__(self):
# Registry: Map (Subaddress, IsTransmit) -> MessageClass
self._registry = {}
self._init_registry()
def _init_registry(self):
"""
Dynamically registers all message classes defined in lib1553.messages.
Uses the SUBADDRESS and IS_TRANSMIT constants defined in each class.
"""
# List of all message classes imported
message_classes = [
msgs.MsgA1, msgs.MsgA2, msgs.MsgA3, msgs.MsgA4,
msgs.MsgA5, msgs.MsgA6, msgs.MsgA7, msgs.MsgA8,
msgs.MsgB1, msgs.MsgB2, msgs.MsgB3, msgs.MsgB4,
msgs.MsgB5, msgs.MsgB6, msgs.MsgB7, msgs.MsgB8,
msgs.MsgB9, msgs.MsgB10, msgs.MsgB11
]
for cls in message_classes:
key = (cls.SUBADDRESS, cls.IS_TRANSMIT)
self._registry[key] = cls
# Debug log
# dir_str = "TX" if cls.IS_TRANSMIT else "RX"
# print(f"[Dispatcher] Registered {cls.__name__}: SA={cls.SUBADDRESS} Dir={dir_str}")
def parse_packet(self, raw_data):
"""
Decodes a raw byte buffer.
Args:
raw_data (bytes): The full UDP payload.
Returns:
tuple: (header_obj, message_obj) or (None, None) if invalid.
"""
# 1. Check minimum size (UDP Header + 1553 Header)
min_size = ctypes.sizeof(UDP1553Header) + ctypes.sizeof(UDP1553MessageHeader)
if len(raw_data) < min_size:
print(f"[Dispatcher] Packet too short: {len(raw_data)}")
return None, None
# 2. Parse Proprietary UDP Header
udp_header_size = ctypes.sizeof(UDP1553Header)
udp_header_bytes = raw_data[:udp_header_size]
udp_header = UDP1553Header.from_buffer_copy(udp_header_bytes)
# Validate Marker
if udp_header.marker1553 != UDP1553Header.MARKER_1553:
print(f"[Dispatcher] Invalid Magic Number: {hex(udp_header.marker1553)}")
return None, None
# 3. Parse 1553 Message Header
msg_header_start = udp_header_size
msg_header_end = udp_header_size + ctypes.sizeof(UDP1553MessageHeader)
msg_header_bytes = raw_data[msg_header_start:msg_header_end]
msg_header = UDP1553MessageHeader.from_buffer_copy(msg_header_bytes)
# 4. Extract logic info from Command Word
# Note: CommandWordUnion allows bit-field access
cw = msg_header.command_word.struct
subaddress = cw.subaddress
is_transmit = (cw.tr_bit == 1)
# 5. Find the corresponding class
key = (subaddress, is_transmit)
msg_class = self._registry.get(key)
if not msg_class:
dir_str = "TX" if is_transmit else "RX"
print(f"[Dispatcher] Unknown Message: SA={subaddress} Dir={dir_str}")
return udp_header, None
# 6. Extract Data Words
# The data starts after the 1553 Message Header
data_bytes = raw_data[msg_header_end:]
# Instantiate the message class with the data
try:
message_obj = msg_class(data_bytes)
return udp_header, message_obj
except Exception as e:
print(f"[Dispatcher] Error parsing {msg_class.__name__}: {e}")
return udp_header, None