86 lines
4.2 KiB
Python
86 lines
4.2 KiB
Python
import struct
|
|
import ctypes
|
|
from . import mil1553udp
|
|
from .mil1553udp import UDP1553Header, UDP1553Message
|
|
from pybusmonitor1553.Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters import MsgRdrSettingsAndParameters
|
|
from pybusmonitor1553.Grifo_E_1553lib.messages.msg_rdr_operation_command import MsgRdrOperationCommand
|
|
from pybusmonitor1553.Grifo_E_1553lib.data_types.enums import TargetHistory
|
|
from pybusmonitor1553.Grifo_E_1553lib.data_types.frequency import Frequency
|
|
|
|
class udp_1553_msg_t(ctypes.Structure):
|
|
_pack_ = 1
|
|
_fields_ = [
|
|
("marker", ctypes.c_uint16),
|
|
("cw", ctypes.c_uint16),
|
|
("sw", ctypes.c_uint16),
|
|
("errcode", ctypes.c_uint16)
|
|
]
|
|
|
|
class UDPParser:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def parse_header(self, data):
|
|
header = UDP1553Header.from_buffer_copy(data[:ctypes.sizeof(UDP1553Header)])
|
|
return header
|
|
|
|
def parse_message(self, data, header): # Aggiunto l'argomento header
|
|
#header = UDP1553Header.from_buffer_copy(data[:ctypes.sizeof(UDP1553Header)]) #rimosso perchè non serve
|
|
|
|
# Estrai il subaddress e la direzione dalla control word
|
|
#message_data = data[ctypes.sizeof(UDP1553Header):] #RIMUOVI QUESTA LINEA
|
|
#udp_msg = udp_1553_msg_t.from_buffer_copy(data)
|
|
|
|
# Accedi ai campi sa e tr
|
|
#control_word = udp_msg.cw
|
|
#cw_structure = udp_1553_cw_structure_t.from_buffer_copy(struct.pack("<H",control_word))
|
|
#subaddress = cw_structure.sa
|
|
#direction = cw_structure.tr
|
|
control_word = struct.unpack("<H", data[0:2])[0]
|
|
subaddress = (control_word >> 5) & 0x1F # Estrai i bit 5-9 (subaddress)
|
|
direction = (control_word >> 10) & 0x01 # Estrai il bit 10 (direction)
|
|
|
|
print(f"Subaddress: {subaddress}")
|
|
print(f"Direction: {direction}")
|
|
|
|
# Determina il tipo di messaggio in base al subaddress e alla direzione
|
|
if subaddress == 1 and direction == 0: # A1 (Settings)
|
|
#message = MsgRdrSettingsAndParameters()
|
|
# Copia i dati nel messaggio campo per campo
|
|
#offset = 0
|
|
|
|
# Interpreta direttamente i byte come una struttura MsgRdrSettingsAndParameters
|
|
message = MsgRdrSettingsAndParameters.from_buffer_copy(data)
|
|
print(f"settings.raw: {message.settings.raw:04X}")
|
|
|
|
# Estrai il history level (esempio)
|
|
history_level = (message.settings.raw >> 14) & 0x03
|
|
print(f"history_level: {history_level}")
|
|
|
|
# Estrai il history level (esempio)
|
|
# Assumi che RDROperationalSettings occupi 2 bytes e che history_level sia nel byte meno significativo
|
|
#history_level_value = struct.unpack("<H", data[offset:offset+2])[0]
|
|
#history_level_index = (history_level_value >> 14) & 0x03 # Prendi solo i 2 bit meno significanti
|
|
#print(f"settings.raw prima di set_history_level: 0x{message.settings.raw:04X}") #aggiunta
|
|
#message.settings.set_history_level(TargetHistory(history_level_index).value)
|
|
#print(f"settings.raw dopo di set_history_level: 0x{message.settings.raw:04X}") #aggiunta
|
|
#message.settings = RDROperationalSettings.from_buffer_copy(message_data[offset:offset+ctypes.sizeof(RDROperationalSettings)])
|
|
#offset += 2
|
|
|
|
# Estrai il channel (esempio)
|
|
#channel = struct.unpack("<B", message_data[offset:offset+1])[0] # Assumi che sia un unsigned byte
|
|
#message.frequency.set_channel(channel)
|
|
#offset += 1
|
|
#message.frequency = Frequency.from_buffer_copy(data[offset:offset+ctypes.sizeof(Frequency)])
|
|
#offset += ctypes.sizeof(Frequency)
|
|
|
|
# ... continua ad estrarre e impostare gli altri campi ...
|
|
return message
|
|
elif subaddress == 2 and direction == 0: # A2 (Operation Command)
|
|
message = MsgRdrOperationCommand()
|
|
# Copia i dati nel messaggio campo per campo (DA IMPLEMENTARE)
|
|
#message.rdr_mode_command.set_master_mode(...)
|
|
return message
|
|
else:
|
|
print(f"Unknown message type: subaddress={subaddress}, direction={direction}")
|
|
return None # o solleva un'eccezione |