# -*- coding: utf-8 -*-
"""
New message definitions using correct PyBusMonitor1553 structures.

This module wraps the verified 1553 message structures from pybusmonitor1553
and provides a unified interface for the GUI.
"""
import ctypes
import logging
import os
import sys
from typing import Dict, List, Any, Optional

# Setup aliasing to allow imports from the embedded library.
# The PyBusMonitor1553 library expects to be imported as 'Grifo_E_1553lib',
# so its parent directory is put at the front of sys.path.
grifo_path = os.path.join(os.path.dirname(__file__), '..', 'PyBusMonitor1553')
if grifo_path not in sys.path:
    sys.path.insert(0, grifo_path)

# Best-effort pre-import of the library sub-packages.
try:
    import Grifo_E_1553lib.data_types
    import Grifo_E_1553lib.messages
except ImportError:
    # Nothing is done here on purpose: if the library really is unavailable,
    # the explicit imports below raise ImportError with the precise module.
    pass

# Now import the message classes
from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters import (
    MsgRdrSettingsAndParameters,
)
from Grifo_E_1553lib.messages.msg_rdr_operation_command import (
    MsgRdrOperationCommand,
)
from Grifo_E_1553lib.messages.msg_graphic_setting import MsgGraphicSetting
from Grifo_E_1553lib.messages.msg_nav_data_and_cursor import (
    MsgNavDataAndCursor,
)
from Grifo_E_1553lib.messages.msg_inu_high_speed import MsgInuHighSpeed
from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters_tellback import (
    MsgRdrSettingsAndParametersTellback,
)
from Grifo_E_1553lib.messages.msg_rdr_status_tellback import (
    MsgRdrStatusTellback,
)

from pymsc.utils.profiler import monitor_execution
from pymsc.core.field_mappings import get_field_path


class Msg1553Base:
    """
    Base wrapper class for all 1553 messages.

    Holds a local ctypes structure (``_internal``) and, once bound, a
    reference to an external wrapper object whose ``message`` attribute is
    the counterpart structure. Provides a unified interface for message
    access and hardware binding.
    """

    def __init__(self, message_id: str, internal_structure: ctypes.Structure):
        """
        Args:
            message_id: Identifier for the message (e.g., 'A1', 'A2', 'B6')
            internal_structure: The actual ctypes.Structure from
                PyBusMonitor1553
        """
        self.message_id = message_id
        self._internal = internal_structure
        # Stays None until bind_to_wrapper() is called.
        self.message_wrapper = None
        self.logger = logging.getLogger('PyMsc')

    def bind_to_wrapper(self, wrapper: Any) -> None:
        """Links the local structure to the BusMonitorCore wrapper."""
        self.message_wrapper = wrapper

    def get_datum(self) -> None:
        """
        Updates the internal structure from the hardware wrapper.

        Copies the bytes of ``message_wrapper.message`` over the local
        structure. Does nothing when no wrapper is bound. Any failure is
        logged and swallowed so that one bad message does not abort a bulk
        update.
        """
        if self.message_wrapper is None:
            return

        try:
            # Access the raw message from the wrapper
            raw_msg = self.message_wrapper.message

            # Never copy more bytes than the source actually owns: the local
            # structure (notably the placeholder ones) may be larger than the
            # wrapper's message, and memmove performs no bounds checking.
            byte_count = min(
                ctypes.sizeof(self._internal),
                ctypes.sizeof(raw_msg),
            )

            # Raw byte copy preserves bitfields and nested structures.
            ctypes.memmove(
                ctypes.addressof(self._internal),
                ctypes.addressof(raw_msg),
                byte_count,
            )
        except Exception as e:
            self.logger.error("Sync error for %s: %s", self.message_id, e)

    def get_field_value(self, field_path: str) -> Any:
        """
        Get value from a potentially nested field using dot notation.

        Resolution order for the final path component:
          1. a ``get_<field>()`` method on the parent object, if present;
          2. the attribute's ``.raw`` value, if it has one;
          3. the attribute's ``get_value()`` result, if it has one;
          4. the attribute itself, unless it is a ctypes Structure/Union,
             in which case a warning is logged and None is returned.

        Args:
            field_path: Path to field, e.g., "rdr_mode_command.stby" or
                "settings.altitude_block_selection"

        Returns:
            The field value, or None if not found (errors are logged).
        """
        try:
            *parents, final_field = field_path.split('.')

            # Navigate through the hierarchy
            obj = self._internal
            for part in parents:
                obj = getattr(obj, part)

            # Prefer a getter method (used by Union types with bitfields)
            getter = getattr(obj, f"get_{final_field}", None)
            if getter is not None:
                return getter()

            # Direct attribute access
            value = getattr(obj, final_field)

            # If it's a Union or has a .raw attribute, return the raw value
            if hasattr(value, 'raw'):
                return value.raw

            # Check for a get_value() method (for complex Union types)
            if hasattr(value, 'get_value'):
                return value.get_value()

            # A bare ctypes Structure/Union cannot be used as a number, so
            # return None to signal that a sub-field mapping is needed.
            if isinstance(value, (ctypes.Structure, ctypes.Union)):
                self.logger.warning(
                    "Field %s returned a ctypes Structure/Union. "
                    "Consider mapping to a specific sub-field instead.",
                    field_path,
                )
                return None

            return value
        except Exception as e:
            self.logger.error(
                "Error getting field %s from %s: %s",
                field_path, self.message_id, e,
            )
            return None

    def set_field_value(self, field_path: str, value: Any) -> None:
        """
        Set value to a potentially nested field using dot notation.

        The value is converted with ``int()`` and written both to the local
        structure and to ``message_wrapper.message``. Afterwards the
        wrapper's ``send()`` is called if it exists. When no wrapper is
        bound, a warning is logged and nothing is modified. Errors are
        logged, not raised.

        Args:
            field_path: Path to field, e.g., "rdr_mode_command.stby"
            value: Value to set
        """
        if self.message_wrapper is None:
            self.logger.warning(f"Message {self.message_id} is not bound")
            return

        try:
            *parents, final_field = field_path.split('.')

            # Navigate through both structures in parallel
            obj = self._internal
            obj_wrapper = self.message_wrapper.message
            for part in parents:
                obj = getattr(obj, part)
                obj_wrapper = getattr(obj_wrapper, part)

            int_value = int(value)
            setter_name = f"set_{final_field}"

            if hasattr(obj, setter_name):
                # Setter method (used by Union types with bitfields)
                getattr(obj, setter_name)(int_value)
                if hasattr(obj_wrapper, setter_name):
                    getattr(obj_wrapper, setter_name)(int_value)
            else:
                # Direct attribute access
                field_value = getattr(obj, final_field)
                if hasattr(field_value, 'raw'):
                    # Union-like type: write through its raw value
                    field_value.raw = int_value
                    wrapper_field_value = getattr(obj_wrapper, final_field)
                    if hasattr(wrapper_field_value, 'raw'):
                        wrapper_field_value.raw = int_value
                else:
                    # Simple type, direct assignment
                    setattr(obj, final_field, int_value)
                    setattr(obj_wrapper, final_field, int_value)

            # Trigger UDP transmission
            if hasattr(self.message_wrapper, 'send'):
                self.message_wrapper.send()
        except Exception as e:
            self.logger.error(
                "Failed to set field %s to %s: %s", field_path, value, e,
            )

    def get_dict(self) -> Dict[str, Any]:
        """
        Converts the message data to a dictionary.

        Only the top-level fields of the internal structure are visited.
        Fields exposing a ``.raw`` attribute are stored as that raw value;
        all others are stored as returned by ``getattr``. Fields that cannot
        be read are skipped (and reported at debug level).

        Note: This is a simplified version for compatibility.

        Returns:
            Mapping of top-level field name to value.
        """
        result: Dict[str, Any] = {}
        # _fields_ entries are (name, type) or, for bit-fields,
        # (name, type, bits) - so only index 0 may be relied upon.
        for field_entry in getattr(self._internal, '_fields_', ()):
            field_name = field_entry[0]
            try:
                value = getattr(self._internal, field_name)
                result[field_name] = value.raw if hasattr(value, 'raw') else value
            except Exception as e:
                # Best-effort export: skip the unreadable field, keep going.
                self.logger.debug(
                    "Skipping field %s of %s in get_dict: %s",
                    field_name, self.message_id, e,
                )
        return result

    # ========================================================================
    # Legacy API compatibility methods
    # ========================================================================

    def get_value_for_field(self, field_name: str) -> Any:
        """
        Legacy API method for GUI compatibility.

        Automatically maps flat field names to nested paths via
        ``get_field_path`` and delegates to ``get_field_value``.

        Args:
            field_name: Legacy flat field name (e.g., 'stby')

        Returns:
            Field value
        """
        # Map legacy field name to actual nested path
        field_path = get_field_path(self.message_id, field_name)
        return self.get_field_value(field_path)

    def set_value_for_field(self, field_name: str, value: Any) -> None:
        """
        Legacy API method for GUI compatibility.

        Automatically maps flat field names to nested paths via
        ``get_field_path`` and delegates to ``set_field_value``.

        Args:
            field_name: Legacy flat field name (e.g., 'stby')
            value: Value to set
        """
        # Map legacy field name to actual nested path
        field_path = get_field_path(self.message_id, field_name)
        self.set_field_value(field_path, value)


# --- Message Class Definitions ---

class MsgA1Class(Msg1553Base):
    """A1: Radar Settings and Parameters"""

    def __init__(self):
        super().__init__('A1', MsgRdrSettingsAndParameters())


class MsgA2Class(Msg1553Base):
    """A2: Radar Operation Command"""

    def __init__(self):
        super().__init__('A2', MsgRdrOperationCommand())


class MsgA3Class(Msg1553Base):
    """A3: Graphic Settings"""

    def __init__(self):
        super().__init__('A3', MsgGraphicSetting())


class MsgA4Class(Msg1553Base):
    """A4: Navigation Data and Cursor"""

    def __init__(self):
        super().__init__('A4', MsgNavDataAndCursor())


class MsgA5Class(Msg1553Base):
    """A5: INU High Speed Data"""

    def __init__(self):
        super().__init__('A5', MsgInuHighSpeed())


class MsgB6Class(Msg1553Base):
    """B6: Radar Settings and Parameters Tellback"""

    def __init__(self):
        super().__init__('B6', MsgRdrSettingsAndParametersTellback())


class MsgB7Class(Msg1553Base):
    """B7: Radar Status Tellback"""

    def __init__(self):
        super().__init__('B7', MsgRdrStatusTellback())


# Note: B9 and B10 are not yet defined in PyBusMonitor1553
# For now, we'll create minimal placeholders

class MsgB9Class(Msg1553Base):
    """B9: Antenna Position (Placeholder)"""

    def __init__(self):
        # Create a minimal structure as placeholder
        class PlaceholderB9(ctypes.Structure):
            _pack_ = 1
            _fields_ = [
                ('B09_Timetag_AC', ctypes.c_int32),
                ('B09_NavAz', ctypes.c_int16),
                ('B09_NavEl', ctypes.c_int16),
                ('B09_BodyAz', ctypes.c_int16),
                ('B09_BodyEl', ctypes.c_int16),
            ]

        super().__init__('B9', PlaceholderB9())


class MsgB10Class(Msg1553Base):
    """B10: Processor Timetag (Placeholder)"""

    def __init__(self):
        # Create a minimal structure as placeholder
        class PlaceholderB10(ctypes.Structure):
            _pack_ = 1
            _fields_ = [('B10_Timetag_PX', ctypes.c_int32)]

        super().__init__('B10', PlaceholderB10())


# --- Message Instances ---
msg_a1 = MsgA1Class()
msg_a2 = MsgA2Class()
msg_a3 = MsgA3Class()
msg_a4 = MsgA4Class()
msg_a5 = MsgA5Class()
msg_b6 = MsgB6Class()
msg_b7 = MsgB7Class()
msg_b9 = MsgB9Class()
msg_b10 = MsgB10Class()

messages_1553: List[Msg1553Base] = [
    msg_a1, msg_a2, msg_a3, msg_a4, msg_a5,
    msg_b6, msg_b7, msg_b9, msg_b10,
]


@monitor_execution
def update_all_1553_messages() -> None:
    """Update all messages from their hardware wrappers."""
    for msg in messages_1553:
        msg.get_datum()