# S1005403_RisCC/tools/udp_test_send.py (69 lines, 1.7 KiB, Python)

import threading
import socket
import time
import sys
import os
# The project root must be on PYTHONPATH so that `target_simulator` can be imported.
# Run this script from a terminal with PYTHONPATH set to the project root.
from target_simulator.core.sfp_transport import SfpTransport
# Loopback endpoint on which the test listener waits for the datagram.
LISTEN_IP: str = "127.0.0.1"
LISTEN_PORT: int = 60001
# Port handed to SfpTransport as its own port on the sending side.
CLIENT_BIND_PORT: int = 60002
# (data, addr) tuples appended by listener() when a datagram arrives.
received: list = []
def listener():
    """Wait for a single UDP datagram on LISTEN_IP:LISTEN_PORT and record it.

    Binds a UDP socket, waits up to 5 seconds for one datagram, prints its
    size, sender address and first 200 bytes, and appends ``(data, addr)``
    to the module-level ``received`` list. If nothing arrives before the
    timeout, a message is printed and ``received`` is left untouched.
    """
    # The context manager closes the socket on every exit path, including
    # a failed bind().
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind((LISTEN_IP, LISTEN_PORT))
        print(f"Listener bound on {LISTEN_IP}:{LISTEN_PORT}")
        s.settimeout(5.0)
        try:
            # 65535 is large enough for any single UDP datagram.
            data, addr = s.recvfrom(65535)
        except socket.timeout:
            print("Listener timed out waiting for data")
            return
        print(f"Listener received {len(data)} bytes from {addr}")
        print(data[:200])
        received.append((data, addr))
# Script body: start a UDP listener, send one script command through
# SfpTransport to that listener, and report whether the datagram arrived.
# Exit codes: 0 = packet received, 1 = transport failed to start,
# 2 = no packet received.

# Start listener thread (daemon, so it never keeps the process alive).
lt = threading.Thread(target=listener, daemon=True)
lt.start()
# Give the listener a moment to bind before anything is sent.
time.sleep(0.2)

# Create transport bound to CLIENT_BIND_PORT and send a command
transport = SfpTransport(
    host=LISTEN_IP, port=CLIENT_BIND_PORT, payload_handlers={}, raw_packet_callback=None
)
if not transport.start():
    print("Failed to start SfpTransport (bind failed?)")
    sys.exit(1)

try:
    cmd = "tgtreset -1"
    print(f"Sending command: {cmd}")
    sent = transport.send_script_command(cmd, (LISTEN_IP, LISTEN_PORT))
    print(f"send_script_command returned: {sent}")
    # Wait for the listener; 6 s exceeds its 5 s receive timeout.
    lt.join(timeout=6.0)
finally:
    # Always shut the transport down, even if sending raised.
    transport.shutdown()

if received:
    print("SUCCESS: packet received by listener")
    data, addr = received[0]
    # Try to display a small header slice
    print("raw bytes (first 80):", data[:80])
    sys.exit(0)
else:
    print("FAIL: no packet received")
    sys.exit(2)