274 lines
9.6 KiB
Python
274 lines
9.6 KiB
Python
# test common interface
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import os,sys
|
|
import time
|
|
from abc import ABC, abstractmethod
|
|
from pathlib import Path
|
|
from typing import Any, AnyStr, Tuple
|
|
|
|
from leo_grifo_core import theRecorder
|
|
from leo_grifo_pdf2report import PDFClass
|
|
|
|
|
|
# Root logger setup: records are printed as "LEVEL: message", DEBUG and above.
# (Pass level=logging.INFO instead for a less verbose output.)
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')

# Directory that contains this module; other paths are built relative to it.
CWD = os.path.dirname(__file__)
# Default output folder for reports: a sibling of this module's directory.
DEF_VERIFICATION_REPORT_FOLDER = os.path.join(CWD, "..", "VERIFICATION_REPORTS")
|
|
|
|
class TestCommonNotification:
    """Holder for the data of a single notification.

    The data is kept in the ``trap`` dictionary under the keys
    ``'interface'``, ``'data'``, ``'description'`` and ``'time'``.
    """

    def __init__(self):
        # NOTE: this constructor was previously misspelled ``__int__``, so it
        # never ran and ``self.trap`` did not exist when ``set`` was called.
        self.trap = {}

    def set(self, _interface_name, _value, _desc, _uptime):
        """Fill the notification and return ``self`` so calls can be chained.

        :param _interface_name: stored under ``trap['interface']``
        :param _value: stored under ``trap['data']``
        :param _desc: stored under ``trap['description']``
        :param _uptime: stored under ``trap['time']``; when falsy
            (``None``, ``0``, ``''`` ...) the current local date/time is
            stored instead
        :return: this instance
        """
        self.trap['interface'] = _interface_name
        self.trap['data'] = _value
        self.trap['description'] = _desc
        self.trap['time'] = _uptime if _uptime else datetime.datetime.now()
        return self
|
|
|
|
#
|
|
# Common interface for each set/get class
|
|
#
|
|
class TestCommonInterface(ABC):
    """Abstract base for the equipment interfaces used by the test helpers.

    Concrete interfaces must implement :meth:`check`, :meth:`set` and
    :meth:`get`.  Notification handling and discovery are optional: the
    default implementations only log an error and return a "not available"
    result.
    """

    def __init__(self, _timeout=0.1):
        """Store the default timeout handed out by :meth:`default_timeout`."""
        self.timeout = _timeout

    def default_timeout(self):
        """Return the timeout configured at construction time."""
        return self.timeout

    def enable_notification(self) -> bool:
        """Default: notifications are unsupported; log an error, return False."""
        logging.error('This interface has no notification management')
        return False

    def disable_notification(self) -> bool:
        """Default: notifications are unsupported; log an error, return False."""
        logging.error('This interface has no notification management')
        return False

    def get_last_notification(self) -> Tuple[Any, AnyStr]:
        """Default: log an error and return ``(None, <explanatory message>)``."""
        logging.error('This interface has no notification management')
        return None, 'Notifications not available'

    @abstractmethod
    def check(self, expected_result, *fields, **kwargs) -> Tuple[bool, Any, AnyStr]:
        """Compare the value addressed by *fields* with *expected_result*.

        :return: ``(result, read value, error)``; this base body returns
            ``(False, None, None)``
        """
        return False, None, None

    @abstractmethod
    def set(self, value, *fields, **kwargs) -> Tuple[bool, AnyStr]:
        """Write *value* to the item addressed by *fields*.

        :return: ``(result, error)``; this base body returns ``(True, None)``
        """
        return True, None

    @abstractmethod
    def get(self, *fields, **kwargs) -> Tuple[Any, AnyStr]:
        """Read the item addressed by *fields*.

        :return: ``(value, error)``; this base body returns ``(None, None)``
        """
        return None, None

    def discover(self, *fields, **kwargs) -> list:
        """Default: discovery is unsupported; log an error, return ``[]``."""
        logging.error('This interface has no discover facilities')
        return []
|
|
|
|
#
|
|
# Common function interface for using in test
|
|
#
|
|
def _check_format_expected(item, as_hex):
    """Render one expected value or range bound for the step description.

    Integers become ``0x``-prefixed upper-case hex when *as_hex* is True;
    any other value (including ``None`` used as an open bound) is returned
    unchanged.
    """
    if as_hex is True and isinstance(item, int):
        return f'0x{item:X}'
    return item


def check(equipment_interface: TestCommonInterface, expected_result, *fields, **kwargs) -> Tuple[bool, Any]:
    """Run a check on *equipment_interface* and record it as a test step.

    :param equipment_interface: interface whose ``check`` method is called
    :param expected_result: either a single expected value, or a 2-item
        list/tuple ``(low, high)`` describing a range; a ``None`` bound
        means the range is open on that side
    :param fields: forwarded to the interface; also used to build the
        step description
    :param kwargs: forwarded to the interface.  ``hex=True`` renders
        integer values in hexadecimal in the description; ``description``
        is appended to the step description
    :return: ``(result, error description)``; ``(False, None)`` when the
        interface raised an exception
    """
    asHex = kwargs.get('hex', False)
    expected = None
    error_desc = ''
    is_range = isinstance(expected_result, (list, tuple)) and len(expected_result) == 2

    common_desc = 'Check that ' + ' '.join(f'{e}' for e in fields)
    if is_range:
        # Materialise both bounds up front.  The previous code built a
        # generator here and then subscripted it (TypeError), and also tried
        # to hex-format a None bound.
        low, high = (_check_format_expected(v, asHex) for v in expected_result)
        if expected_result[0] is None:
            common_desc += ' is less than {} '.format(high)
            expected = high
        elif expected_result[1] is None:
            common_desc += ' is greater than {} '.format(low)
            expected = low
        else:
            common_desc += ' is between {} and {} '.format(low, high)
            # Previously left as None, which was recorded as the text 'None'.
            expected = '[{}, {}]'.format(low, high)
    else:
        expected = expected_result
        desc_expected = _check_format_expected(expected_result, asHex)
        common_desc += ' is equal to {}'.format(desc_expected)

    common_desc += kwargs.get('description', '')
    logging.info(common_desc)

    try:
        ret, value, error = equipment_interface.check(expected_result, *fields, **kwargs)
    except Exception as e:
        error_desc = 'EXCEPTION: {}'.format(e)
        logging.error(error_desc)
        theRecorder.add_step(common_desc, False, error_desc, '{}'.format(expected))
        return False, None

    # Render the read value; fall back to plain text when it cannot be shown
    # in hex (e.g. the interface returned None together with an error).
    desc_value = f'{value}'
    if asHex is True:
        try:
            if isinstance(expected_result, int):
                desc_value = f'0x{int(value):X}'
            elif is_range and isinstance(value, int):
                desc_value = f'0x{value:X}'
        except (TypeError, ValueError):
            desc_value = f'{value}'

    if ret:
        logging.info('OK: read value = {}'.format(desc_value))
    else:
        logging.warning('CHECK ERROR: read value = {}'.format(desc_value))
        error_desc = ' Read value = {} is not the expected result'.format(desc_value)
    # An error reported by the interface takes precedence over the generic text.
    if error is not None:
        logging.error('INTERFACE ERROR: {}'.format(error))
        error_desc = error

    theRecorder.add_step(common_desc, ret, error_desc, desc_value)
    return ret, error_desc
|
|
|
|
|
|
def getValue(equipment_interface: TestCommonInterface, *fields, **kwargs) -> Tuple[Any, AnyStr]:
    """Read a value through *equipment_interface* and record it as a test step.

    :param equipment_interface: interface whose ``get`` method is called
    :param fields: forwarded to the interface; also listed in the step
        description
    :param kwargs: forwarded to the interface.  ``timeout`` defaults to the
        interface's ``default_timeout()`` when missing or ``None``;
        ``hex=True`` records an integer value in hexadecimal;
        ``description`` is appended to the step description
    :return: ``(value, error)`` as returned by the interface, or
        ``(None, 'EXCEPTION: ...')`` when the interface raised
    """
    asHex = kwargs.get('hex', False)
    common_desc = 'Get ' + ''.join(f'{f} ' for f in fields)
    common_desc += kwargs.get('description', '')
    logging.info(common_desc)
    error_desc = ''

    if kwargs.get('timeout') is None:
        kwargs['timeout'] = equipment_interface.default_timeout()
    try:
        value, error = equipment_interface.get(*fields, **kwargs)
    except Exception as e:
        error_desc = 'EXCEPTION: {}'.format(e)
        logging.error(error_desc)
        theRecorder.add_step(common_desc, False, error_desc, '')
        return None, error_desc

    ret = True
    if error is not None:
        logging.error('INTERFACE ERROR: {}'.format(error))
        error_desc = error
        ret = False

    # Only integers can be hex-formatted; formatting unconditionally crashed
    # when the interface returned None (error case) or a non-integer value.
    if asHex is True and isinstance(value, int):
        desc_value = f'0x{value:X}'
    else:
        desc_value = value
    theRecorder.add_step(common_desc, ret, error_desc, '{} '.format(desc_value))
    return value, error
|
|
|
|
|
|
# NEW SET: handles the case where the element passed as input_value is deliberately wrong
|
|
def setValue(equipment_interface: TestCommonInterface, input_value, *fields, **kwargs) -> (bool, Any):
    """Write *input_value* through *equipment_interface* and record the step.

    :param equipment_interface: interface whose ``set`` method is called
    :param input_value: value to write; also shown in the step description
    :param fields: forwarded to the interface; also listed in the step
        description
    :param kwargs: forwarded to the interface.  ``timeout`` defaults to the
        interface's ``default_timeout()`` and ``error_case`` to ``False``
        when missing or ``None``; ``verbose=False`` suppresses the INFO log
        of the step description (useful for repetitive operations);
        ``description`` is appended to the step description
    :return: ``(result, error description)``.  With ``error_case`` set, a
        failing set (exception or ``False`` result) is reported as a
        success; when the interface raised, the second item is the step
        description (``error_case`` set) or ``None``
    """
    step_text = ''.join([
        'Set ',
        *(f'{name} ' for name in fields),
        f'to {input_value}',
        kwargs.get('description', ''),
    ])

    # verbose defaults to True, so existing callers keep getting the log line
    if kwargs.get('verbose', True):
        logging.info(step_text)

    if kwargs.get('timeout') is None:
        kwargs['timeout'] = equipment_interface.default_timeout()
    if kwargs.get('error_case') is None:
        kwargs['error_case'] = False

    try:
        ret, error = equipment_interface.set(input_value, *fields, **kwargs)
    except Exception as exc:
        failure_text = 'EXCEPTION: {}'.format(exc)
        logging.error(failure_text)

        # When a failure is the expected outcome the step is recorded as passed
        failure_expected = bool(kwargs.get('error_case', False) == True)
        theRecorder.add_step(step_text, failure_expected, failure_text, '{}'.format(input_value))
        if failure_expected:
            return True, step_text
        return False, None

    if error is None:
        error_text = ''
    else:
        logging.error('INTERFACE ERROR: {}'.format(error))
        error_text = error

    # An expected failure that did fail counts as a success
    if kwargs.get('error_case', False) == True and ret == False:
        ret = True

    theRecorder.add_step(step_text, ret, error_text, '{} '.format(input_value))
    return ret, error_text
|
|
|
|
|
|
def check_in_range(equipment_interface: TestCommonInterface, expected_range: Tuple, *fields, **kwargs) -> (bool, Any):
    """Run ``check`` with *expected_range* passed through as the expected result.

    All other arguments are forwarded unchanged; the result of ``check`` is
    returned as is.
    """
    outcome = check(equipment_interface, expected_range, *fields, **kwargs)
    return outcome
|
|
|
|
|
|
def check_greater_than(equipment_interface: TestCommonInterface, lower_range, *fields, **kwargs) -> (bool, Any):
    """Run ``check`` with the pair ``(lower_range, None)`` as expected result.

    All other arguments are forwarded unchanged; the result of ``check`` is
    returned as is.
    """
    open_above = (lower_range, None)
    return check(equipment_interface, open_above, *fields, **kwargs)
|
|
|
|
|
|
def check_less_than(equipment_interface: TestCommonInterface, upper_range, *fields, **kwargs) -> (bool, Any):
    """Run ``check`` with the pair ``(None, upper_range)`` as expected result.

    All other arguments are forwarded unchanged; the result of ``check`` is
    returned as is.
    """
    open_below = (None, upper_range)
    return check(equipment_interface, open_below, *fields, **kwargs)
|
|
|
|
|
|
def add_text_log(text):
    """Record *text* as a passing step (no error, no value) and log it at INFO level."""
    step_passed, step_error, step_value = True, None, None
    theRecorder.add_step(text, step_passed, step_error, step_value)
    logging.info(text)
|
|
|
|
def get_json_config():
    """Load and return the content of ``json/instrument_settings.json``.

    The file is looked up relative to ``CWD``.

    :return: the parsed JSON document
    :raises OSError: if the file cannot be opened (e.g. it does not exist)
    :raises json.JSONDecodeError: if the file is not valid JSON
    """
    config_file = os.path.join(CWD, 'json', 'instrument_settings.json')
    # ``open`` never returns None (it raises on failure), so the former
    # ``if c is not None else {}`` fallback could never trigger; the context
    # manager additionally guarantees the handle is closed.
    with open(config_file) as config:
        return json.load(config)
|
|
|
|
def record_pass_fail_result(description, result, errors, expected):
    """Record a step with the given outcome and log a PASS/FAILED line.

    The four arguments are handed to ``theRecorder.add_step`` unchanged.
    The log line (INFO level) reads ``PASS: <description>`` when *result*
    is truthy, otherwise ``FAILED: <description>`` followed by the expected
    result.
    """
    theRecorder.add_step(description, result, errors, expected)
    if result:
        outcome_line = f'PASS: {description}'
    else:
        outcome_line = f'FAILED: {description} - expected result was: {expected}'
    logging.info(outcome_line)
|
|
|
|
def generate_pdf_report(template_info_filename, elapsed_time):
    """Build the PDF report for the recorded sessions.

    The document is produced with ``PDFClass`` from two JSON files: the
    default template ``json/default_template.json`` (relative to ``CWD``)
    and the test information file *template_info_filename*.  The report is
    generated in ``DEF_VERIFICATION_REPORT_FOLDER/<ID>``, where ``ID`` is
    read from the test information file; the folder is created if missing.

    :param template_info_filename: path of the JSON file describing the test
    :param elapsed_time: passed to the ``PDFClass`` constructor
    :raises OSError: if one of the JSON files cannot be opened
    :raises json.JSONDecodeError: if one of the JSON files is not valid JSON
    """
    # Context managers close both files even if parsing fails; the handles
    # were previously left open.
    with open(os.path.join(CWD, 'json', 'default_template.json')) as template_file:
        json_template = json.load(template_file)
    with open(template_info_filename) as test_file:
        json_test = json.load(test_file)

    my_pdf = PDFClass(elapsed_time)
    doc = my_pdf.start_doc(json_template)
    file_name = my_pdf.pdf_name(json_template, '')
    my_pdf.pdf_preamble(doc, json_test)
    my_pdf.pdf_test_information_section(doc, json_test)

    session = theRecorder.get_db_session()
    # A summary is only emitted when there is more than one session
    if len(session) > 1:
        tot = len(session)
        ok = sum(1 for v in session if v.test['fail'] == 0)
        my_pdf.pdf_step_summary(ok, tot - ok, doc)

    for s in session:
        my_pdf.pdf_step_result(s.test['name'], s.test['pass'], s.test['fail'], doc)
        step_list = theRecorder.get_db()
        my_pdf.pdf_step_execution(step_list[s.idx_first:s.idx_last], doc)

    folder_path = f"{DEF_VERIFICATION_REPORT_FOLDER}/{json_test['ID']}"
    print(folder_path)
    # exist_ok avoids the check-then-create race of the former is_dir() test
    os.makedirs(folder_path, exist_ok=True)

    my_pdf.pdf_generate(folder_path, doc, file_name)
|
|
|
|
|
|
def close_session(session_name: str):
    """Close the recorder session *session_name*.

    Delegates to ``theRecorder.close_session``; whatever that call returns
    is discarded.
    """
    theRecorder.close_session(session_name)
|
|
|
|
|
|
def create_and_enable_notifier(interface_obj: TestCommonInterface):
    """Record the launch of the notification receiver and enable notifications.

    A passing step is recorded first; the result of
    ``interface_obj.enable_notification()`` is then returned.
    """
    theRecorder.add_step('Launch notification receiver ...', True, None, None)
    enabled = interface_obj.enable_notification()
    return enabled
|