PlatSim_Genova/TestEnvironment/env/leo_grifo_1553.py
2026-01-30 16:38:33 +01:00

141 lines
5.3 KiB
Python

import logging
from abc import ABC
from typing import Any, AnyStr
import interpreter
import time
from leo_grifo_common import TestCommonInterface,add_text_log
from leo_grifo_core import theRecorder
import pdb
import traceback
class GrifoInstrumentInterface(TestCommonInterface, ABC):
def __init__(self, _timeout=0.5):
super().__init__(_timeout)
mgr = interpreter.PyInterfaceManager()
index = mgr.indexOf('Grifo 1553 Interface')
logging.info(index)
self.grifo_1553 = mgr.getInterface(index)
#self.grifo_1553.setVerbose(True)
try:
self.run(True)
except Exception as e:
log.error(e)
def __delete__(self):
pass
def check(self, expected_result, *fields, **kwargs) -> (bool, Any, AnyStr):
timeout = float(kwargs.get('timeout', 0 ))
step = float(kwargs.get('step', 0 ))
#logging.info(kwargs)
#se step > 0 sto cercando di leggere messaggio/valore
if step > 0:
if len(fields) != 2:
return False, None, 'message and field expected as argument'
msg, field = fields
if not self.grifo_1553.isMessageReadOnly(msg):
return False, None, f'message {msg} is not RX: cannot perform timing check operations'
ret, value, error = self.__check(expected_result, *fields, **kwargs)
#logging.info(f' {ret} {value} {error}')
if ret is False:
last_received_sz = self.grifo_1553.getSingleMessageReceivedSz(msg)
start = time.perf_counter()
repeat = True
time_passed = 0
ret , value, error = ( False, None, f"message {msg} not yet received" )
while repeat:
time.sleep(step)
cur_received_sz = self.grifo_1553.getSingleMessageReceivedSz(msg)
if cur_received_sz > last_received_sz:
ret, value, error = self.__check(expected_result, *fields, **kwargs)
if ret is True:
break
#logging.info(f'waiting for a new message {msg} [now = {cur_received_sz}]')
repeat = time.perf_counter() - start < timeout
else:
ret, value, error = self.__check(expected_result, *fields, **kwargs)
return ret, value, error
def __check(self, expected_result, *fields, **kwargs) -> (bool, Any, AnyStr):
str_value, error = self.get(*fields,**kwargs)
if error:
return False,str_value, error
ret = False
value = None
if isinstance(expected_result, (tuple, float, int)):
try:
value = float(str_value)
except:
ret = False
error = f"Cannot check range when {str_value} is not a float number"
else:
value = str_value
if value is not None:
if isinstance(expected_result, (list, tuple)):
if expected_result[0] is None:
ret = value <= expected_result[1]
elif expected_result[1] is None:
ret = expected_result[0] <= value
else:
ret = expected_result[0] <= value <= expected_result[1]
else:
#logging.info(f'#{value.__class__}# #{expected_result.__class__}#')
ret = value == expected_result
return ret, value, error
def set(self, value, *fields, **kwargs) -> (bool, AnyStr):
if len(fields) != 2:
return False, 'message and field expected as argument'
msg, field = fields
if value is None:
ret = self.grifo_1553.sendLastCommittedMessage(msg)
else:
ret = self.grifo_1553.assignMessageFieldValue(msg, field, value )
if kwargs.get('commitChanges',False) is True:
if kwargs.get('errorInject','BAD_CRC') is True:
self.grifo_1553.enableBusErrorInjection(0)
self.grifo_1553.commitChanges()
self.grifo_1553.disableBusErrorInjection()
else:
self.grifo_1553.commitChanges()
time.sleep(kwargs.get('timeout',0))
return ret, "inconsistent pairs message,field or invalid input" if ret is False else None
def get(self, *fields, **kwargs) -> (Any, AnyStr):
if len(fields) != 2:
return False, 'message and field expected as argument'
msg, field = fields
# breakpoint()
if field is None and msg == "@MESSAGE_ERROR_SIZE" :
val = self.grifo_1553.getMessageErrorSz()
else:
val = str( self.grifo_1553.getMessageFieldValue(msg, field ) )
return val, "inconsistent pairs message,field or return value is not available" if val is None else None
def getInterface(self):
return self.grifo_1553
def run(self,enable : bool ):
if enable is True:
self.grifo_1553.start()
else:
self.grifo_1553.stop()
theGrifo1553 = GrifoInstrumentInterface(0.2)