prima implementazione funzionante, aggiunti test

This commit is contained in:
VALLONGOL 2025-12-09 16:06:55 +01:00
parent 4e806470e6
commit b5542d51bb
11 changed files with 554 additions and 11 deletions

4
.coveragerc Normal file
View File

@ -0,0 +1,4 @@
[run]
omit =
pybusmonitor1553/__main__.py
pybusmonitor1553/utils/printer.py

View File

@ -1,15 +1,54 @@
import time
import sys
from .core.network import UdpHandler
from .core.dispatcher import MessageDispatcher
from .utils.printer import dump_message
def on_packet_received(data, addr):
header, msg = dispatcher.parse_packet(data)
if msg:
print(f"Received: {msg}")
# Esempio di accesso ai dati
if hasattr(msg, 'target_history'):
print(f" -> Target History: {msg.target_history}")
# Configuration
RX_IP = "127.0.0.1"
RX_PORT = 61553 # Listening port for the Monitor
dispatcher = MessageDispatcher()
net = UdpHandler()
net.register_callback(on_packet_received)
net.start()
def main():
print("--------------------------------------------------")
print(" PyBusMonitor1553 - Console Mode")
print("--------------------------------------------------")
# 1. Initialize Components
dispatcher = MessageDispatcher()
network = UdpHandler(rx_ip=RX_IP, rx_port=RX_PORT)
# 2. Define the callback
def on_packet(data, addr):
# Header parsing and Message Dispatching
header, msg = dispatcher.parse_packet(data)
if msg:
# If successfully decoded, print details
print(f"[RX] From {addr[0]}:{addr[1]} | Type: {msg.__class__.__name__}")
dump_message(msg)
elif header:
# Valid UDP header but unknown 1553 message
print(f"[RX] From {addr[0]}:{addr[1]} | Unknown/Invalid 1553 Message")
else:
# Garbage data
print(f"[RX] From {addr[0]}:{addr[1]} | Invalid Packet Structure")
# 3. Register callback and start
network.register_callback(on_packet)
network.start()
print(f"Monitoring started on {RX_IP}:{RX_PORT}")
print("Press Ctrl+C to stop.")
# 4. Main Loop
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping...")
finally:
network.stop()
sys.exit(0)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,88 @@
import socket
import struct
import time
import sys
import os
# Add parent directory to path to import lib1553
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion
from lib1553.messages.msg_a1 import MsgA1
from lib1553.messages.msg_b5 import MsgB5
from lib1553.constants import TargetHistory, AltitudeBlock
TARGET_IP = "127.0.0.1"
TARGET_PORT = 61553 # Must match __main__.py RX_PORT
def create_packet(msg_obj):
"""Wraps a 1553 message object into the proprietary UDP packet structure."""
# 1. Create UDP Header
udp_header = UDP1553Header()
# (Other fields default to 0/correct constants)
# 2. Create 1553 Message Header (Command Word)
# Note: We simulate the Command Word based on the Message properties
# RT Address is arbitrary here (e.g. 1)
# Word Count: In 1553, 32 words is represented as 0.
wc_val = 32 if len(msg_obj.data) == 32 else len(msg_obj.data)
if wc_val == 32: wc_val = 0
cw = CommandWordUnion(
rt_addr=1,
sub_addr=msg_obj.SUBADDRESS,
word_count=wc_val,
transmit=msg_obj.IS_TRANSMIT
)
msg_header = UDP1553MessageHeader(command_word_union=cw)
# 3. Serialize Data
payload = msg_obj.pack()
# 4. Combine
return bytes(udp_header) + bytes(msg_header) + payload
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(f"Simulating Traffic -> {TARGET_IP}:{TARGET_PORT}")
try:
while True:
# --- Send Message A1 (Settings) ---
a1 = MsgA1()
# Populate some fields to verify decoding
a1.target_history = TargetHistory.LEVEL_4
a1.symbol_intensity = 99
a1.beacon_delay = 12.5 # microseconds
a1.altitude_block = AltitudeBlock.TPBK
data_a1 = create_packet(a1)
sock.sendto(data_a1, (TARGET_IP, TARGET_PORT))
print("Sent Msg A1")
time.sleep(0.5)
# --- Send Message B5 (Tracked Target) ---
b5 = MsgB5()
b5.target_range = 15000 # ft
b5.target_cas = 350.5 # knots
b5.norm_pos_x = 0.8 # Normalized
b5.vel_x = 450.0 # ft/s
b5.ant_elevation = 0.12 # Semicircles
data_b5 = create_packet(b5)
sock.sendto(data_b5, (TARGET_IP, TARGET_PORT))
print("Sent Msg B5")
time.sleep(1.5)
except KeyboardInterrupt:
print("Simulation stopped.")
finally:
sock.close()
if __name__ == "__main__":
main()

View File

View File

@ -0,0 +1,42 @@
import inspect
from ..lib1553.fields import Field
def dump_message(msg_obj):
"""
Introspects a Message object and prints all its decoded fields.
"""
cls = msg_obj.__class__
print(f"\n{'='*60}")
print(f"MESSAGE: {cls.__name__} (SA: {cls.SUBADDRESS})")
print(f"{'-'*60}")
# 1. Get all attributes that are instances of our Field descriptors
fields = []
for name, obj in cls.__dict__.items():
if isinstance(obj, Field):
fields.append((name, obj))
# 2. Sort fields by word_index then start_bit to keep print order logical
# (Word 0 Bit 0 comes before Word 0 Bit 5, which comes before Word 1)
fields.sort(key=lambda x: (x[1].word_index, x[1].start_bit))
# 3. Print values
for name, field_desc in fields:
try:
# This triggers the __get__ method of the descriptor
value = getattr(msg_obj, name)
# Format value based on type
if isinstance(value, float):
val_str = f"{value:.4f}"
elif hasattr(value, 'name'): # Enum
val_str = f"{value.name} ({value.value})"
else:
val_str = str(value)
print(f"Word {field_desc.word_index:02d} | {name:<25}: {val_str}")
except Exception as e:
print(f"Word {field_desc.word_index:02d} | {name:<25}: <Error: {e}>")
print(f"{'='*60}\n")

3
pytest.ini Normal file
View File

@ -0,0 +1,3 @@
[pytest]
addopts = -q --cov=pybusmonitor1553 --cov-config=.coveragerc --cov-report=term-missing --cov-report=xml:coverage.xml --cov-report=html:htmlcov
testpaths = tests

View File

@ -0,0 +1,107 @@
import ctypes
import socket
from pybusmonitor1553.core.dispatcher import MessageDispatcher
from pybusmonitor1553.lib1553.headers import UDP1553Header, UDP1553MessageHeader, CommandWordUnion
from pybusmonitor1553.lib1553.constants import Subaddress
def _make_packet_with_cw(sub_addr, transmit=False, data_len=64, udp_marker=None):
cw = CommandWordUnion(rt_addr=0, sub_addr=sub_addr, word_count=0, transmit=transmit)
msg_hdr = UDP1553MessageHeader(command_word_union=cw)
udp_hdr = UDP1553Header()
if udp_marker is not None:
udp_hdr.marker1553 = udp_marker
header_bytes = ctypes.string_at(ctypes.addressof(udp_hdr), ctypes.sizeof(udp_hdr))
msg_hdr_bytes = ctypes.string_at(ctypes.addressof(msg_hdr), ctypes.sizeof(msg_hdr))
data_bytes = b"\x00" * data_len
return header_bytes + msg_hdr_bytes + data_bytes
def test_invalid_magic_number_returns_none():
disp = MessageDispatcher()
# Provide wrong marker
raw = _make_packet_with_cw(int(Subaddress.RX_SETTINGS), transmit=False, udp_marker=0xFFFF)
h, msg = disp.parse_packet(raw)
assert h is None and msg is None
def test_unknown_subaddress_returns_header_and_none():
disp = MessageDispatcher()
# Use a subaddress not registered (0 is not in registry)
raw = _make_packet_with_cw(0, transmit=False)
h, msg = disp.parse_packet(raw)
assert h is not None and msg is None
def test_message_class_instantiation_exception_is_handled(monkeypatch):
disp = MessageDispatcher()
# Create a faulty class that raises on construction
class Faulty:
SUBADDRESS = int(Subaddress.RX_SETTINGS)
IS_TRANSMIT = False
def __init__(self, data):
raise RuntimeError("boom")
key = (int(Subaddress.RX_SETTINGS), False)
# Replace registry entry with faulty
disp._registry[key] = Faulty
raw = _make_packet_with_cw(int(Subaddress.RX_SETTINGS), transmit=False)
h, msg = disp.parse_packet(raw)
assert h is not None and msg is None
def test_udp_start_handles_socket_error(monkeypatch):
# Monkeypatch socket.socket to raise on bind
import socket as _socket
class FakeSock:
def __init__(self, *a, **k):
pass
def setsockopt(self, *a, **k):
pass
def bind(self, addr):
raise OSError("bind failed")
monkeypatch.setattr(_socket, 'socket', lambda *a, **k: FakeSock())
from pybusmonitor1553.core.network import UdpHandler
h = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
# Should not raise
h.start()
assert not h._running
def test_udp_send_handles_send_error():
from pybusmonitor1553.core.network import UdpHandler
h = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
class BadSock:
def sendto(self, data, addr):
raise RuntimeError("send error")
h._sock = BadSock()
# Should not raise
h.send(b'data')
def test_receive_loop_handles_exceptions_and_exits():
from pybusmonitor1553.core.network import UdpHandler
h = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
class ExplodingSock:
def recvfrom(self, n):
# set running False as side-effect then raise
h._running = False
raise RuntimeError("recv boom")
h._sock = ExplodingSock()
h._running = True
# Run the loop; it should catch the exception and then exit
h._receive_loop()
assert not h._running

65
tests/test_fields.py Normal file
View File

@ -0,0 +1,65 @@
import pytest
from enum import Enum
from pybusmonitor1553.lib1553.message_base import MessageBase
from pybusmonitor1553.lib1553.fields import BitField, EnumField, ScaledField, ASCIIField
class DummyMsg(MessageBase):
pass
def test_bitfield_get_set():
class M(DummyMsg):
bf = BitField(word_index=0, start_bit=0, width=3)
m = M()
# initial 0
assert m.bf == 0
m.bf = 5
assert m.bf == 5
# underlying word contains shifted value
assert m.data[0] != 0
def test_enumfield_with_enum_and_raw():
class E(Enum):
A = 0
B = 1
class M(DummyMsg):
ef = EnumField(word_index=1, start_bit=0, enum_cls=E, width=1)
m = M()
# set with enum
m.ef = E.B
assert m.ef == E.B
# set with raw int
m.ef = 0
assert m.ef == E.A
def test_scaledfield_signed_unsigned():
class M(DummyMsg):
s1 = ScaledField(word_index=2, start_bit=0, width=16, lsb_value=0.5, signed=False)
s2 = ScaledField(word_index=3, start_bit=0, width=8, lsb_value=1.0, signed=True)
m = M()
m.s1 = 10.0
assert pytest.approx(m.s1, rel=1e-6) == 10.0
# signed field: set negative
m.s2 = -3.0
assert pytest.approx(m.s2, rel=1e-6) == -3.0
def test_asciifield_get_set_and_validation():
class M(DummyMsg):
a = ASCIIField(word_index=4)
m = M()
m.a = 'OK'
assert m.a == 'OK'
with pytest.raises(ValueError):
m.a = 'TOO_LONG'

View File

@ -0,0 +1,52 @@
import ctypes
import pytest
from pybusmonitor1553.lib1553.messages.msg_a4 import MsgA4
from pybusmonitor1553.lib1553.messages.msg_b6 import MsgB6
from pybusmonitor1553.lib1553.messages.msg_b7 import MsgB7
from pybusmonitor1553.lib1553.constants import CursorMode, DTTEnable, SAREnable
def test_msg_a4_cursor_and_sar_enable():
m = MsgA4()
# Default enums
assert m.cursor_mode is not None
# Set cursor mode to SLAVE (value 1) and verify underlying storage
m.cursor_mode = CursorMode.SLAVE
assert m.cursor_mode == CursorMode.SLAVE
# Cursor mode sits in word 0; ensure data changed
assert m.data[0] != 0
# SAR enable test
m.sar_enable = SAREnable.SAR_ENABLED
assert m.sar_enable == SAREnable.SAR_ENABLED
def test_msg_b6_and_b7_basic_fields():
b6 = MsgB6()
b7 = MsgB7()
# set some values and read back
b6.target_history_tb = 2
assert int(b6.target_history_tb) == 2
b7.master_mode_tb = 3
assert int(b7.master_mode_tb) == 3
def test_ppos_lat_positive_value():
m = MsgA4()
# construct a small positive 25-bit raw value
raw_25 = 123456
msw = (raw_25 >> 9) & 0xFFFF
lsw_fragment = raw_25 & 0x1FF
lsw_word = (lsw_fragment << 7) & 0xFFFF
m._data[23] = msw
m._data[24] = lsw_word
expected = float(raw_25 * 5.9604644775e-8)
assert pytest.approx(m.ppos_lat, rel=1e-9) == expected

107
tests/test_more_coverage.py Normal file
View File

@ -0,0 +1,107 @@
import struct
import pytest
from enum import Enum
from pybusmonitor1553.lib1553.fields import Field, BitField, EnumField, ScaledField, ASCIIField
from pybusmonitor1553.lib1553.message_base import MessageBase
from pybusmonitor1553.lib1553.messages.msg_a3 import MsgA3
from pybusmonitor1553.lib1553.messages.msg_a5 import MsgA5
def test_descriptor_returns_self_on_class_access():
class M(MessageBase):
bf = BitField(word_index=0, start_bit=0, width=1)
# Accessing via class should return the descriptor itself
assert isinstance(M.bf, BitField)
def test_enumfield_auto_width_and_fallback():
class E(Enum):
A = 0
B = 1
C = 3
class M(MessageBase):
ef = EnumField(word_index=0, start_bit=0, enum_cls=E) # width auto
# Width auto-calculated to fit max value (3 -> 2 bits)
assert M.ef.width == 2
m = M()
# Set raw value directly to an undefined value (e.g., 2)
m.data[0] = 2 << (16 - (0 + M.ef.width))
# Should return raw int because 2 not in Enum
assert m.ef == 2
def test_asciifield_nonprintable_and_validation():
class M(MessageBase):
a = ASCIIField(word_index=0)
m = M()
# write a raw word with non-printable bytes 0x00 0x01
m._data[0] = (0 << 8) | 1
assert m.a == '??'
with pytest.raises(ValueError):
m.a = 'X' # too short
def test_message_base_pack_unpack_and_clamp():
class M(MessageBase):
pass
m = M()
# set some sample words
for i in range(32):
m._data[i] = i
packed = m.pack()
assert len(packed) == 64
# create raw with more than 32 words (e.g., 40 words)
extra = struct.pack('>40H', *list(range(40)))
m2 = M()
m2.unpack(extra)
# only first 32 should be loaded
assert m2._data[31] == 31
def test_msg_a3_25bit_signed_and_ascii():
m = MsgA3()
# positive raw
raw_25 = 0x00ABCDE
msw = (raw_25 >> 9) & 0xFFFF
lsw_fragment = raw_25 & 0x1FF
m._data[2] = msw
m._data[3] = (lsw_fragment << 7)
assert isinstance(m.wp1_latitude, float)
# negative value: set sign bit of 25-bit value
neg_raw = (1 << 24) | 12345
msw = (neg_raw >> 9) & 0xFFFF
lsw_fragment = neg_raw & 0x1FF
m._data[4] = msw
m._data[5] = (lsw_fragment << 7)
val = m.wp1_longitude
assert isinstance(val, float)
# ASCII fields
m.hpt_callsign_ab = 'AA'
assert m.hpt_callsign_ab == 'AA'
def test_msg_a5_32bit_signed_scaled_velocity():
m = MsgA5()
# build a negative 32-bit raw value
raw32 = (1 << 31) | 12345678 # sign bit set
msw = (raw32 >> 16) & 0xFFFF
lsw = raw32 & 0xFFFF
m._data[2] = msw
m._data[3] = lsw
expected = float((raw32 - (1 << 32)) * 3.81470e-6)
assert pytest.approx(m.velocity_x, rel=1e-9) == expected

36
tests/test_udp_handler.py Normal file
View File

@ -0,0 +1,36 @@
import socket
import time
from pybusmonitor1553.core.network import UdpHandler
def test_udp_handler_receives_packet():
received = {}
def cb(data, addr):
received['data'] = data
received['addr'] = addr
handler = UdpHandler(rx_ip='127.0.0.1', rx_port=0)
handler.register_callback(cb)
handler.start()
try:
# Get dynamically assigned port
bound_port = handler._sock.getsockname()[1]
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto(b'ping', ('127.0.0.1', bound_port))
timeout = time.time() + 2.0
while time.time() < timeout and 'data' not in received:
time.sleep(0.01)
assert received.get('data') == b'ping'
finally:
handler.stop()
try:
s.close()
except Exception:
pass