# -*- coding: utf-8 -*-
"""ctypes mirrors of the 1553 messages plus helpers to sync them with wrappers.

Each ``Msg*Class`` declares one ``ctypes.c_int`` field per message field.
A single module-level instance of every message is created at import time
and collected in ``messages_1553`` for batch operations.
"""
import ctypes
import logging
from typing import Dict, List, Any, Optional

from pymsc.utils.profiler import monitor_execution


class Msg1553Class(ctypes.Structure):
    """
    Base class for all 1553 message structures.
    Provides methods to sync with BusMonitorCore wrappers.

    Attributes:
        message_id: Identifier given at construction (e.g. ``'A1'``); used in
            log messages.
        message_wrapper: Object set through ``bind_to_wrapper``; ``None``
            until bound. The methods below read its ``message`` attribute
            and, when present, call its ``send()`` method.
        logger: The ``'PyMsc'`` logger.
    """

    # The base contributes no fields; every subclass declares its own.
    _fields_ = []

    def __init__(self, message_id: str):
        super().__init__()
        self.message_id = message_id
        self.message_wrapper: Optional[Any] = None
        self.logger = logging.getLogger('PyMsc')

    def bind_to_wrapper(self, wrapper: Any) -> None:
        """
        Links this structure to a BusMonitorCore message wrapper.

        Args:
            wrapper: Object exposing a ``message`` attribute whose attributes
                are named like this structure's fields.
        """
        self.message_wrapper = wrapper

    def get_datum(self) -> None:
        """
        Updates the ctypes structure with data from the 1553 network wrapper.

        Does nothing when no wrapper is bound. For every declared field the
        attribute of the same name is read from ``message_wrapper.message``;
        attributes that are missing or ``None`` are skipped. If the attribute
        has a ``raw`` member, ``int(attr.raw)`` is stored, otherwise
        ``int(attr)``. A failure on one field is logged and does not stop the
        remaining fields from being synced.
        """
        if self.message_wrapper is None:
            return

        raw_msg = self.message_wrapper.message
        # ``*_`` tolerates 3-tuple (bitfield) entries as well as 2-tuples.
        for field_name, *_ in self._fields_:
            try:
                target = getattr(raw_msg, field_name, None)
                if target is None:
                    continue
                val = int(target.raw) if hasattr(target, 'raw') else int(target)
                setattr(self, field_name, val)
            except Exception as e:
                # Best-effort sync: report and carry on with the next field.
                # Lazy %-args avoid formatting when the record is filtered out.
                self.logger.error(
                    "Sync error for %s.%s: %s",
                    self.message_id, field_name, e,
                )

    def set_value_for_field(self, field_name: str, value: Any) -> None:
        """
        Sets a value in the 1553 wrapper and asks the wrapper to send it.

        The value is converted with ``int()``, written to the wrapper message
        (through the field's ``set_val`` method when it has one, otherwise by
        plain attribute assignment), mirrored into this structure, and then
        ``message_wrapper.send()`` is called if the wrapper defines it.

        No exception escapes this method: an unbound message logs a warning,
        an unknown (or ``None``-valued) wrapper field logs an error, and any
        exception raised along the way is logged as an error.

        Args:
            field_name: Name of the field on the wrapper message.
            value: New value; must be convertible with ``int()``.
        """
        if self.message_wrapper is None:
            self.logger.warning(f"Message {self.message_id} is not bound")
            return

        try:
            raw_msg = self.message_wrapper.message
            target_field = getattr(raw_msg, field_name, None)
            if target_field is None:
                self.logger.error(
                    "Field %s not found in %s", field_name, self.message_id
                )
                return

            # Convert once; a bad value lands in the except branch below.
            int_value = int(value)
            if hasattr(target_field, 'set_val'):
                target_field.set_val(int_value)
            else:
                setattr(raw_msg, field_name, int_value)

            # Keep the local cache aligned with what was written.
            setattr(self, field_name, int_value)

            if hasattr(self.message_wrapper, 'send'):
                self.message_wrapper.send()
        except Exception as e:
            self.logger.error("Failed to set field %s: %s", field_name, e)

    def get_value_for_field(self, field_name: str) -> Any:
        """
        Returns the current cached value from the ctypes structure.

        Args:
            field_name: Attribute name to read from this instance.

        Raises:
            AttributeError: If this instance has no such attribute.
        """
        return getattr(self, field_name)

    def get_dict(self) -> Dict[str, Any]:
        """Converts the ctypes structure to a ``{field name: value}`` dict."""
        return {name: getattr(self, name) for name, *_ in self._fields_}


# --- Message Classes Definitions ---

class MsgA1Class(Msg1553Class):
    """Structure for the message constructed with id ``'A1'``."""

    _fields_ = [
        ('time_mark', ctypes.c_int),
        ('tgt_history', ctypes.c_int),
        ('alt_block', ctypes.c_int),
        ('channel', ctypes.c_int),
        ('PARAM_ID', ctypes.c_int),
        ('PARAM_TRANSF', ctypes.c_int),
        ('PARAM_TXRX', ctypes.c_int),
        ('param_value', ctypes.c_int),
        ('FREQ_AGILITY', ctypes.c_int),
        ('PRF_LOOKUP', ctypes.c_int),
        ('LPRF_THRESHOLD', ctypes.c_int),
        ('GND_TGT_REJ_RAD_VEL', ctypes.c_int),
        ('MIN_DET_GND_TGT_RAD_VEL', ctypes.c_int),
        ('ALE_BLANKING', ctypes.c_int),
        ('moving_target_gain', ctypes.c_int),
        ('FREQ_GROUPING_SELECTION', ctypes.c_int),
        ('WAVE_INTER_SEL', ctypes.c_int),
        ('AC_IDENTIFIER', ctypes.c_int),
        ('beacon_code', ctypes.c_int),
        ('beacon_delay', ctypes.c_int),
        ('video_intensity', ctypes.c_int),
        ('if_gain', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('A1')


class MsgA2Class(Msg1553Class):
    """Structure for the message constructed with id ``'A2'``."""

    _fields_ = [
        ('stby', ctypes.c_int),
        ('freeze', ctypes.c_int),
        ('IBIT', ctypes.c_int),
        ('EXPAND', ctypes.c_int),
        ('range_scale', ctypes.c_int),
        ('rws_submode', ctypes.c_int),
        ('gm_submode', ctypes.c_int),
        ('acm_submode', ctypes.c_int),
        ('velocity_scale', ctypes.c_int),
        ('scan_width', ctypes.c_int),
        ('SAR_ACQUISITION', ctypes.c_int),
        ('PWR_UP_STOP_FUNCT_SEL', ctypes.c_int),
        ('SPOT_FUNC_SEL', ctypes.c_int),
        ('ZOOM_COMMAND', ctypes.c_int),
        ('SAR_MAP_ORIENTATION', ctypes.c_int),
        ('silence', ctypes.c_int),
        ('des_ctrl', ctypes.c_int),
        ('rdr_mode', ctypes.c_int),
        ('bars', ctypes.c_int),
        ('emergency', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('A2')


class MsgA3Class(Msg1553Class):
    """Structure for the message constructed with id ``'A3'``."""

    _fields_ = [
        ('DISPL_WAY_01', ctypes.c_int),
        ('DISPL_WAY_02', ctypes.c_int),
        ('DISPL_WAY_03', ctypes.c_int),
        ('INT_FLIGHT_DIR', ctypes.c_int),
        ('BREAK_AWAY_CUE', ctypes.c_int),
        ('INT_ZONES_AND_TGT_TIP', ctypes.c_int),
        ('TIME_TO_GO_TO_CURSOR', ctypes.c_int),
        ('ALLOWABLE_STEERING_ERROR_CIRCLE', ctypes.c_int),
        ('ATTACK_STEERING_CUE', ctypes.c_int),
        ('BULLS_EYE', ctypes.c_int),
        ('TIME_TO_GO_TO_CURSOR_VALUE', ctypes.c_int),
        ('INT_MODE_SEL', ctypes.c_int),
        ('IFD_MODE_SEL', ctypes.c_int),
        ('ASC_MODE_SEL', ctypes.c_int),
        ('ASEC_MODE_SEL', ctypes.c_int),
        ('IFD_X', ctypes.c_int),
        ('IFD_Y', ctypes.c_int),
        ('R_MIN_INT', ctypes.c_int),
        ('R_MAX_INT', ctypes.c_int),
        ('NO_ESCAPE_Y', ctypes.c_int),
        ('TARGET_TIP_Y', ctypes.c_int),
        ('ASC_X', ctypes.c_int),
        ('ASC_Y', ctypes.c_int),
        ('ASEC_RADIUS', ctypes.c_int),
        ('HPT_INT_ZONE', ctypes.c_int),
        ('SPT_INT_ZONE', ctypes.c_int),
        ('HPT_DATALINK', ctypes.c_int),
        ('SPT_DATALINK', ctypes.c_int),
        ('HPT_FRIEND_FOE', ctypes.c_int),
        ('SPT_FRIEND_FOE', ctypes.c_int),
        ('TWS_TRACKED_TGT_1', ctypes.c_int),
        ('TWS_TRACKED_TGT_2', ctypes.c_int),
        ('TWS_TRACKED_TGT_3', ctypes.c_int),
        ('TWS_TRACKED_TGT_4', ctypes.c_int),
        ('TWS_TRACKED_TGT_5', ctypes.c_int),
        ('TWS_TRACKED_TGT_6', ctypes.c_int),
        ('TWS_TRACKED_TGT_7', ctypes.c_int),
        ('TWS_TRACKED_TGT_8', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_1', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_2', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_3', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_4', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_5', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_6', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_7', ctypes.c_int),
        ('TWS_TRACKED_TGT_ID_8', ctypes.c_int),
        ('WAY_1_LAT', ctypes.c_int),
        ('WAY_1_LON', ctypes.c_int),
        ('WAY_1_CODE', ctypes.c_int),
        ('WAY_VAL_1', ctypes.c_int),
        ('WAY_FYT_1', ctypes.c_int),
        ('WAY_VALUE_1', ctypes.c_int),
        ('WAY_2_LAT', ctypes.c_int),
        ('WAY_2_LON', ctypes.c_int),
        ('WAY_2_CODE', ctypes.c_int),
        ('WAY_VAL_2', ctypes.c_int),
        ('WAY_FYT_2', ctypes.c_int),
        ('WAY_VALUE_2', ctypes.c_int),
        ('WAY_3_LAT', ctypes.c_int),
        ('WAY_3_LON', ctypes.c_int),
        ('WAY_3_CODE', ctypes.c_int),
        ('WAY_VAL_3', ctypes.c_int),
        ('WAY_FYT_3', ctypes.c_int),
        ('WAY_VALUE_3', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('A3')


class MsgA4Class(Msg1553Class):
    """Structure for the message constructed with id ``'A4'``."""

    _fields_ = [
        ('A4_Timetag', ctypes.c_int),
        ('A4_true_heading', ctypes.c_int),
        ('A4_magnetic_heading', ctypes.c_int),
        ('A4_crs_zero', ctypes.c_int),
        ('A4_crs_slave', ctypes.c_int),
        ('A4_crs_snowplough', ctypes.c_int),
        ('A4_crs_rate_val', ctypes.c_int),
        ('A4_crs_x', ctypes.c_int),
        ('A4_crs_y', ctypes.c_int),
        ('A4_crs_az', ctypes.c_int),
        ('A4_crs_rng', ctypes.c_int),
        ('A4_ant_slew', ctypes.c_int),
        ('A4_az_demand', ctypes.c_int),
        ('A4_el_demand', ctypes.c_int),
        ('A4_x_acc', ctypes.c_int),
        ('A4_y_acc', ctypes.c_int),
        ('A4_z_acc', ctypes.c_int),
        ('A4_spoi_pos_invalid', ctypes.c_int),
        ('A4_spoi_antitude_invalid', ctypes.c_int),
        ('A4_spoi_lat', ctypes.c_int),
        ('A4_spoi_long', ctypes.c_int),
        ('A4_spoi_baroalt', ctypes.c_int),
        ('A4_ppos_timetag', ctypes.c_int),
        ('A4_ppos_invalid', ctypes.c_int),
        ('A4_ppos_lat', ctypes.c_int),
        ('A4_ppos_long', ctypes.c_int),
        ('A4_wind_direction', ctypes.c_int),
        ('A4_wind_speed', ctypes.c_int),
        ('a4_corrected_baro_altitude', ctypes.c_int),
        ('a4_radio_altimeter_altitude', ctypes.c_int),
        ('a4_baro_altitude', ctypes.c_int),
        ('a4_baro_altitude_invalid', ctypes.c_int),
        ('a4_radio_altimeter_invalid', ctypes.c_int),
        ('a4_corrected_baro_altitude_invalid', ctypes.c_int),
        ('a4_tas_invalid', ctypes.c_int),
        ('a4_cas_invalid', ctypes.c_int),
        ('a4_tas', ctypes.c_int),
        ('a4_cas', ctypes.c_int),
        ('PRESENT_POSITION_INVALID', ctypes.c_int),
        ('ATTITUDE_INVALID', ctypes.c_int),
        ('NAVIGATION_DATA_INVALID', ctypes.c_int),
        ('CLEARANCE_PLANE_DIST', ctypes.c_int),
        ('SAR_ENABLED', ctypes.c_int),
        ('NORM_GHOST_SELECTION', ctypes.c_int),
        ('DTT_ENABLED', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('A4')


class MsgA5Class(Msg1553Class):
    """Structure for the message constructed with id ``'A5'``."""

    _fields_ = [
        ('A5_Timetag', ctypes.c_int),
        ('A5_ptaz_timetag', ctypes.c_int),
        ('A5_pitch_timetag', ctypes.c_int),
        ('A5_roll_timetag', ctypes.c_int),
        ('A5_ptaz', ctypes.c_int),
        ('A5_pitch', ctypes.c_int),
        ('A5_roll', ctypes.c_int),
        ('A5_rate_pitch', ctypes.c_int),
        ('A5_rate_roll', ctypes.c_int),
        ('A5_rate_yaw', ctypes.c_int),
        ('A5_angacc_pitch', ctypes.c_int),
        ('A5_angacc_roll', ctypes.c_int),
        ('A5_angacc_yaw', ctypes.c_int),
        ('A5_x_vel', ctypes.c_int),
        ('A5_y_vel', ctypes.c_int),
        ('A5_z_vel', ctypes.c_int),
        ('a5_nx', ctypes.c_int),
        ('a5_ny', ctypes.c_int),
        ('a5_nz', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('A5')


class MsgB6Class(Msg1553Class):
    """Structure for the message constructed with id ``'B6'``."""

    _fields_ = [
        ('rdr_fail', ctypes.c_int),
        ('and_fail', ctypes.c_int),
        ('pps_fail', ctypes.c_int),
        ('lcu_fail', ctypes.c_int),
        ('px_fail', ctypes.c_int),
        ('aesa_fail', ctypes.c_int),
        ('rep_overt', ctypes.c_int),
        ('pps_overt', ctypes.c_int),
        ('ant_overt', ctypes.c_int),
        ('lcu_overt', ctypes.c_int),
        ('tgt_history_tb', ctypes.c_int),
        ('alt_block_tb', ctypes.c_int),
        ('PARAM_ID_TB', ctypes.c_int),
        ('PARAM_TRANSF_TB', ctypes.c_int),
        ('PARAM_TXRX_TB', ctypes.c_int),
        ('PARAM_VALUE_TB', ctypes.c_int),
        ('tb_az_centre', ctypes.c_int),
        ('tb_el_centre', ctypes.c_int),
        ('beacon_code_tb', ctypes.c_int),
        ('beacon_delay_tb', ctypes.c_int),
        ('FREQ_AGILITY_TB', ctypes.c_int),
        ('CHANNEL_TB', ctypes.c_int),
        ('PRF_LOOKUP_TB', ctypes.c_int),
        ('LPRF_THRESHOLD_TB', ctypes.c_int),
        ('moving_target_gain_tb', ctypes.c_int),
        ('FREQ_GROUPING_SELECTION_TB', ctypes.c_int),
        ('WAVE_INTER_SEL_TB', ctypes.c_int),
        ('TB_CRS_RANGE', ctypes.c_int),
        ('TB_CRS_AZIMUTH', ctypes.c_int),
        ('TB_CRS_LAT', ctypes.c_int),
        ('TB_CRS_LON', ctypes.c_int),
        ('video_intensity_tb', ctypes.c_int),
        ('if_gain_tb', ctypes.c_int),
        ('tb_crs_x', ctypes.c_int),
        ('tb_crs_y', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('B6')


class MsgB7Class(Msg1553Class):
    """Structure for the message constructed with id ``'B7'``."""

    _fields_ = [
        ('stby_tb', ctypes.c_int),
        ('freeze_tb', ctypes.c_int),
        ('IBIT_TB', ctypes.c_int),
        ('range_scale_tb', ctypes.c_int),
        ('rws_submode_tb', ctypes.c_int),
        ('gm_submode_tb', ctypes.c_int),
        ('acm_submode_tb', ctypes.c_int),
        ('velocity_scale_tb', ctypes.c_int),
        ('scan_width_tb', ctypes.c_int),
        ('bars_tb', ctypes.c_int),
        ('TRANSITION', ctypes.c_int),
        ('LAST_ACQ_FAIL', ctypes.c_int),
        ('DEGRADED_PERF_STATUS', ctypes.c_int),
        ('SAR_ACQUISITION_TB', ctypes.c_int),
        ('mode_tb', ctypes.c_int),
        ('lock_sts', ctypes.c_int),
        ('rf_status', ctypes.c_int),
        ('emergency_tb', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('B7')


class MsgB9Class(Msg1553Class):
    """Structure for the message constructed with id ``'B9'``."""

    _fields_ = [
        ('B09_Timetag_AC', ctypes.c_int),
        ('B09_NavAz', ctypes.c_int),
        ('B09_NavEl', ctypes.c_int),
        ('B09_BodyAz', ctypes.c_int),
        ('B09_BodyEl', ctypes.c_int),
    ]

    def __init__(self):
        super().__init__('B9')


class MsgB10Class(Msg1553Class):
    """Structure for the message constructed with id ``'B10'``."""

    _fields_ = [('B10_Timetag_PX', ctypes.c_int)]

    def __init__(self):
        super().__init__('B10')


# --- Message Instances ---
# These instances are used by GUI and Registry.
# Names are lowercase to follow standard object naming conventions.
msg_a1 = MsgA1Class()
msg_a2 = MsgA2Class()
msg_a3 = MsgA3Class()
msg_a4 = MsgA4Class()
msg_a5 = MsgA5Class()
msg_b6 = MsgB6Class()
msg_b7 = MsgB7Class()
msg_b9 = MsgB9Class()
msg_b10 = MsgB10Class()

# Global list for batch operations (like binding or syncing)
messages_1553: List[Msg1553Class] = [
    msg_a1, msg_a2, msg_a3, msg_a4, msg_a5,
    msg_b6, msg_b7, msg_b9, msg_b10,
]


# NOTE(review): monitor_execution comes from pymsc.utils.profiler and is not
# visible here; presumably it instruments the call - confirm before relying
# on its behavior.
@monitor_execution
def update_all_1553_messages() -> None:
    """Updates all messages from their bound network wrappers.

    Calls ``get_datum()`` on every entry of ``messages_1553`` in list order;
    messages without a bound wrapper are left untouched by that call.
    """
    for msg in messages_1553:
        msg.get_datum()