# S1005403_RisCC/tools/udp_test_send2.py
#
# 61 lines
# 1.7 KiB
# Python

import threading
import socket
import time
import sys
from target_simulator.core.sfp_transport import SfpTransport
# Loopback endpoint that the listener thread binds to and that the script
# command is addressed to.
LISTEN_IP: str = "127.0.0.1"
LISTEN_PORT: int = 60110

# Port 0 lets the OS choose a free ephemeral port, which avoids bind
# conflicts during tests.
CLIENT_BIND_PORT: int = 0

# (data, addr) pairs appended by listener() when a datagram arrives.
received: list = []
def listener(listen_ip: str, listen_port: int) -> None:
    """Wait for a single UDP datagram and record it in ``received``.

    Binds a UDP socket to ``(listen_ip, listen_port)``, waits up to five
    seconds for one datagram, prints a short report, and appends the
    ``(data, addr)`` pair to the module-level ``received`` list.  A failed
    bind or a receive timeout is reported on stdout instead of being raised.

    Args:
        listen_ip: Local IPv4 address to bind to.
        listen_port: Local UDP port to bind to.
    """
    # The context manager closes the socket on every exit path, including
    # the early return after a failed bind (previously the socket was left
    # open in that case).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            s.bind((listen_ip, listen_port))
        except Exception as e:
            # Deliberately broad: any bind problem is reported, not raised,
            # because this function runs as a thread's entry point.
            print(f"Listener bind failed for {listen_ip}:{listen_port}: {e}")
            return
        print(f"Listener bound on {listen_ip}:{listen_port}")

        # Bound the wait so the thread always finishes on its own.
        s.settimeout(5.0)
        try:
            data, addr = s.recvfrom(65535)
        except socket.timeout:
            print("Listener timed out waiting for data")
            return

        print(f"Listener received {len(data)} bytes from {addr}")
        # Only show the head of the payload to keep the output readable.
        print(data[:256])
        received.append((data, addr))
def main() -> int:
    """Send one script command over UDP and check that it arrives.

    Starts a listener thread on ``(LISTEN_IP, LISTEN_PORT)``, starts an
    ``SfpTransport`` on ``CLIENT_BIND_PORT``, asks the transport to send the
    ``tgtreset -1`` command to the listener address, and then waits for the
    listener thread to finish.

    Returns:
        0 if the listener recorded a packet in ``received``, 2 otherwise.
    """
    listener_thread = threading.Thread(
        target=listener, args=(LISTEN_IP, LISTEN_PORT), daemon=True
    )
    listener_thread.start()
    # Give the listener a moment to bind before anything is sent.
    # NOTE(review): this is a timing assumption, not a synchronisation.
    time.sleep(0.1)

    transport = SfpTransport(
        host=LISTEN_IP,
        port=CLIENT_BIND_PORT,
        payload_handlers={},
        raw_packet_callback=None,
    )
    ok = transport.start()
    # NOTE(review): the start result is only reported, as in the original;
    # confirm whether a falsy value should abort the run.
    print(f"Transport started: {ok}")

    try:
        time.sleep(0.05)
        cmd = 'tgtreset -1'
        print(f"Sending command: {cmd} to {(LISTEN_IP, LISTEN_PORT)}")
        sent = transport.send_script_command(cmd, (LISTEN_IP, LISTEN_PORT))
        print(f"send_script_command returned: {sent}")
        # Slightly longer than the listener's own 5 s receive timeout.
        listener_thread.join(timeout=6.0)
    finally:
        # Always release the transport, even if sending or joining raised.
        transport.shutdown()

    if not received:
        print('FAIL: no packet received')
        return 2

    print('SUCCESS: packet received by listener')
    data, addr = received[0]
    print('raw bytes (first 80):', data[:80])
    return 0


if __name__ == '__main__':
    sys.exit(main())