128 lines
5.0 KiB
Python
128 lines
5.0 KiB
Python
import socket
|
|
import time
|
|
import ctypes
|
|
from typing import List
|
|
|
|
from ..lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion
|
|
from ..lib1553.messages.msg_a1 import MsgA1
|
|
from ..lib1553.messages.msg_a2 import MsgA2
|
|
from ..lib1553.messages.msg_a3 import MsgA3
|
|
from ..lib1553.messages.msg_a4 import MsgA4
|
|
from ..lib1553.messages.msg_a5 import MsgA5
|
|
from ..lib1553.constants import TargetHistory, AltitudeBlock, RadarMode
|
|
|
|
|
|
def _struct_to_bytes(s) -> bytes:
|
|
return ctypes.string_at(ctypes.addressof(s), ctypes.sizeof(s))
|
|
|
|
|
|
def _create_packet(msg_obj) -> bytes:
|
|
"""Wrap a 1553 message into the proprietary UDP packet used by this project."""
|
|
|
|
# Build 1553 message header (UDP1553Message equivalent)
|
|
wc_val = 32 if len(msg_obj.data) == 32 else len(msg_obj.data)
|
|
if wc_val == 32:
|
|
wc_val = 0
|
|
|
|
cw_union = CommandWordUnion(rt_addr=1, sub_addr=msg_obj.SUBADDRESS, word_count=wc_val, transmit=msg_obj.IS_TRANSMIT)
|
|
msg_hdr = UDP1553MessageHeader(command_word_union=cw_union)
|
|
|
|
# The legacy implementation appends inverted CW and a control end marker (0x3E3E)
|
|
payload = msg_obj.pack()
|
|
|
|
# inverted cw (bitwise NOT of cw.raw) as uint16
|
|
inverted_cw = (~msg_hdr.command_word.raw) & 0xFFFF if hasattr(msg_hdr.command_word, 'raw') else (~cw_union.raw) & 0xFFFF
|
|
|
|
pkt = _struct_to_bytes(msg_hdr)
|
|
pkt += payload
|
|
# append inverted cw then control marker 0x3E3E
|
|
pkt += ctypes.string_at(ctypes.addressof(ctypes.c_uint16(inverted_cw)), ctypes.sizeof(ctypes.c_uint16))
|
|
pkt += ctypes.string_at(ctypes.addressof(ctypes.c_uint16(UDP1553MessageHeader.MARKER_END)), ctypes.sizeof(ctypes.c_uint16))
|
|
|
|
return pkt
|
|
|
|
|
|
|
|
_mcounter = 0
|
|
|
|
|
|
def perform_initialization(target_ip: str = "127.0.0.1", target_port: int = 51553, repeat: int = 1, delay: float = 0.05) -> List[bytes]:
|
|
"""Send the initial set of 1553 control messages to the server.
|
|
|
|
Default messages and field values mirror the legacy `test_1553.py` script.
|
|
|
|
Returns the list of raw packets that were sent.
|
|
"""
|
|
sent_packets = []
|
|
global _mcounter
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
for _ in range(max(1, int(repeat))):
|
|
# A1
|
|
a1 = MsgA1()
|
|
a1.target_history = TargetHistory.LEVEL_4
|
|
a1.symbol_intensity = 127
|
|
a1.beacon_delay = 12.5
|
|
a1.altitude_block = AltitudeBlock.TPBK
|
|
pkt = _create_packet(a1)
|
|
udp_header = UDP1553Header()
|
|
udp_header.mcounter = _mcounter
|
|
_mcounter += 1
|
|
header_bytes = _struct_to_bytes(udp_header)
|
|
final_pkt = header_bytes + pkt + ctypes.string_at(ctypes.addressof(ctypes.c_uint16(UDP1553Header.MARKER_END_1553)), ctypes.sizeof(ctypes.c_uint16))
|
|
sock.sendto(final_pkt, (target_ip, int(target_port)))
|
|
sent_packets.append(final_pkt)
|
|
time.sleep(delay)
|
|
|
|
# A2
|
|
a2 = MsgA2()
|
|
a2.master_mode = RadarMode.RWS
|
|
pkt = _create_packet(a2)
|
|
udp_header = UDP1553Header()
|
|
udp_header.mcounter = _mcounter
|
|
_mcounter += 1
|
|
final_pkt = _struct_to_bytes(udp_header) + pkt + ctypes.string_at(ctypes.addressof(ctypes.c_uint16(UDP1553Header.MARKER_END_1553)), ctypes.sizeof(ctypes.c_uint16))
|
|
sock.sendto(final_pkt, (target_ip, int(target_port)))
|
|
sent_packets.append(final_pkt)
|
|
time.sleep(delay)
|
|
|
|
# A3
|
|
a3 = MsgA3()
|
|
pkt = _create_packet(a3)
|
|
udp_header = UDP1553Header()
|
|
udp_header.mcounter = _mcounter
|
|
_mcounter += 1
|
|
final_pkt = _struct_to_bytes(udp_header) + pkt + ctypes.string_at(ctypes.addressof(ctypes.c_uint16(UDP1553Header.MARKER_END_1553)), ctypes.sizeof(ctypes.c_uint16))
|
|
sock.sendto(final_pkt, (target_ip, int(target_port)))
|
|
sent_packets.append(final_pkt)
|
|
time.sleep(delay)
|
|
|
|
# A4
|
|
a4 = MsgA4()
|
|
a4.validity_and_slew = 0
|
|
pkt = _create_packet(a4)
|
|
udp_header = UDP1553Header()
|
|
udp_header.mcounter = _mcounter
|
|
_mcounter += 1
|
|
final_pkt = _struct_to_bytes(udp_header) + pkt + ctypes.string_at(ctypes.addressof(ctypes.c_uint16(UDP1553Header.MARKER_END_1553)), ctypes.sizeof(ctypes.c_uint16))
|
|
sock.sendto(final_pkt, (target_ip, int(target_port)))
|
|
sent_packets.append(final_pkt)
|
|
time.sleep(delay)
|
|
|
|
# A5
|
|
a5 = MsgA5()
|
|
# default timetag = 0 as in legacy script
|
|
pkt = _create_packet(a5)
|
|
udp_header = UDP1553Header()
|
|
udp_header.mcounter = _mcounter
|
|
_mcounter += 1
|
|
final_pkt = _struct_to_bytes(udp_header) + pkt + ctypes.string_at(ctypes.addressof(ctypes.c_uint16(UDP1553Header.MARKER_END_1553)), ctypes.sizeof(ctypes.c_uint16))
|
|
sock.sendto(final_pkt, (target_ip, int(target_port)))
|
|
sent_packets.append(final_pkt)
|
|
time.sleep(delay)
|
|
|
|
finally:
|
|
sock.close()
|
|
|
|
return sent_packets
|