SXXXXXXX_PyMsc/pymsc/PyBusMonitor1553/Grifo_E_1553lib/connection/utils.py

167 lines
4.8 KiB
Python

import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'src')))
from Grifo_E_1553lib.mil1553udp import UDP1553Header ,UDP1553Message, cw
from Grifo_E_1553lib.udp_receiver import UDPReceiver
import socket
import ctypes
import time
from pynput.keyboard import Key, Listener
import threading
# from Grifo_E_1553lib.messages.messages import B7
# Running packet counter: prepare_udp_message() writes the current value into
# the UDP header of each packet it builds and then increments it by one.
mcounter = 0
# When True, receive_messages() prints a summary of every received packet.
DEBUG = False
def prepare_udp_message(msg_list):
    """Build the byte payload of one UDP packet carrying the given messages.

    The packet is laid out as: the packed ``UDP1553Header`` (marker, object
    type set to ``k_otype_bc``, the current packet counter, ``ta`` = 0),
    followed by ``get_packed_message()`` of every item in ``msg_list`` in
    order, followed by the 16-bit ``k_marker_end_1553`` end marker.

    Args:
        msg_list: iterable of objects exposing ``get_packed_message()``,
            which must return a bytes-like object.

    Returns:
        The assembled packet as ``bytes``.

    Side effects:
        Increments the module-level ``mcounter`` by one per packet built.
    """
    global mcounter

    udp_header = UDP1553Header()
    udp_header.marker1553 = ctypes.c_uint16(udp_header.k_marker_1553)
    udp_header.otype = ctypes.c_uint16(udp_header.k_otype_bc)  # BC
    udp_header.mcounter = mcounter
    udp_header.ta = 0

    # Collect the pieces and join once instead of growing a bytes object with
    # "+=" on every message (each "+=" copies everything accumulated so far).
    parts = [bytes(udp_header)]
    parts.extend(msg.get_packed_message() for msg in msg_list)
    # NOTE(review): the end marker is appended once per packet, after all
    # messages -- confirm against the receiver's framing.
    parts.append(bytes(ctypes.c_uint16(udp_header.k_marker_end_1553)))
    packed_messages = b"".join(parts)

    # Count the packet only after it has been assembled successfully.
    mcounter += 1
    return packed_messages
# Address used as the default bind address in init_recv_sock() and as the
# destination address in send_udp_msg().
UDP_IP = "127.0.0.1"
# Destination port used by send_udp_msg().
UDP_SEND_PORT = 51553
# Default local port bound by init_recv_sock().
UDP_RECV_PORT = 61553
# Disabled code (commented out):
# def init_recv_sock():
# sock.bind((UDP_IP, UDP_RECV_PORT))
# Module-wide receive socket: None until init_recv_sock() binds one, and None
# again after close_recv_sock() or a failed bind.
sock = None
# Text of the most recent bind failure, or None; read via get_bind_error().
BIND_ERROR_MSG = None
def init_recv_sock(ip: str = None, port: int = None):
    """Create and bind the module-level UDP receive socket.

    Call this when ready to listen.

    Args:
        ip: local address to bind; defaults to ``UDP_IP`` when None.
        port: local port to bind; defaults to ``UDP_RECV_PORT`` when None.

    Returns:
        True on success, in which case the global ``sock`` holds the bound
        socket. False on failure, in which case ``sock`` is None and
        ``get_bind_error()`` returns the error string.
    """
    global sock, BIND_ERROR_MSG, UDP_IP, UDP_RECV_PORT
    if ip is None:
        ip = UDP_IP
    if port is None:
        port = UDP_RECV_PORT

    new_sock = None
    try:
        new_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        new_sock.bind((ip, port))
    except (OSError, OverflowError) as e:
        # bind() raises OverflowError (not OSError) for a port outside
        # 0-65535; report that as an ordinary bind failure too.
        if new_sock is not None:
            # Do not leak the descriptor of a socket that could not be bound.
            new_sock.close()
        BIND_ERROR_MSG = str(e)
        sock = None
        return False

    sock = new_sock
    BIND_ERROR_MSG = None
    return True
def get_bind_error():
    """Return the error text of the last failed init_recv_sock() call.

    The value is None when no bind has failed yet or the most recent bind
    succeeded.
    """
    last_error = BIND_ERROR_MSG
    return last_error
def close_recv_sock():
    """Close the module-level receive socket, if any, and reset it to None.

    Closing is best-effort: an error raised by ``close()`` is ignored so that
    shutdown paths never fail here.
    """
    global sock
    active = sock
    if active is not None:
        try:
            active.close()
        except Exception:
            # Deliberately ignored: nothing useful can be done with a socket
            # that fails to close during teardown.
            pass
    sock = None
def send_udp_msg(msg):
    """Send ``msg`` as one UDP datagram to ``(UDP_IP, UDP_SEND_PORT)``.

    When the module-level receive socket is bound it is reused for sending;
    otherwise a temporary unbound socket is opened for this single datagram
    and closed again.
    """
    destination = (UDP_IP, UDP_SEND_PORT)
    if sock is not None:
        sock.sendto(msg, destination)
        return
    # No bound socket yet: use a throw-away socket for sending only.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tmp_sock:
        tmp_sock.sendto(msg, destination)
def current_milli_time():
    """Return the current ``time.time()`` value as whole milliseconds (int)."""
    seconds = time.time()
    return round(seconds * 1000)
# Set by keypressed() when Esc is pressed.
# NOTE(review): presumably polled elsewhere as a global shutdown flag -- confirm.
STOP_ALL = threading.Event()
# A threading.Event starts cleared; this call only makes that explicit.
STOP_ALL.clear()
# Not referenced in the visible part of this module.
# NOTE(review): presumably gates sending in a caller -- confirm.
canSend = threading.Event()
def keypressed(key):
    """Handle a key event: on Esc, announce shutdown and set ``STOP_ALL``.

    Returns False when Esc was pressed and None for every other key.
    NOTE(review): presumably registered as a pynput ``Listener`` callback,
    where returning False stops the listener -- confirm at the call site.
    """
    if key != Key.esc:
        return None
    print("Quitting the program...")
    STOP_ALL.set()
    return False
def receive_messages():
    """Receive one UDP datagram and split it into header, message and payload.

    Reads up to 4096 bytes from the module-level receive socket; the caller
    must have called ``init_recv_sock()`` beforehand. The datagram is parsed
    as a ``UDP1553Header`` immediately followed by one ``UDP1553Message``;
    everything after those two structures is returned unparsed.

    Returns:
        ``[udp_header, message_header, raw_data]`` on success, or
        ``[None, None, None]`` when no receive socket is available or
        ``recvfrom`` fails with ``OSError`` (e.g. the socket was closed).
        Callers should treat the all-None result as a termination condition.

    Raises:
        ValueError: if the datagram is too short to hold both structures
            (raised by ``ctypes`` ``from_buffer_copy``).
    """
    # Snapshot the global once: close_recv_sock() may reset it to None from
    # another thread between the check and the recvfrom() call, which would
    # otherwise surface as an uncaught AttributeError.
    recv_sock = sock
    if recv_sock is None:
        return [None, None, None]
    try:
        data, addr = recv_sock.recvfrom(4096)
    except OSError:
        # Socket was closed or is invalid; let the caller exit cleanly.
        return [None, None, None]

    header_size = ctypes.sizeof(UDP1553Header)
    message_size = ctypes.sizeof(UDP1553Message)
    # recvfrom() already returns bytes, so parse straight from the buffer
    # using offsets instead of re-copying and slicing it for each structure.
    udp_header = UDP1553Header.from_buffer_copy(data)
    header1553 = UDP1553Message.from_buffer_copy(data, header_size)
    raw_data = data[header_size + message_size:]

    if DEBUG:
        # Debug output must never break the receive path, hence the broad
        # exception handling around the field accesses.
        try:
            sa = header1553.cw.str.sa
            wc = header1553.cw.str.wc
            raw_cw = header1553.cw.raw
        except Exception:
            sa = None
            wc = None
            raw_cw = None
        print(f"[UDP RECV] from={addr} SA={sa} WC={wc} CW_raw={hex(raw_cw) if raw_cw is not None else None} UDP_mcounter={udp_header.mcounter}")
        # Also dump the SW and error code if present.
        try:
            sw = header1553.sw
            print(f"[UDP RECV] SW={hex(sw)} errcode={header1553.errcode}")
        except Exception:
            pass

    return [udp_header, header1553, raw_data]
# print(ctypes.c_uint16.from_buffer_copy(bytes(data)[-ctypes.sizeof(ctypes.c_uint16):]))
# print(ctypes.sizeof(UDP1553Header))
def isA(msg):
    """Return True when ``msg.head.cw.str.sa`` is strictly less than 10."""
    sa_value = msg.head.cw.str.sa
    return sa_value < 10
# print(ctypes.sizeof(UDP1553Header))
# print(ctypes.sizeof(UDP1553Message))
# print(ctypes.sizeof(UDP1553Header)+ ctypes.sizeof(UDP1553Message))
# # :ctypes.sizeof(UDP1553Header) +