from leo_grifo_core import theRecorder from leo_grifo_common import get_json_config from leo_terminal_serial import Terminal2Serial import logging import time import re class GrifoSerialTerminal(Terminal2Serial): def __init__(self): conf = get_json_config() serial_conf = conf.get("serial_terminal",{}) port = serial_conf.get("port","COM2") speed = serial_conf.get("speed","9600") stop_bit = int(serial_conf.get("stop_bit","1")) bytesize = serial_conf.get("bytesize",8) parity = serial_conf.get("parity","N") mode = serial_conf.get("mode","EXPECT_LF") super().__init__(port=port, baudrate=speed, stopbits=stop_bit, bytesize=bytesize, parity=parity, mode=mode) # Serial message statistics tracking for current run # Structure: { # 'total_messages': int, # 'error_messages': int, # %%E prefixed # 'fatal_messages': int, # %%F prefixed # 'recycle_count': int, # RECYCLE keyword detected # 'error_details': [], # List of (timestamp, message) tuples for %%E # 'fatal_details': [], # List of (timestamp, message) tuples for %%F # 'recycle_details': [], # List of (timestamp, message) tuples for RECYCLE # } self._serial_stats = { 'total_messages': 0, 'error_messages': 0, 'fatal_messages': 0, 'recycle_count': 0, 'error_details': [], 'fatal_details': [], 'recycle_details': [], } #override parent callback method def read_action(self, data, length): if length > 0: if self.mode == 'EXPECT_LF' and data: try: dummy = (data.decode('utf_8','ignore')).rstrip() except: dummy = '' if len(tuple( e for e in ('%%E', '%%F') if e in dummy)) > 0: theRecorder.add_step(dummy,False,'Fail detected on serial','') logging.critical(dummy) # Track error/fatal messages in statistics timestamp = time.strftime('%H:%M:%S') if '%%E' in dummy: self._serial_stats['error_messages'] += 1 self._serial_stats['error_details'].append((timestamp, dummy)) elif '%%F' in dummy: self._serial_stats['fatal_messages'] += 1 self._serial_stats['fatal_details'].append((timestamp, dummy)) # Check for RECYCLE keyword in error/fatal messages if 'RECYCLE' in dummy.upper(): self._serial_stats['recycle_count'] += 1 self._serial_stats['recycle_details'].append((timestamp, dummy)) elif len(dummy): # Log ALL serial messages at INFO level for post-analysis logging.info(f"[SERIAL] {dummy}") # Count all valid messages if len(dummy) > 0: self._serial_stats['total_messages'] += 1 def get_serial_statistics(self): """ Get current serial message statistics for the current run. Returns: dict: Dictionary containing: - total_messages: Total serial messages received - error_messages: Count of %%E messages - fatal_messages: Count of %%F messages - recycle_count: Count of RECYCLE events detected - error_details: List of (timestamp, message) for %%E - fatal_details: List of (timestamp, message) for %%F - recycle_details: List of (timestamp, message) for RECYCLE """ # Return a copy to prevent external modification return { 'total_messages': self._serial_stats['total_messages'], 'error_messages': self._serial_stats['error_messages'], 'fatal_messages': self._serial_stats['fatal_messages'], 'recycle_count': self._serial_stats['recycle_count'], 'error_details': self._serial_stats['error_details'].copy(), 'fatal_details': self._serial_stats['fatal_details'].copy(), 'recycle_details': self._serial_stats['recycle_details'].copy(), } def reset_serial_statistics(self): """ Reset serial statistics counters for a new test run. Should be called at the beginning of each test repetition. """ self._serial_stats = { 'total_messages': 0, 'error_messages': 0, 'fatal_messages': 0, 'recycle_count': 0, 'error_details': [], 'fatal_details': [], 'recycle_details': [], } logging.debug("Serial statistics reset for new run") # --------------------------------------------------------------------------------------------------------------------- if __name__ == '__main__': theTerminal = GrifoSerialTerminal() theTerminal.connect() time.sleep(300) theTerminal.disconnect()