SXXXXXXX_PyBusMonitor1553/tools/udp_diagnostic.py
2025-12-11 13:38:49 +01:00

215 lines
6.5 KiB
Python

"""
Diagnostic script to test UDP communication with the Radar server.
This script:
1. Sends packets in both formats (full UDP1553 and simple)
2. Listens for any response
3. Helps diagnose communication issues
"""
import socket
import time
import binascii
import struct
import sys
import os
# Add parent to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pybusmonitor1553.lib1553.messages import MsgA1, MsgA2, MsgA4, MsgA5
from pybusmonitor1553.core.packet_builder import PacketBuilder
from pybusmonitor1553.core.packet_builder_simple import SimplePacketBuilder
from pybusmonitor1553.lib1553.constants import TargetHistory, RadarMode
# Configuration
SERVER_IP = os.getenv("SERVER_IP", "127.0.0.1")
SERVER_PORT = int(os.getenv("SERVER_PORT", "51553"))
LOCAL_PORT = int(os.getenv("LOCAL_PORT", "61553"))
def hex_dump(data: bytes, prefix: str = "") -> str:
"""Create a hex dump."""
hex_str = binascii.hexlify(data).decode('ascii')
formatted = ' '.join(hex_str[i:i+4] for i in range(0, len(hex_str), 4))
return f"{prefix}[{len(data)} bytes] {formatted}"
def test_full_format():
"""Test with full UDP1553 format."""
print("\n" + "="*60)
print("TEST 1: Full UDP1553 Format (current implementation)")
print("="*60)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", LOCAL_PORT))
sock.settimeout(2.0)
builder = PacketBuilder()
# Prepare messages
a1 = MsgA1()
a1.target_history = TargetHistory.LEVEL_4
a1.symbol_intensity = 127
a2 = MsgA2()
a2.master_mode = RadarMode.RWS
print(f"\nSending to {SERVER_IP}:{SERVER_PORT}, listening on port {LOCAL_PORT}")
try:
for i in range(3):
# Send A1
pkt = builder.build_packet(a1, is_request=False)
sock.sendto(pkt, (SERVER_IP, SERVER_PORT))
print(f"\n[TX {i+1}] MsgA1:")
print(hex_dump(pkt[:40], " First 40B: "))
# Send A2
pkt = builder.build_packet(a2, is_request=False)
sock.sendto(pkt, (SERVER_IP, SERVER_PORT))
print(f"[TX {i+1}] MsgA2:")
print(hex_dump(pkt[:40], " First 40B: "))
# Try to receive
try:
data, addr = sock.recvfrom(4096)
print(f"\n[RX] Got {len(data)} bytes from {addr}!")
print(hex_dump(data, " "))
except socket.timeout:
print(f"[RX] No response (timeout)")
time.sleep(0.5)
finally:
sock.close()
def test_simple_format():
"""Test with simple format from qg1553overudp.cpp."""
print("\n" + "="*60)
print("TEST 2: Simple Format (qg1553overudp.cpp style)")
print("="*60)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", LOCAL_PORT + 1)) # Use different port
sock.settimeout(2.0)
builder = SimplePacketBuilder()
# Prepare messages
a1 = MsgA1()
a1.target_history = TargetHistory.LEVEL_4
a1.symbol_intensity = 127
a2 = MsgA2()
a2.master_mode = RadarMode.RWS
print(f"\nSending to {SERVER_IP}:{SERVER_PORT}, listening on port {LOCAL_PORT + 1}")
try:
for i in range(3):
# Send frame with A1 and A2
pkt = builder.build_frame([a1, a2])
sock.sendto(pkt, (SERVER_IP, SERVER_PORT))
print(f"\n[TX {i+1}] Frame with A1+A2:")
print(hex_dump(pkt[:80], " First 80B: "))
# Try to receive
try:
data, addr = sock.recvfrom(4096)
print(f"\n[RX] Got {len(data)} bytes from {addr}!")
print(hex_dump(data, " "))
except socket.timeout:
print(f"[RX] No response (timeout)")
time.sleep(0.5)
finally:
sock.close()
def test_broadcast():
"""Test sending to broadcast address."""
print("\n" + "="*60)
print("TEST 3: Broadcast (in case server expects broadcast)")
print("="*60)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(("0.0.0.0", LOCAL_PORT + 2))
sock.settimeout(2.0)
builder = PacketBuilder()
a1 = MsgA1()
a1.target_history = TargetHistory.LEVEL_4
# Try broadcast
broadcast_ip = SERVER_IP.rsplit('.', 1)[0] + ".255"
print(f"\nSending to broadcast {broadcast_ip}:{SERVER_PORT}")
try:
pkt = builder.build_packet(a1, is_request=False)
sock.sendto(pkt, (broadcast_ip, SERVER_PORT))
print(f"[TX] MsgA1 to broadcast")
print(hex_dump(pkt[:40], " First 40B: "))
try:
data, addr = sock.recvfrom(4096)
print(f"\n[RX] Got {len(data)} bytes from {addr}!")
print(hex_dump(data, " "))
except socket.timeout:
print(f"[RX] No response (timeout)")
except Exception as e:
print(f"[ERROR] {e}")
finally:
sock.close()
def passive_listen():
"""Just listen for any traffic on the BC port."""
print("\n" + "="*60)
print("TEST 4: Passive Listen (check if server sends anything)")
print("="*60)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("0.0.0.0", LOCAL_PORT))
sock.settimeout(5.0)
print(f"\nListening on port {LOCAL_PORT} for 5 seconds...")
try:
while True:
try:
data, addr = sock.recvfrom(4096)
print(f"\n[RX] Got {len(data)} bytes from {addr}!")
print(hex_dump(data, " "))
except socket.timeout:
print("[RX] No traffic received in 5 seconds")
break
finally:
sock.close()
if __name__ == "__main__":
print("="*60)
print("UDP1553 Communication Diagnostic")
print("="*60)
print(f"Server: {SERVER_IP}:{SERVER_PORT}")
print(f"Local port: {LOCAL_PORT}")
# Run tests
test_full_format()
time.sleep(1)
test_simple_format()
time.sleep(1)
# test_broadcast()
# time.sleep(1)
# passive_listen()
print("\n" + "="*60)
print("Diagnostic complete!")
print("="*60)