# SXXXXXXX_PyBusMonitor1553/tests/test_udp_handler.py
#
# 37 lines
# 849 B
# Python

import socket
import time
from pybusmonitor1553.core.network import UdpHandler
def test_udp_handler_receives_packet():
    """Check that a datagram sent to UdpHandler reaches the registered callback.

    The handler is created with ``rx_port=0`` so the port is assigned
    dynamically; the actual port is read back from the handler's socket.
    A single ``b'ping'`` datagram is sent to that port and the test polls
    for up to two seconds until the callback has recorded the payload.
    """
    received = {}

    def cb(data, addr):
        # Record what the handler passed in so the test body can inspect it.
        received['data'] = data
        received['addr'] = addr

    handler = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
    handler.register_callback(cb)
    handler.start()
    try:
        # Get dynamically assigned port
        bound_port = handler._sock.getsockname()[1]
        # The context manager closes the sender socket on every exit path,
        # so no unconditional close() on a possibly-unbound name is needed.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.sendto(b'ping', ('127.0.0.1', bound_port))
            # Monotonic clock: the deadline is immune to wall-clock changes.
            deadline = time.monotonic() + 2.0
            while time.monotonic() < deadline and 'data' not in received:
                time.sleep(0.01)
        assert received.get('data') == b'ping'
    finally:
        handler.stop()