211 lines
8.4 KiB
Python
211 lines
8.4 KiB
Python
import logging
|
|
from abc import ABC
|
|
from typing import Any, AnyStr
|
|
import interpreter
|
|
import time
|
|
|
|
from leo_grifo_common import TestCommonInterface,add_text_log
|
|
from leo_grifo_core import theRecorder
|
|
import pdb
|
|
import traceback
|
|
|
|
class GrifoInstrumentInterface(TestCommonInterface, ABC):
    """Adapter exposing the 'Grifo 1553 Interface' through check/set/get.

    The underlying interface object is looked up by name through
    ``interpreter.PyInterfaceManager`` and stored in ``self.grifo_1553``.
    Every accessor works on a ``(message, field)`` pair passed positionally.
    """

    def __init__(self, _timeout=0.5):
        """Look up the 1553 interface and try to start it.

        Args:
            _timeout: forwarded unchanged to ``TestCommonInterface.__init__``.
        """
        super().__init__(_timeout)
        mgr = interpreter.PyInterfaceManager()
        index = mgr.indexOf('Grifo 1553 Interface')
        logging.info(index)
        self.grifo_1553 = mgr.getInterface(index)
        # self.grifo_1553.setVerbose(True)
        try:
            self.run(True)
        except Exception as e:
            # Best effort: a failed start must not abort construction, but it
            # has to be reported through the `logging` module used elsewhere
            # in this file (no `log` name exists in this module).
            logging.exception('unable to start the Grifo 1553 interface: %s', e)

    def __delete__(self):
        # NOTE(review): `__delete__` is the descriptor-protocol hook; a
        # finalizer would be spelled `__del__`. Kept unchanged because it is
        # a no-op either way -- confirm the intent before renaming.
        pass

    def check(self, expected_result, *fields, **kwargs) -> (bool, Any, AnyStr):
        """Compare a message field with ``expected_result``, optionally polling.

        Args:
            expected_result: a scalar (compared with ``==``) or a two-item
                list/tuple ``(low, high)`` treated as an inclusive range in
                which either bound may be ``None``.
            *fields: the ``(message, field)`` pair; mandatory when polling.
            **kwargs: ``step`` -- seconds slept between polls, polling is
                enabled only when it is > 0; ``timeout`` -- seconds during
                which polling is repeated. All kwargs are forwarded to ``get``.

        Returns:
            ``(ok, value, error)`` where ``error`` is ``None`` or a message.
        """
        timeout = float(kwargs.get('timeout', 0))
        step = float(kwargs.get('step', 0))

        # step > 0 means the caller is waiting for a message/value to arrive;
        # otherwise a single immediate comparison is performed.
        if not step > 0:
            return self.__check(expected_result, *fields, **kwargs)

        if len(fields) != 2:
            return False, None, 'message and field expected as argument'
        msg, _ = fields
        if not self.grifo_1553.isMessageReadOnly(msg):
            return False, None, f'message {msg} is not RX: cannot perform timing check operations'

        ret, value, error = self.__check(expected_result, *fields, **kwargs)
        if ret is not False:
            return ret, value, error

        # The current value does not match: re-check whenever the received
        # counter of the message has grown past its initial reading, until a
        # match is found or the timeout expires.
        last_received_sz = self.grifo_1553.getSingleMessageReceivedSz(msg)
        start = time.perf_counter()
        ret, value, error = (False, None, f"message {msg} not yet received")
        while True:
            time.sleep(step)
            cur_received_sz = self.grifo_1553.getSingleMessageReceivedSz(msg)
            if cur_received_sz > last_received_sz:
                ret, value, error = self.__check(expected_result, *fields, **kwargs)
                if ret is True:
                    break
            if not time.perf_counter() - start < timeout:
                break

        return ret, value, error

    def __check(self, expected_result, *fields, **kwargs) -> (bool, Any, AnyStr):
        """Read the field once and compare it with ``expected_result``.

        Numeric expectations (int/float, or a list/tuple range) force the
        value read by ``get`` to be converted to ``float`` first; any other
        expectation is compared against the value exactly as ``get`` returns it.

        Returns:
            ``(ok, value, error)``; ``value`` is ``None`` when the numeric
            conversion fails.
        """
        str_value, error = self.get(*fields, **kwargs)
        if error:
            return False, str_value, error

        is_range = isinstance(expected_result, (list, tuple))
        if is_range or isinstance(expected_result, (float, int)):
            # Lists are converted as well: the range branch below accepts
            # them, and comparing the raw string with numbers would raise.
            try:
                value = float(str_value)
            except (TypeError, ValueError, OverflowError):
                return False, None, f"Cannot check range when {str_value} is not a float number"
        else:
            value = str_value

        if value is None:
            return False, value, error

        if is_range:
            if expected_result[0] is None:
                ret = value <= expected_result[1]
            elif expected_result[1] is None:
                ret = expected_result[0] <= value
            else:
                ret = expected_result[0] <= value <= expected_result[1]
        else:
            ret = value == expected_result

        return ret, value, error

    def set(self, value, *fields, **kwargs) -> (bool, AnyStr):
        """Assign ``value`` to a message field, or resend the last message.

        Args:
            value: value to assign; ``None`` calls
                ``sendLastCommittedMessage`` for the message instead.
            *fields: exactly the ``(message, field)`` pair.
            **kwargs: ``commitChanges`` -- when exactly ``True`` the
                assignment is followed by ``commitChanges()``;
                ``errorInject`` -- when exactly ``True`` that commit is
                wrapped in bus error injection.

        Returns:
            ``(ret, error)``: ``ret`` is whatever the interface call returned
            and ``error`` is a message only when ``ret is False``.
        """
        if len(fields) != 2:
            return False, 'message and field expected as argument'
        msg, field = fields

        if value is None:
            ret = self.grifo_1553.sendLastCommittedMessage(msg)
        else:
            ret = self.grifo_1553.assignMessageFieldValue(msg, field, value)
            if kwargs.get('commitChanges', False) is True:
                # Only the literal True enables injection; the 'BAD_CRC'
                # default (like any other value) leaves it disabled.
                if kwargs.get('errorInject', 'BAD_CRC') is True:
                    # NOTE(review): the meaning of argument 0 is not visible
                    # in this file -- confirm against the interface docs.
                    self.grifo_1553.enableBusErrorInjection(0)
                    try:
                        self.grifo_1553.commitChanges()
                    finally:
                        # Never leave injection enabled if the commit raises.
                        self.grifo_1553.disableBusErrorInjection()
                else:
                    self.grifo_1553.commitChanges()

        # time.sleep(kwargs.get('timeout',0))
        error = "inconsistent pairs message,field or invalid input" if ret is False else None
        return ret, error

    def get(self, *fields, **kwargs) -> (Any, AnyStr):
        """Read a message field (or the message error counter).

        Args:
            *fields: exactly the ``(message, field)`` pair. The pair
                ``("@MESSAGE_ERROR_SIZE", None)`` returns
                ``getMessageErrorSz()`` unconverted.
            **kwargs: accepted for signature compatibility; not used here.

        Returns:
            ``(value, error)``: field values are returned as ``str``;
            ``value`` is ``None`` (with an error message) when the interface
            yields ``None``.
        """
        if len(fields) != 2:
            return False, 'message and field expected as argument'
        msg, field = fields
        # breakpoint()
        if field is None and msg == "@MESSAGE_ERROR_SIZE":
            val = self.grifo_1553.getMessageErrorSz()
        else:
            raw = self.grifo_1553.getMessageFieldValue(msg, field)
            # Test for None *before* str(): str(None) is the truthy string
            # 'None', which would hide the "not available" condition below.
            val = None if raw is None else str(raw)

        error = "inconsistent pairs message,field or return value is not available" if val is None else None
        return val, error

    def getInterface(self):
        """Return the underlying interface object obtained from the manager."""
        return self.grifo_1553

    def run(self, enable: bool):
        """Call ``start()`` on the interface when ``enable`` is exactly ``True``, else ``stop()``."""
        if enable is True:
            self.grifo_1553.start()
        else:
            self.grifo_1553.stop()
|
|
|
|
|
|
class _LazyGrifoProxy:
    """Proxy that lazily constructs a GrifoInstrumentInterface on first use.

    This prevents import-time initialization of the native `interpreter`
    and associated hardware resources, allowing the script to be imported
    without requiring hardware connectivity.

    The proxy can be monkey-patched by the simulation harness (`setup_simulation`)
    to replace the global `theGrifo1553` with a mock object when running in
    `--simulate` mode. Since the mock replaces sys.modules['leo_grifo_1553'] after
    import, the proxy must be lazily initialized to defer hardware access.

    Usage in production (real target):
    - First call of an API method, or first access to any other public
      attribute, triggers real interface creation
    - Full GrifoInstrumentInterface API is transparently available

    Usage in simulation:
    - Mock replaces sys.modules['leo_grifo_1553'] before any access
    - Script re-imports from mock module, proxy is never initialized
    """

    # Public API names that are answered without building the real interface,
    # so hasattr()/dir() probes never touch the hardware.
    _KNOWN_METHODS = ('check', 'get', 'set', 'getInterface', 'run')

    def __init__(self, timeout: float = 0.2):
        """Store the timeout passed to GrifoInstrumentInterface on first use."""
        self._timeout = timeout
        self._instance = None

    def _ensure(self):
        """Lazy-initialize the real GrifoInstrumentInterface on first use."""
        if self._instance is None:
            self._instance = GrifoInstrumentInterface(self._timeout)

    def __getattr__(self, item):
        """Delegate attribute/method access to the real instance.

        Only invoked when normal lookup fails, i.e. never for the proxy's own
        ``_timeout`` / ``_instance`` / ``_ensure``.

        Raises:
            AttributeError: for any name starting with an underscore; private
                names are never forwarded, which also rules out recursion
                when ``_instance`` itself is not set yet.
        """
        if item.startswith('_'):
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{item}'")

        if item in self._KNOWN_METHODS and self._instance is None:
            # Hand back a deferred wrapper: the real interface is created
            # only when the method is actually called.
            def lazy_method(*args, **kwargs):
                self._ensure()
                return getattr(self._instance, item)(*args, **kwargs)

            return lazy_method

        # Any other public attribute needs the real instance right away.
        self._ensure()
        return getattr(self._instance, item)

    def __repr__(self):
        if self._instance is None:
            return f"<LazyGrifoProxy(uninitialized, timeout={self._timeout})>"
        return repr(self._instance)

    def __bool__(self):
        """Support truthiness checks (always True, consistent with singleton pattern)."""
        return True

    def __dir__(self):
        """Support dir() and attribute discovery."""
        if self._instance is None:
            # Before initialization only the expected API methods are listed.
            return list(self._KNOWN_METHODS)
        return dir(self._instance)


theGrifo1553 = _LazyGrifoProxy(0.2)
|