128 lines
4.5 KiB
Python
128 lines
4.5 KiB
Python
import socket
|
|
import struct
|
|
import ctypes
|
|
from datetime import datetime
|
|
from Grifo_E_1553lib import mil1553udp
|
|
from Grifo_E_1553lib.mil1553udp import UDP1553Header
|
|
from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters import MsgRdrSettingsAndParameters
|
|
from Grifo_E_1553lib.messages.msg_rdr_operation_command import MsgRdrOperationCommand
|
|
from Grifo_E_1553lib.data_types.enums import TargetHistory, RdrModes
|
|
|
|
# Destination of the outgoing UDP datagrams (loopback address).
UDP_IP = "127.0.0.1"
UDP_PORT = 5005

# Values copied into the UDP1553 header before each send.
MARKER_1553 = 0x1553
# 0x43 / 0x42 are the ASCII codes of "C" / "B"; used as the "BC" origin type.
OTYPE_BC = 0x4342

print(f"Sending to {UDP_IP}:{UDP_PORT}")

# Single datagram (UDP over IPv4) socket shared by the whole script.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
|
|
|
|
def create_a1_message():
    """Create an example MsgRdrSettingsAndParameters message (A1).

    The history level is set to TARGET_HISTORY_LEVEL_03 and the date/time of
    mission fields are filled from the local system clock.  Setting the
    date/time is best-effort: if neither the setter API nor the raw-value
    fallback works, a warning is printed and the field keeps its default.

    Returns:
        tuple: ``(message, command_word)``; ``command_word`` is an int with
        the subaddress in bits 5-9 and the direction in bit 10.
    """
    message = MsgRdrSettingsAndParameters()
    message.settings.set_history_level(TargetHistory.TARGET_HISTORY_LEVEL_03)  # no longer needed

    subaddress = 1  # replace with the correct subaddress for A1
    direction = 0  # replace with the correct direction for A1

    # Build the command word: subaddress in bits 5-9, direction in bit 10.
    command_word = (subaddress << 5) | (direction << 10)

    # Take one clock snapshot so date and time are consistent with each other.
    now = datetime.now()

    # --- date_of_mission -------------------------------------------------
    try:
        # DateOfMission provides setters for year/month/day.
        message.date_of_mission.set_year_of_mission(now.year - 2000)
        message.date_of_mission.set_month_of_mission(now.month)
        message.date_of_mission.set_day_of_mission(now.day)
    except Exception:
        # Some instances may expose a raw int instead of a DateOfMission
        # object; build the value separately and assign its raw encoding.
        from Grifo_E_1553lib.data_types.date_of_mission import DateOfMission

        dm = DateOfMission()
        dm.set_year_of_mission(now.year - 2000)
        dm.set_month_of_mission(now.month)
        dm.set_day_of_mission(now.day)
        try:
            message.date_of_mission = dm.raw
        except Exception as exc:
            # Still best-effort, but no longer silent: the message would
            # otherwise go out with an unset date and no hint why.
            print(f"Warning: could not set date_of_mission: {exc}")

    # --- time_of_mission -------------------------------------------------
    # Seconds since midnight divided by 2 (lsb = 2 s).
    seconds = now.hour * 3600 + now.minute * 60 + now.second
    time_field_value = seconds // 2
    try:
        message.time_of_mission.set(time_field_value)
    except Exception:
        # Fallback: assign the raw value directly.
        try:
            message.time_of_mission = time_field_value
        except Exception as exc:
            print(f"Warning: could not set time_of_mission: {exc}")

    return message, command_word
|
|
|
|
def create_a2_message():
    """Create an example MsgRdrOperationCommand message (A2).

    The message is returned exactly as default-constructed.

    Returns:
        tuple: ``(message, command_word)``; ``command_word`` is an int with
        the subaddress in bits 5-9 and the direction in bit 10.
    """
    # Bit offsets of the two fields inside the command word.
    subaddress_shift = 5
    direction_shift = 10

    op_command = MsgRdrOperationCommand()

    a2_subaddress = 2  # replace with the correct subaddress for A2
    a2_direction = 0  # replace with the correct direction for A2

    cw = (a2_direction << direction_shift) | (a2_subaddress << subaddress_shift)
    return op_command, cw
|
|
|
|
# Interactive sender: repeatedly asks which message to build (A1 or A2),
# wraps it in a UDP1553 header and sends it to (UDP_IP, UDP_PORT).
# Any exception ends the session; the socket is always closed on exit.
try:
    while True:
        # Ask the user which message to send.  Surrounding whitespace is
        # ignored so that e.g. "a1 " is accepted.
        message_type = input("Enter message type (A1 or A2, or 'q' to quit): ").strip().upper()

        if message_type == "Q":
            break

        if message_type == "A1":
            message, command_word = create_a1_message()
        elif message_type == "A2":
            message, command_word = create_a2_message()
        else:
            print("Invalid message type")
            continue

        # Build an example UDP1553 header.
        header = UDP1553Header()
        header.marker1553 = MARKER_1553
        header.otype = OTYPE_BC  # BC
        packed_header = bytes(header)

        # The command word is not stored in the message object; it is
        # serialized separately as a little-endian unsigned 16-bit value.
        packed_command_word = struct.pack("<H", command_word)
        packed_message = bytes(message)

        # Combine header and message: the first two bytes of the serialized
        # message are replaced by the packed command word.
        message_to_send = packed_header + packed_command_word + packed_message[2:]

        print(f"Dati spediti: {message_to_send}")
        print(f"cw {command_word} cw packed {packed_command_word} ")

        # Decode the command word again as a sanity check of the packing.
        subaddress = (command_word >> 5) & 0x1F  # bits 5-9 (subaddress)
        direction = (command_word >> 10) & 0x01  # bit 10 (direction)
        print(f"Subaddress: {subaddress}")
        print(f"Direction: {direction}")

        # Debug dump of the settings word, only for messages that have one.
        # Without the guard, a message type lacking `settings` would raise
        # AttributeError here, before the datagram is sent, and the outer
        # handler would end the session.
        # NOTE(review): confirm whether MsgRdrOperationCommand (A2) exposes
        # a `settings` field.
        settings = getattr(message, "settings", None)
        if settings is not None:
            print(f"Valore di settings.raw: 0x{settings.raw:04X}")

        sock.sendto(message_to_send, (UDP_IP, UDP_PORT))
        print(f"Message {message_type} sent!")

except Exception as e:
    print(f"An error occurred: {e}")
finally:
    print("Exiting...")
    sock.close()
|