# SXXXXXXX_PyBusMonitor1553/tests/test_dispatcher_errors.py
#
# 108 lines
# 3.3 KiB
# Python

import ctypes
import socket
from pybusmonitor1553.core.dispatcher import MessageDispatcher
from pybusmonitor1553.lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion
from pybusmonitor1553.lib1553.constants import Subaddress
def _make_packet_with_cw(sub_addr, transmit=False, data_len=64, udp_marker=None):
    """Assemble a raw packet: UDP header, one message header, zeroed payload.

    Args:
        sub_addr: Sub-address stored in the command word.
        transmit: Transmit flag stored in the command word.
        data_len: Number of zero bytes appended after the two headers.
        udp_marker: When not ``None``, assigned to ``marker1553`` on the UDP
            header; otherwise the header is left as constructed.

    Returns:
        The bytes of the UDP header, then the message header, then the payload.
    """
    command_word = CommandWordUnion(
        rt_addr=0, sub_addr=sub_addr, word_count=0, transmit=transmit
    )
    message_header = UDP1553MessageHeader(command_word_union=command_word)

    udp_header = UDP1553Header()
    if udp_marker is not None:
        udp_header.marker1553 = udp_marker

    # Copy each structure's raw memory into a bytes object, headers in order.
    chunks = [
        ctypes.string_at(ctypes.addressof(struct), ctypes.sizeof(struct))
        for struct in (udp_header, message_header)
    ]
    chunks.append(b"\x00" * data_len)
    return b"".join(chunks)
def test_invalid_magic_number_returns_none():
    """A packet built with a wrong UDP marker parses to ``(None, None)``."""
    dispatcher = MessageDispatcher()

    # 0xFFFF overrides the marker the UDP header would otherwise carry.
    packet = _make_packet_with_cw(
        int(Subaddress.RX_SETTINGS),
        transmit=False,
        udp_marker=0xFFFF,
    )

    header, message = dispatcher.parse_packet(packet)
    assert header is None
    assert message is None
def test_unknown_subaddress_returns_header_and_none():
    """An unregistered sub-address yields a header but no decoded message."""
    dispatcher = MessageDispatcher()

    # Sub-address 0 is used here as one that has no registry entry.
    packet = _make_packet_with_cw(0, transmit=False)

    header, message = dispatcher.parse_packet(packet)
    assert header is not None
    assert message is None
def test_message_class_instantiation_exception_is_handled(monkeypatch):
    """A message class that raises on construction yields ``(header, None)``.

    Args:
        monkeypatch: pytest fixture used to install the faulty registry entry
            so that it is undone automatically when the test finishes.
    """
    dispatcher = MessageDispatcher()

    class Faulty:
        """Stand-in message class whose constructor always fails."""

        SUBADDRESS = int(Subaddress.RX_SETTINGS)
        IS_TRANSMIT = False

        def __init__(self, data):
            raise RuntimeError("boom")

    registry_key = (int(Subaddress.RX_SETTINGS), False)

    # Install the faulty class through monkeypatch rather than by direct
    # assignment: the previous entry is restored at teardown, so the change
    # cannot leak into other tests should the registry be shared.
    monkeypatch.setitem(dispatcher._registry, registry_key, Faulty)

    packet = _make_packet_with_cw(int(Subaddress.RX_SETTINGS), transmit=False)
    header, message = dispatcher.parse_packet(packet)

    assert header is not None
    assert message is None
def test_udp_start_handles_socket_error(monkeypatch):
    """``UdpHandler.start()`` does not raise when ``bind`` fails.

    Args:
        monkeypatch: pytest fixture used to replace ``socket.socket``.
    """
    import socket as socket_module

    class FakeSock:
        """Socket stand-in: accepts any options, fails on ``bind``."""

        def __init__(self, *args, **kwargs):
            pass

        def setsockopt(self, *args, **kwargs):
            pass

        def bind(self, addr):
            raise OSError("bind failed")

    def make_fake_socket(*args, **kwargs):
        # Constructor arguments are deliberately ignored.
        return FakeSock()

    monkeypatch.setattr(socket_module, 'socket', make_fake_socket)

    # Imported only after socket.socket has been replaced.
    from pybusmonitor1553.core.network import UdpHandler

    handler = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
    handler.start()  # must not raise
    assert not handler._running
def test_udp_send_handles_send_error():
    """``UdpHandler.send()`` does not propagate an error raised by ``sendto``."""
    from pybusmonitor1553.core.network import UdpHandler

    class BadSock:
        """Socket stand-in whose ``sendto`` always fails."""

        def sendto(self, data, addr):
            raise RuntimeError("send error")

    handler = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
    handler._sock = BadSock()

    # The test passes as long as this call returns without raising.
    handler.send(b'data')
def test_receive_loop_handles_exceptions_and_exits():
    """``_receive_loop`` survives a ``recvfrom`` error and returns once stopped."""
    from pybusmonitor1553.core.network import UdpHandler

    handler = UdpHandler(rx_ip='127.0.0.1', rx_port=0)

    class ExplodingSock:
        """Socket stand-in that clears the running flag, then raises."""

        def recvfrom(self, n):
            # Clear the flag first so the loop has a reason to stop after
            # it has dealt with the exception.
            handler._running = False
            raise RuntimeError("recv boom")

    handler._sock = ExplodingSock()
    handler._running = True

    # Called directly in this thread; it must return rather than raise.
    handler._receive_loop()
    assert not handler._running