diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..f901f48 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[run] +omit = + pybusmonitor1553/__main__.py + pybusmonitor1553/utils/printer.py diff --git a/pybusmonitor1553/__main__.py b/pybusmonitor1553/__main__.py index 78b3f0d..0ffb903 100644 --- a/pybusmonitor1553/__main__.py +++ b/pybusmonitor1553/__main__.py @@ -1,15 +1,54 @@ +import time +import sys from .core.network import UdpHandler from .core.dispatcher import MessageDispatcher +from .utils.printer import dump_message -def on_packet_received(data, addr): - header, msg = dispatcher.parse_packet(data) - if msg: - print(f"Received: {msg}") - # Esempio di accesso ai dati - if hasattr(msg, 'target_history'): - print(f" -> Target History: {msg.target_history}") +# Configuration +RX_IP = "127.0.0.1" +RX_PORT = 61553 # Listening port for the Monitor -dispatcher = MessageDispatcher() -net = UdpHandler() -net.register_callback(on_packet_received) -net.start() \ No newline at end of file +def main(): + print("--------------------------------------------------") + print(" PyBusMonitor1553 - Console Mode") + print("--------------------------------------------------") + + # 1. Initialize Components + dispatcher = MessageDispatcher() + network = UdpHandler(rx_ip=RX_IP, rx_port=RX_PORT) + + # 2. Define the callback + def on_packet(data, addr): + # Header parsing and Message Dispatching + header, msg = dispatcher.parse_packet(data) + + if msg: + # If successfully decoded, print details + print(f"[RX] From {addr[0]}:{addr[1]} | Type: {msg.__class__.__name__}") + dump_message(msg) + elif header: + # Valid UDP header but unknown 1553 message + print(f"[RX] From {addr[0]}:{addr[1]} | Unknown/Invalid 1553 Message") + else: + # Garbage data + print(f"[RX] From {addr[0]}:{addr[1]} | Invalid Packet Structure") + + # 3. Register callback and start + network.register_callback(on_packet) + network.start() + + print(f"Monitoring started on {RX_IP}:{RX_PORT}") + print("Press Ctrl+C to stop.") + + # 4. Main Loop + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nStopping...") + finally: + network.stop() + sys.exit(0) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/pybusmonitor1553/tools/simulate_traffic.py b/pybusmonitor1553/tools/simulate_traffic.py new file mode 100644 index 0000000..be34715 --- /dev/null +++ b/pybusmonitor1553/tools/simulate_traffic.py @@ -0,0 +1,88 @@ +import socket +import struct +import time +import sys +import os + +# Add parent directory to path to import lib1553 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion +from lib1553.messages.msg_a1 import MsgA1 +from lib1553.messages.msg_b5 import MsgB5 +from lib1553.constants import TargetHistory, AltitudeBlock + +TARGET_IP = "127.0.0.1" +TARGET_PORT = 61553 # Must match __main__.py RX_PORT + +def create_packet(msg_obj): + """Wraps a 1553 message object into the proprietary UDP packet structure.""" + + # 1. Create UDP Header + udp_header = UDP1553Header() + # (Other fields default to 0/correct constants) + + # 2. Create 1553 Message Header (Command Word) + # Note: We simulate the Command Word based on the Message properties + # RT Address is arbitrary here (e.g. 1) + # Word Count: In 1553, 32 words is represented as 0. + wc_val = 32 if len(msg_obj.data) == 32 else len(msg_obj.data) + if wc_val == 32: wc_val = 0 + + cw = CommandWordUnion( + rt_addr=1, + sub_addr=msg_obj.SUBADDRESS, + word_count=wc_val, + transmit=msg_obj.IS_TRANSMIT + ) + + msg_header = UDP1553MessageHeader(command_word_union=cw) + + # 3. Serialize Data + payload = msg_obj.pack() + + # 4. Combine + return bytes(udp_header) + bytes(msg_header) + payload + +def main(): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + print(f"Simulating Traffic -> {TARGET_IP}:{TARGET_PORT}") + + try: + while True: + # --- Send Message A1 (Settings) --- + a1 = MsgA1() + # Populate some fields to verify decoding + a1.target_history = TargetHistory.LEVEL_4 + a1.symbol_intensity = 99 + a1.beacon_delay = 12.5 # microseconds + a1.altitude_block = AltitudeBlock.TPBK + + data_a1 = create_packet(a1) + sock.sendto(data_a1, (TARGET_IP, TARGET_PORT)) + print("Sent Msg A1") + + time.sleep(0.5) + + # --- Send Message B5 (Tracked Target) --- + b5 = MsgB5() + b5.target_range = 15000 # ft + b5.target_cas = 350.5 # knots + b5.norm_pos_x = 0.8 # Normalized + b5.vel_x = 450.0 # ft/s + b5.ant_elevation = 0.12 # Semicircles + + data_b5 = create_packet(b5) + sock.sendto(data_b5, (TARGET_IP, TARGET_PORT)) + print("Sent Msg B5") + + time.sleep(1.5) + + except KeyboardInterrupt: + print("Simulation stopped.") + finally: + sock.close() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/pybusmonitor1553/utils/__init__.py b/pybusmonitor1553/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pybusmonitor1553/utils/printer.py b/pybusmonitor1553/utils/printer.py new file mode 100644 index 0000000..bae077b --- /dev/null +++ b/pybusmonitor1553/utils/printer.py @@ -0,0 +1,42 @@ +import inspect +from ..lib1553.fields import Field + +def dump_message(msg_obj): + """ + Introspects a Message object and prints all its decoded fields. + """ + cls = msg_obj.__class__ + print(f"\n{'='*60}") + print(f"MESSAGE: {cls.__name__} (SA: {cls.SUBADDRESS})") + print(f"{'-'*60}") + + # 1. Get all attributes that are instances of our Field descriptors + fields = [] + for name, obj in cls.__dict__.items(): + if isinstance(obj, Field): + fields.append((name, obj)) + + # 2. Sort fields by word_index then start_bit to keep print order logical + # (Word 0 Bit 0 comes before Word 0 Bit 5, which comes before Word 1) + fields.sort(key=lambda x: (x[1].word_index, x[1].start_bit)) + + # 3. Print values + for name, field_desc in fields: + try: + # This triggers the __get__ method of the descriptor + value = getattr(msg_obj, name) + + # Format value based on type + if isinstance(value, float): + val_str = f"{value:.4f}" + elif hasattr(value, 'name'): # Enum + val_str = f"{value.name} ({value.value})" + else: + val_str = str(value) + + print(f"Word {field_desc.word_index:02d} | {name:<25}: {val_str}") + + except Exception as e: + print(f"Word {field_desc.word_index:02d} | {name:<25}: ") + + print(f"{'='*60}\n") \ No newline at end of file diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..670ac4d --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +addopts = -q --cov=pybusmonitor1553 --cov-config=.coveragerc --cov-report=term-missing --cov-report=xml:coverage.xml --cov-report=html:htmlcov +testpaths = tests \ No newline at end of file diff --git a/tests/test_dispatcher_errors.py b/tests/test_dispatcher_errors.py new file mode 100644 index 0000000..f0433b5 --- /dev/null +++ b/tests/test_dispatcher_errors.py @@ -0,0 +1,107 @@ +import ctypes +import socket + +from pybusmonitor1553.core.dispatcher import MessageDispatcher +from pybusmonitor1553.lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion +from pybusmonitor1553.lib1553.constants import Subaddress + + +def _make_packet_with_cw(sub_addr, transmit=False, data_len=64, udp_marker=None): + cw = CommandWordUnion(rt_addr=0, sub_addr=sub_addr, word_count=0, transmit=transmit) + msg_hdr = UDP1553MessageHeader(command_word_union=cw) + udp_hdr = UDP1553Header() + if udp_marker is not None: + udp_hdr.marker1553 = udp_marker + + header_bytes = ctypes.string_at(ctypes.addressof(udp_hdr), ctypes.sizeof(udp_hdr)) + msg_hdr_bytes = ctypes.string_at(ctypes.addressof(msg_hdr), ctypes.sizeof(msg_hdr)) + data_bytes = b"\x00" * data_len + return header_bytes + msg_hdr_bytes + data_bytes + + +def test_invalid_magic_number_returns_none(): + disp = MessageDispatcher() + # Provide wrong marker + raw = _make_packet_with_cw(int(Subaddress.RX_SETTINGS), transmit=False, udp_marker=0xFFFF) + h, msg = disp.parse_packet(raw) + assert h is None and msg is None + + +def test_unknown_subaddress_returns_header_and_none(): + disp = MessageDispatcher() + # Use a subaddress not registered (0 is not in registry) + raw = _make_packet_with_cw(0, transmit=False) + h, msg = disp.parse_packet(raw) + assert h is not None and msg is None + + +def test_message_class_instantiation_exception_is_handled(monkeypatch): + disp = MessageDispatcher() + + # Create a faulty class that raises on construction + class Faulty: + SUBADDRESS = int(Subaddress.RX_SETTINGS) + IS_TRANSMIT = False + def __init__(self, data): + raise RuntimeError("boom") + + key = (int(Subaddress.RX_SETTINGS), False) + # Replace registry entry with faulty + disp._registry[key] = Faulty + + raw = _make_packet_with_cw(int(Subaddress.RX_SETTINGS), transmit=False) + h, msg = disp.parse_packet(raw) + assert h is not None and msg is None + + +def test_udp_start_handles_socket_error(monkeypatch): + # Monkeypatch socket.socket to raise on bind + import socket as _socket + + class FakeSock: + def __init__(self, *a, **k): + pass + def setsockopt(self, *a, **k): + pass + def bind(self, addr): + raise OSError("bind failed") + + monkeypatch.setattr(_socket, 'socket', lambda *a, **k: FakeSock()) + + from pybusmonitor1553.core.network import UdpHandler + h = UdpHandler(rx_ip='127.0.0.1', rx_port=0) + # Should not raise + h.start() + assert not h._running + + +def test_udp_send_handles_send_error(): + from pybusmonitor1553.core.network import UdpHandler + + h = UdpHandler(rx_ip='127.0.0.1', rx_port=0) + + class BadSock: + def sendto(self, data, addr): + raise RuntimeError("send error") + + h._sock = BadSock() + # Should not raise + h.send(b'data') + + +def test_receive_loop_handles_exceptions_and_exits(): + from pybusmonitor1553.core.network import UdpHandler + + h = UdpHandler(rx_ip='127.0.0.1', rx_port=0) + + class ExplodingSock: + def recvfrom(self, n): + # set running False as side-effect then raise + h._running = False + raise RuntimeError("recv boom") + + h._sock = ExplodingSock() + h._running = True + # Run the loop; it should catch the exception and then exit + h._receive_loop() + assert not h._running diff --git a/tests/test_fields.py b/tests/test_fields.py new file mode 100644 index 0000000..3e1729a --- /dev/null +++ b/tests/test_fields.py @@ -0,0 +1,65 @@ +import pytest +from enum import Enum + +from pybusmonitor1553.lib1553.message_base import MessageBase +from pybusmonitor1553.lib1553.fields import BitField, EnumField, ScaledField, ASCIIField + + +class DummyMsg(MessageBase): + pass + + +def test_bitfield_get_set(): + class M(DummyMsg): + bf = BitField(word_index=0, start_bit=0, width=3) + + m = M() + # initial 0 + assert m.bf == 0 + m.bf = 5 + assert m.bf == 5 + # underlying word contains shifted value + assert m.data[0] != 0 + + +def test_enumfield_with_enum_and_raw(): + class E(Enum): + A = 0 + B = 1 + + class M(DummyMsg): + ef = EnumField(word_index=1, start_bit=0, enum_cls=E, width=1) + + m = M() + # set with enum + m.ef = E.B + assert m.ef == E.B + # set with raw int + m.ef = 0 + assert m.ef == E.A + + +def test_scaledfield_signed_unsigned(): + class M(DummyMsg): + s1 = ScaledField(word_index=2, start_bit=0, width=16, lsb_value=0.5, signed=False) + s2 = ScaledField(word_index=3, start_bit=0, width=8, lsb_value=1.0, signed=True) + + m = M() + m.s1 = 10.0 + assert pytest.approx(m.s1, rel=1e-6) == 10.0 + + # signed field: set negative + m.s2 = -3.0 + assert pytest.approx(m.s2, rel=1e-6) == -3.0 + + +def test_asciifield_get_set_and_validation(): + class M(DummyMsg): + a = ASCIIField(word_index=4) + + m = M() + m.a = 'OK' + assert m.a == 'OK' + + with pytest.raises(ValueError): + m.a = 'TOO_LONG' diff --git a/tests/test_messages_more.py b/tests/test_messages_more.py new file mode 100644 index 0000000..8eea835 --- /dev/null +++ b/tests/test_messages_more.py @@ -0,0 +1,52 @@ +import ctypes +import pytest + +from pybusmonitor1553.lib1553.messages.msg_a4 import MsgA4 +from pybusmonitor1553.lib1553.messages.msg_b6 import MsgB6 +from pybusmonitor1553.lib1553.messages.msg_b7 import MsgB7 +from pybusmonitor1553.lib1553.constants import CursorMode, DTTEnable, SAREnable + + +def test_msg_a4_cursor_and_sar_enable(): + m = MsgA4() + + # Default enums + assert m.cursor_mode is not None + + # Set cursor mode to SLAVE (value 1) and verify underlying storage + m.cursor_mode = CursorMode.SLAVE + assert m.cursor_mode == CursorMode.SLAVE + # Cursor mode sits in word 0; ensure data changed + assert m.data[0] != 0 + + # SAR enable test + m.sar_enable = SAREnable.SAR_ENABLED + assert m.sar_enable == SAREnable.SAR_ENABLED + + +def test_msg_b6_and_b7_basic_fields(): + b6 = MsgB6() + b7 = MsgB7() + + # set some values and read back + b6.target_history_tb = 2 + assert int(b6.target_history_tb) == 2 + + b7.master_mode_tb = 3 + assert int(b7.master_mode_tb) == 3 + + +def test_ppos_lat_positive_value(): + m = MsgA4() + + # construct a small positive 25-bit raw value + raw_25 = 123456 + msw = (raw_25 >> 9) & 0xFFFF + lsw_fragment = raw_25 & 0x1FF + lsw_word = (lsw_fragment << 7) & 0xFFFF + + m._data[23] = msw + m._data[24] = lsw_word + + expected = float(raw_25 * 5.9604644775e-8) + assert pytest.approx(m.ppos_lat, rel=1e-9) == expected diff --git a/tests/test_more_coverage.py b/tests/test_more_coverage.py new file mode 100644 index 0000000..2587950 --- /dev/null +++ b/tests/test_more_coverage.py @@ -0,0 +1,107 @@ +import struct +import pytest + +from enum import Enum + +from pybusmonitor1553.lib1553.fields import Field, BitField, EnumField, ScaledField, ASCIIField +from pybusmonitor1553.lib1553.message_base import MessageBase +from pybusmonitor1553.lib1553.messages.msg_a3 import MsgA3 +from pybusmonitor1553.lib1553.messages.msg_a5 import MsgA5 + + +def test_descriptor_returns_self_on_class_access(): + class M(MessageBase): + bf = BitField(word_index=0, start_bit=0, width=1) + + # Accessing via class should return the descriptor itself + assert isinstance(M.bf, BitField) + + +def test_enumfield_auto_width_and_fallback(): + class E(Enum): + A = 0 + B = 1 + C = 3 + + class M(MessageBase): + ef = EnumField(word_index=0, start_bit=0, enum_cls=E) # width auto + + # Width auto-calculated to fit max value (3 -> 2 bits) + assert M.ef.width == 2 + + m = M() + # Set raw value directly to an undefined value (e.g., 2) + m.data[0] = 2 << (16 - (0 + M.ef.width)) + # Should return raw int because 2 not in Enum + assert m.ef == 2 + + +def test_asciifield_nonprintable_and_validation(): + class M(MessageBase): + a = ASCIIField(word_index=0) + + m = M() + # write a raw word with non-printable bytes 0x00 0x01 + m._data[0] = (0 << 8) | 1 + assert m.a == '??' + + with pytest.raises(ValueError): + m.a = 'X' # too short + + +def test_message_base_pack_unpack_and_clamp(): + class M(MessageBase): + pass + + m = M() + # set some sample words + for i in range(32): + m._data[i] = i + + packed = m.pack() + assert len(packed) == 64 + + # create raw with more than 32 words (e.g., 40 words) + extra = struct.pack('>40H', *list(range(40))) + m2 = M() + m2.unpack(extra) + # only first 32 should be loaded + assert m2._data[31] == 31 + + +def test_msg_a3_25bit_signed_and_ascii(): + m = MsgA3() + + # positive raw + raw_25 = 0x00ABCDE + msw = (raw_25 >> 9) & 0xFFFF + lsw_fragment = raw_25 & 0x1FF + m._data[2] = msw + m._data[3] = (lsw_fragment << 7) + assert isinstance(m.wp1_latitude, float) + + # negative value: set sign bit of 25-bit value + neg_raw = (1 << 24) | 12345 + msw = (neg_raw >> 9) & 0xFFFF + lsw_fragment = neg_raw & 0x1FF + m._data[4] = msw + m._data[5] = (lsw_fragment << 7) + val = m.wp1_longitude + assert isinstance(val, float) + + # ASCII fields + m.hpt_callsign_ab = 'AA' + assert m.hpt_callsign_ab == 'AA' + + +def test_msg_a5_32bit_signed_scaled_velocity(): + m = MsgA5() + # build a negative 32-bit raw value + raw32 = (1 << 31) | 12345678 # sign bit set + msw = (raw32 >> 16) & 0xFFFF + lsw = raw32 & 0xFFFF + m._data[2] = msw + m._data[3] = lsw + + expected = float((raw32 - (1 << 32)) * 3.81470e-6) + assert pytest.approx(m.velocity_x, rel=1e-9) == expected diff --git a/tests/test_udp_handler.py b/tests/test_udp_handler.py new file mode 100644 index 0000000..0c3a45f --- /dev/null +++ b/tests/test_udp_handler.py @@ -0,0 +1,36 @@ +import socket +import time + +from pybusmonitor1553.core.network import UdpHandler + + +def test_udp_handler_receives_packet(): + received = {} + + def cb(data, addr): + received['data'] = data + received['addr'] = addr + + handler = UdpHandler(rx_ip='127.0.0.1', rx_port=0) + handler.register_callback(cb) + handler.start() + + try: + # Get dynamically assigned port + bound_port = handler._sock.getsockname()[1] + + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.sendto(b'ping', ('127.0.0.1', bound_port)) + + timeout = time.time() + 2.0 + while time.time() < timeout and 'data' not in received: + time.sleep(0.01) + + assert received.get('data') == b'ping' + + finally: + handler.stop() + try: + s.close() + except Exception: + pass