12 KiB
UDP1553 Protocol Implementation
Overview
Complete implementation of the MIL-STD-1553-over-UDP protocol used by GrifoScope for radar communication.
Architecture
Protocol Layers
Application Layer: BusController (commands, requests, callbacks)
↓
Protocol Layer: UDP1553Packet, MessageBlock, Headers
↓
Message Layer: MsgA1-A8, MsgB1-B8 (payload data)
↓
Network Layer: UDP sockets (port 51553 RT, port 61553 BC)
Module: pybusmonitor1553/lib1553/udp1553.py
UDP1553 Packet Structure
┌─────────────────────────────────────────┐
│ UDP1553Header (64 bytes) │
│ - marker1553: 0x1553 │
│ - otype: BC=0x4342, RT=0x5452 │
│ - fcounter, mcounter, scounter │
│ - ltt (local timetag µs) │
│ - errors, flags │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ MessageBlock #1 (74 bytes) │
│ ┌─────────────────────────────────────┐ │
│ │ MARKER_MSG_BEGIN (0x3C3C) │ │
│ │ CommandWord (16-bit) │ │
│ │ Payload Data (32 words × 2 bytes) │ │
│ │ StatusWord (16-bit) │ │
│ │ ~CommandWord (inverted for check) │ │
│ │ MARKER_MSG_END (0x3E3E) │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
│ ... (additional messages) ... │
┌─────────────────────────────────────────┐
│ MARKER_END (0x5315) │
└─────────────────────────────────────────┘
Command Word Format (MIL-STD-1553)
Bit Layout (MSB-first, 16 bits total):
Bits [0-4]: RT Address (0-31) - Remote Terminal identifier
Bit [5]: TR (Transmit/Receive) - 0=BC→RT, 1=RT→BC
Bits [6-10]: Subaddress (0-31) - Message type (1=A1/B1, 2=A2/B2, etc.)
Bits [11-15]: Word Count (0-31) - Data words (0 = 32 words)
Example: RT=20, TR=0 (BC→RT), SA=1, WC=0 (32 words)
Binary (MSB-first): 10100 0 00001 00000 = 0xA020
Wire format (LE): bytes [0x20, 0xA0]
Critical Implementation Note:
The Command/Status Words use BigEndianStructure for bitfield interpretation (MSB-first per 1553 standard), but wire transmission is little-endian. The get_wire_value() method performs byte swapping:
wire_value = ((self.raw & 0xFF) << 8) | ((self.raw >> 8) & 0xFF)
Status Word Format
Bit Layout (MSB-first, 16 bits total):
Bits [0-4]: RT Address
Bit [5]: Message Error (me)
Bit [6]: Instrumentation (instr)
Bit [7]: Service Request (sr)
Bits [8-10]: Reserved
Bit [11]: Broadcast Cmd Received Status (brcs)
Bit [12]: Busy
Bit [13]: Subsystem Flag (sf)
Bit [14]: Dynamic Bus Control Acceptance (bca)
Bit [15]: Terminal Flag (tf)
Endianness Summary
| Component | Endianness | Reason |
|---|---|---|
| UDP Header | Little Endian | C struct default, ctypes.LittleEndianStructure |
| Control Words (wire) | Little Endian | Network transmission format |
| Control Words (bitfields) | Big Endian | MIL-STD-1553 MSB-first convention |
| Message Payload | Big Endian | MessageBase.pack() uses struct.pack('>H') |
| Markers | Little Endian | Packet structure markers (0x1553, 0x3C3C, 0x5315) |
Key Insight:
The complexity arises from mixing:
- C struct layout (little-endian headers)
- 1553 bit numbering (MSB=bit 0)
- Network byte order (payload big-endian per radar ICD)
Module: pybusmonitor1553/core/bus_controller.py
BusController API
High-level API for Bus Controller operations:
from pybusmonitor1553.core.bus_controller import BusController
from pybusmonitor1553.lib1553.messages import MsgA1, MsgB7
# Create and connect
bc = BusController(rt_ip="192.168.1.100", rt_port=51553)
bc.connect()
# Send A command (BC→RT)
msg_a1 = MsgA1()
msg_a1.master_mode = MasterMode.TWS
msg_a1.symbol_intensity = 75
bc.send_command(subaddress=1, message=msg_a1)
# Request B data (RT→BC)
msg_b7 = bc.request_data(subaddress=7, message_class=MsgB7)
print(f"Radar mode: {msg_b7.master_mode}")
# Register callback for continuous monitoring
def on_b7_update(msg):
print(f"Status: mode={msg.master_mode}, valid={msg.valid_data_flag}")
bc.register_callback("B7", on_b7_update)
bc.start_polling() # Background thread polls B messages at 50Hz/6.25Hz
BusControllerConfig
@dataclass
class BusControllerConfig:
# Network
rt_ip: str = "192.168.1.100"
rt_port: int = 51553
bc_port: int = 61553
# Bus parameters
rt_address: int = 20
# Polling rates (Hz)
fast_rate: float = 50.0 # B1-B3 (TWS targets)
slow_rate: float = 6.25 # B4-B8 (status)
# Timeouts
response_timeout: float = 0.1 # 100ms
Usage Examples
Example 1: Send Radar Mode Command
from pybusmonitor1553.core.bus_controller import send_radar_command
from pybusmonitor1553.lib1553.messages import MsgA2
from pybusmonitor1553.lib1553.constants import DesignationControl
# Send A2 command to change designation mode
msg = MsgA2()
msg.designation_control = DesignationControl.STT
msg.priority_target_number = 5
send_radar_command("192.168.1.100", subaddress=2, message=msg)
Example 2: Continuous TWS Target Monitoring
bc = BusController()
bc.connect()
received_targets = []
def on_tws_target(msg):
if msg.valid_data_flag == 0: # 0=VALID per ICD
received_targets.append({
'number': msg.target_number,
'x': msg.track_position_x,
'y': msg.track_position_y,
'z': msg.track_position_z
})
bc.register_callback("B1", on_tws_target)
bc.register_callback("B2", on_tws_target)
bc.register_callback("B3", on_tws_target)
bc.start_polling()
time.sleep(5) # Collect 5 seconds of data
bc.stop_polling()
print(f"Received {len(received_targets)} valid targets")
Example 3: Multi-Message Command Sequence
with BusController() as bc:
# Step 1: Configure radar settings (A1)
a1 = MsgA1()
a1.target_history = 3
a1.symbol_intensity = 75
bc.send_command(1, a1)
# Step 2: Set TWS mode (A2)
a2 = MsgA2()
a2.designation_control = DesignationControl.TWS
bc.send_command(2, a2)
# Step 3: Verify mode change (B7)
b7 = bc.request_data(7, MsgB7, timeout=0.5)
if b7 and b7.master_mode == MasterMode.TWS:
print("✓ Radar successfully switched to TWS mode")
Testing
Comprehensive test suite in tests/test_udp1553.py:
- Header Tests: Structure size (64 bytes), validation, packing
- Command/Status Word Tests: Bitfield layout, wire value conversion, byte swapping
- Message Block Tests: Pack/unpack, marker validation, CW inversion check
- Packet Tests: Multi-message packets, end markers, frame counters
- Roundtrip Tests: Complete encode/decode with real message classes
Coverage: 96% (166/173 lines in udp1553.py)
Run tests:
pytest tests/test_udp1553.py -v
Protocol Constants
# UDP Header
MARKER_1553 = 0x1553 # Header start marker
MARKER_END = 0x5315 # Packet end marker
OTYPE_BC = 0x4342 # "BC" origin type
OTYPE_RT = 0x5452 # "RT" origin type
# Message Block
MARKER_MSG_BEGIN = 0x3C3C # Message start marker
MARKER_MSG_END = 0x3E3E # Message end marker
# Network Ports
RT_PORT = 51553 # RT receives commands
BC_PORT = 61553 # BC receives responses
Compatibility with GrifoScope
This implementation is binary-compatible with GrifoScope C++ SDK:
Source Reference:
cpp/GrifoScope/GrifoSdkEif/pub/TH/udp1553_types.h
Verified Compatibility:
- Header structure: 64 bytes, identical field layout
- Command/Status words: Correct bitfield interpretation
- Message blocks: Marker placement, payload padding, CW inversion
- Packet structure: Multi-message support, end marker
Tested Scenarios:
- PyBusMonitor ↔ GrifoScope packet exchange
- Binary dumps match byte-for-byte (see
tools/compare_a2_messages.py)
Future Extensions
Remote Terminal (RT) Simulator
For testing without hardware:
# pybusmonitor1553/core/remote_terminal.py (future)
class RemoteTerminal:
"""Passive RT simulator - responds to BC requests."""
def __init__(self, rt_address=20, listen_port=51553):
self.rt_addr = rt_address
self.port = listen_port
self.a_messages = {} # Store received A commands
self.b_messages = {} # Pre-configured B responses
def handle_a_command(self, subaddress, payload):
"""Process BC→RT command (store state)."""
msg_class = get_message_class('A', subaddress)
msg = msg_class()
msg.unpack(payload)
self.a_messages[subaddress] = msg
def handle_b_request(self, subaddress) -> bytes:
"""Process RT→BC request (return current state)."""
msg = self.b_messages.get(subaddress)
return msg.pack() if msg else b'\x00' * 64
Logging Integration
Add structured logging for protocol debugging:
import logging
logger = logging.getLogger('udp1553')
def pack(self) -> bytes:
data = ...
logger.debug(f"TX: SA={self.command_word.sa}, "
f"RT={self.command_word.rt}, "
f"CW=0x{self.command_word.get_wire_value():04X}")
return data
References
- MIL-STD-1553B: Military Standard for Data Bus
- ICD Document:
doc/ICD_DECD_FTH - GRIFO-F_TH, Data Exchange Control Document for, rev -A, Draft 2.md - C++ Implementation:
cpp/GrifoScope/GrifoSdkEif/pub/TH/udp1553_types.h - Field Mappings:
doc/C++-Python-Message-Mapping.md - Architecture:
doc/Technical-Architecture.md
Troubleshooting
Issue: Command word bit positions incorrect
Symptom: RT receives commands but interprets wrong subaddress
Cause: Endianness mismatch in bitfield interpretation
Solution: Verify BigEndianStructure used for CW/SW bitfields, get_wire_value() performs byte swap
Issue: Inverted CW validation fails
Symptom: ValueError: Command word mismatch: CW=0xXXXX, ~CW=0xYYYY
Cause: Inversion applied to wrong value (raw vs wire)
Solution: Invert get_wire_value() result: ~cw_wire & 0xFFFF
Issue: Message payload corrupted
Symptom: Decoded field values incorrect (e.g., position x = 0 when should be 15000)
Cause: Endianness error in payload data
Solution: Payload MUST be big-endian from MessageBase.pack() - verify struct.pack('>H', value)
Issue: RT not responding to requests
Symptom: request_data() times out, no B message received
Cause: TR bit incorrect (should be 1 for RT→BC)
Solution: Set is_transmit=True for B message requests:
create_rt_to_bc_request(rt_addr=20, subaddress=7) # TR=1 automatically