SXXXXXXX_PyMsc/pymsc/core/message_definitions.py

310 lines
10 KiB
Python

# -*- coding: utf-8 -*-
"""
New message definitions using correct PyBusMonitor1553 structures.
This module wraps the verified 1553 message structures from pybusmonitor1553
and provides a unified interface for the GUI.
"""
import ctypes
import logging
import sys
from typing import Dict, List, Any, Optional
# Make the embedded 1553 library importable.
# The message imports further down this file use the package name
# 'Grifo_E_1553lib'; the directory added to sys.path here is presumably the
# one that contains that package (verify against the repository layout).
import sys
import os

# Resolve the '..' segment up front: an un-normalised entry makes the
# membership test below unreliable and leaves a cwd-dependent path in sys.path
# when __file__ is relative.
grifo_path = os.path.abspath(
    os.path.join(os.path.dirname(__file__), '..', 'PyBusMonitor1553')
)
if grifo_path not in sys.path:
    sys.path.insert(0, grifo_path)

# Probe the package early so that a broken path is reported with context.
try:
    import Grifo_E_1553lib.data_types
    import Grifo_E_1553lib.messages
except ImportError as exc:
    # No fallback aliasing is implemented. Record the failure instead of
    # hiding it; the message-class imports that follow will raise if the
    # package really is unavailable.
    logging.getLogger('PyMsc').warning(
        "Could not import Grifo_E_1553lib from %s: %s", grifo_path, exc
    )
# Now import the message classes
from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters import MsgRdrSettingsAndParameters
from Grifo_E_1553lib.messages.msg_rdr_operation_command import MsgRdrOperationCommand
from Grifo_E_1553lib.messages.msg_graphic_setting import MsgGraphicSetting
from Grifo_E_1553lib.messages.msg_nav_data_and_cursor import MsgNavDataAndCursor
from Grifo_E_1553lib.messages.msg_inu_high_speed import MsgInuHighSpeed
from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters_tellback import MsgRdrSettingsAndParametersTellback
from Grifo_E_1553lib.messages.msg_rdr_status_tellback import MsgRdrStatusTellback
from pymsc.utils.profiler import monitor_execution
from pymsc.core.field_mappings import get_field_path
class Msg1553Base:
    """
    Base wrapper class for all 1553 messages.

    Holds a local ctypes structure plus an optional reference to an external
    wrapper object. The wrapper is expected to expose a ``message`` attribute
    (a ctypes structure) and, optionally, a ``send()`` method.
    """

    def __init__(self, message_id: str, internal_structure: ctypes.Structure):
        """
        Args:
            message_id: Identifier for the message (e.g., 'A1', 'A2', 'B6')
            internal_structure: The ctypes.Structure instance used as the
                local copy of the message
        """
        self.message_id = message_id
        self._internal = internal_structure
        # Stays None until bind_to_wrapper() is called.
        self.message_wrapper = None
        self.logger = logging.getLogger('PyMsc')

    def bind_to_wrapper(self, wrapper: Any) -> None:
        """Store the wrapper that get_datum() and set_field_value() operate on."""
        self.message_wrapper = wrapper

    def get_datum(self) -> None:
        """
        Refresh the local structure from ``message_wrapper.message``.

        Does nothing when no wrapper is bound. Any error raised while copying
        is logged and swallowed.
        """
        if self.message_wrapper is None:
            return
        try:
            raw_msg = self.message_wrapper.message
            # Copy raw bytes so bitfields and nested structures are preserved.
            # Clamp to the smaller of the two structures: copying
            # sizeof(self._internal) bytes from a shorter source would read
            # past the end of the source buffer.
            byte_count = min(
                ctypes.sizeof(self._internal),
                ctypes.sizeof(raw_msg),
            )
            ctypes.memmove(
                ctypes.addressof(self._internal),
                ctypes.addressof(raw_msg),
                byte_count,
            )
        except Exception as e:
            self.logger.error(f"Sync error for {self.message_id}: {e}")

    def get_field_value(self, field_path: str) -> Any:
        """
        Get value from a potentially nested field using dot notation.

        Args:
            field_path: Path to field, e.g., "rdr_mode_command.stby" or
                "settings.altitude_block_selection"

        Returns:
            The field value, or None if the lookup raised (the error is logged).
        """
        try:
            parts = field_path.split('.')
            obj = self._internal
            # Walk down to the object that owns the final field.
            for part in parts[:-1]:
                obj = getattr(obj, part)
            final_field = parts[-1]
            # A get_<field>() accessor on the owner takes precedence over
            # plain attribute access.
            getter_name = f"get_{final_field}"
            if hasattr(obj, getter_name):
                return getattr(obj, getter_name)()
            value = getattr(obj, final_field)
            # Objects exposing .raw are reported through that attribute.
            if hasattr(value, 'raw'):
                return value.raw
            return value
        except Exception as e:
            self.logger.error(f"Error getting field {field_path} from {self.message_id}: {e}")
            return None

    def set_field_value(self, field_path: str, value: Any) -> None:
        """
        Set value to a potentially nested field using dot notation.

        The value is converted with ``int()`` and written to both the local
        structure and ``message_wrapper.message``; afterwards the wrapper's
        ``send()`` is called when it has one. Logs a warning and returns when
        no wrapper is bound. Errors are logged and swallowed.

        Args:
            field_path: Path to field, e.g., "rdr_mode_command.stby"
            value: Value to set
        """
        if self.message_wrapper is None:
            self.logger.warning(f"Message {self.message_id} is not bound")
            return
        try:
            parts = field_path.split('.')
            obj = self._internal
            obj_wrapper = self.message_wrapper.message
            # Navigate through both structures in parallel.
            for part in parts[:-1]:
                obj = getattr(obj, part)
                obj_wrapper = getattr(obj_wrapper, part)
            final_field = parts[-1]
            int_value = int(value)
            # A set_<field>() accessor on the owner takes precedence over
            # plain attribute assignment.
            setter_name = f"set_{final_field}"
            if hasattr(obj, setter_name):
                getattr(obj, setter_name)(int_value)
                # The wrapper side is only updated when it offers the same
                # setter; otherwise it is left untouched.
                if hasattr(obj_wrapper, setter_name):
                    getattr(obj_wrapper, setter_name)(int_value)
            else:
                setattr(obj, final_field, int_value)
                setattr(obj_wrapper, final_field, int_value)
            # Trigger transmission through the wrapper, when supported.
            if hasattr(self.message_wrapper, 'send'):
                self.message_wrapper.send()
        except Exception as e:
            self.logger.error(f"Failed to set field {field_path} to {value}: {e}")

    def get_dict(self) -> Dict[str, Any]:
        """
        Converts the top-level fields of the local structure to a dictionary.

        Values exposing ``.raw`` are reported through that attribute. Fields
        that cannot be read are skipped (and logged at debug level).

        Note: This is a simplified version for compatibility.
        """
        result: Dict[str, Any] = {}
        for field in self._internal._fields_:
            # ctypes field descriptors are (name, type) or, for bit fields,
            # (name, type, width) - only the name is needed here.
            field_name = field[0]
            try:
                value = getattr(self._internal, field_name)
                if hasattr(value, 'raw'):
                    result[field_name] = value.raw
                else:
                    result[field_name] = value
            except Exception as e:
                # Best effort: keep the remaining fields.
                self.logger.debug(
                    f"Skipping field {field_name} of {self.message_id}: {e}"
                )
        return result

    # ========================================================================
    # Legacy API compatibility methods
    # ========================================================================

    def get_value_for_field(self, field_name: str) -> Any:
        """
        Legacy API method for GUI compatibility.

        Resolves the name through get_field_path() and delegates to
        get_field_value().

        Args:
            field_name: Legacy flat field name (e.g., 'stby')

        Returns:
            Field value
        """
        field_path = get_field_path(self.message_id, field_name)
        return self.get_field_value(field_path)

    def set_value_for_field(self, field_name: str, value: Any) -> None:
        """
        Legacy API method for GUI compatibility.

        Resolves the name through get_field_path() and delegates to
        set_field_value().

        Args:
            field_name: Legacy flat field name (e.g., 'stby')
            value: Value to set
        """
        field_path = get_field_path(self.message_id, field_name)
        self.set_field_value(field_path, value)
# --- Message Class Definitions ---
class MsgA1Class(Msg1553Base):
    """Wrapper for message A1 (Radar Settings and Parameters)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgRdrSettingsAndParameters()
        super().__init__(message_id='A1', internal_structure=structure)
class MsgA2Class(Msg1553Base):
    """Wrapper for message A2 (Radar Operation Command)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgRdrOperationCommand()
        super().__init__(message_id='A2', internal_structure=structure)
class MsgA3Class(Msg1553Base):
    """Wrapper for message A3 (Graphic Settings)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgGraphicSetting()
        super().__init__(message_id='A3', internal_structure=structure)
class MsgA4Class(Msg1553Base):
    """Wrapper for message A4 (Navigation Data and Cursor)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgNavDataAndCursor()
        super().__init__(message_id='A4', internal_structure=structure)
class MsgA5Class(Msg1553Base):
    """Wrapper for message A5 (INU High Speed Data)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgInuHighSpeed()
        super().__init__(message_id='A5', internal_structure=structure)
class MsgB6Class(Msg1553Base):
    """Wrapper for message B6 (Radar Settings and Parameters Tellback)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgRdrSettingsAndParametersTellback()
        super().__init__(message_id='B6', internal_structure=structure)
class MsgB7Class(Msg1553Base):
    """Wrapper for message B7 (Radar Status Tellback)."""

    def __init__(self):
        # Start from a freshly constructed structure instance.
        structure = MsgRdrStatusTellback()
        super().__init__(message_id='B7', internal_structure=structure)
# Note: B9 and B10 are not yet defined in PyBusMonitor1553
# For now, we'll create minimal placeholders
class MsgB9Class(Msg1553Base):
    """Wrapper for message B9 (Antenna Position), backed by a placeholder layout."""

    def __init__(self):
        # Field layout of the stand-in structure: one 32-bit signed value
        # followed by four 16-bit signed values, in this order.
        layout = [('B09_Timetag_AC', ctypes.c_int32)]
        for angle_field in ('B09_NavAz', 'B09_NavEl', 'B09_BodyAz', 'B09_BodyEl'):
            layout.append((angle_field, ctypes.c_int16))

        # The structure type is local to this constructor call.
        class PlaceholderB9(ctypes.Structure):
            _pack_ = 1
            _fields_ = layout

        placeholder = PlaceholderB9()
        super().__init__(message_id='B9', internal_structure=placeholder)
class MsgB10Class(Msg1553Base):
    """Wrapper for message B10 (Processor Timetag), backed by a placeholder layout."""

    def __init__(self):
        # Stand-in structure holding a single 32-bit signed value; the
        # structure type is local to this constructor call.
        class PlaceholderB10(ctypes.Structure):
            _pack_ = 1
            _fields_ = [
                ('B10_Timetag_PX', ctypes.c_int32),
            ]

        placeholder = PlaceholderB10()
        super().__init__(message_id='B10', internal_structure=placeholder)
# --- Message Instances ---
# One module-level instance per message wrapper class.

# A-series messages
msg_a1 = MsgA1Class()
msg_a2 = MsgA2Class()
msg_a3 = MsgA3Class()
msg_a4 = MsgA4Class()
msg_a5 = MsgA5Class()

# B-series messages (B9 and B10 use placeholder structures)
msg_b6 = MsgB6Class()
msg_b7 = MsgB7Class()
msg_b9 = MsgB9Class()
msg_b10 = MsgB10Class()

# Registry of every instance above, in message order.
messages_1553: List[Msg1553Base] = [
    msg_a1,
    msg_a2,
    msg_a3,
    msg_a4,
    msg_a5,
    msg_b6,
    msg_b7,
    msg_b9,
    msg_b10,
]
@monitor_execution
def update_all_1553_messages() -> None:
    """Call get_datum() on every entry of messages_1553, in list order."""
    for message in messages_1553:
        message.get_datum()