PlatSim_Genova/TestEnvironment/env/leo_grifo_terminal.py
2026-01-30 16:38:33 +01:00

129 lines
5.4 KiB
Python

from leo_grifo_core import theRecorder
from leo_grifo_common import get_json_config
from leo_terminal_serial import Terminal2Serial
import logging
import time
import re
class GrifoSerialTerminal(Terminal2Serial):
    """Serial terminal that scans received data for error/fatal markers.

    Connection parameters are read from the ``serial_terminal`` section of
    the dictionary returned by ``get_json_config()``; any missing key falls
    back to the default used in ``__init__``.  Per-run message counters are
    kept in ``self._serial_stats`` and exposed through
    ``get_serial_statistics()`` / ``reset_serial_statistics()``.
    """

    # Substrings that mark a received message as an error or a fatal error.
    _ERROR_MARKER = '%%E'
    _FATAL_MARKER = '%%F'
    # Keyword (matched case-insensitively) counted inside error/fatal messages.
    _RECYCLE_KEYWORD = 'RECYCLE'

    def __init__(self):
        """Read the serial configuration and initialise the parent terminal."""
        conf = get_json_config()
        serial_conf = conf.get("serial_terminal", {})
        port = serial_conf.get("port", "COM2")
        # NOTE(review): the default speed is the string "9600" and is passed
        # through unconverted (unlike stop_bit) - confirm Terminal2Serial
        # accepts a string baudrate.
        speed = serial_conf.get("speed", "9600")
        stop_bit = int(serial_conf.get("stop_bit", "1"))
        bytesize = serial_conf.get("bytesize", 8)
        parity = serial_conf.get("parity", "N")
        mode = serial_conf.get("mode", "EXPECT_LF")
        super().__init__(port=port,
                         baudrate=speed,
                         stopbits=stop_bit,
                         bytesize=bytesize,
                         parity=parity,
                         mode=mode)
        # Serial message statistics tracking for the current run.
        self._serial_stats = self._new_serial_stats()

    @staticmethod
    def _new_serial_stats():
        """Return a fresh, zeroed statistics dictionary.

        Single source of truth for the statistics layout, shared by
        ``__init__`` and ``reset_serial_statistics``.
        """
        return {
            'total_messages': 0,    # every non-empty message received
            'error_messages': 0,    # messages containing %%E
            'fatal_messages': 0,    # messages containing %%F (and no %%E)
            'recycle_count': 0,     # error/fatal messages mentioning RECYCLE
            'error_details': [],    # (timestamp, message) tuples for %%E
            'fatal_details': [],    # (timestamp, message) tuples for %%F
            'recycle_details': [],  # (timestamp, message) tuples for RECYCLE
        }

    # Override of the parent callback method.
    def read_action(self, data, length):
        """Process one chunk of received serial data.

        Data is only inspected when ``length`` is positive, the terminal mode
        is ``'EXPECT_LF'`` and ``data`` is non-empty.  The data is decoded as
        UTF-8 (undecodable bytes are dropped) and right-stripped.  A message
        containing ``%%E`` or ``%%F`` is recorded as a failed step, logged at
        CRITICAL level and counted in the statistics; any other non-empty
        message is logged at DEBUG level.  Every non-empty message increments
        ``total_messages``.

        Args:
            data: Raw data received from the serial port; it must provide a
                ``decode`` method, otherwise it is ignored.
            length: Number of bytes received.
        """
        if length <= 0 or self.mode != 'EXPECT_LF' or not data:
            return

        try:
            message = data.decode('utf_8', 'ignore').rstrip()
        except Exception as exc:
            # Best effort: unusable data is skipped, but no longer silently,
            # and KeyboardInterrupt/SystemExit are not swallowed any more.
            logging.warning("Could not decode serial data %r: %s", data, exc)
            return

        if not message:
            return

        is_error = self._ERROR_MARKER in message
        is_fatal = self._FATAL_MARKER in message
        if is_error or is_fatal:
            theRecorder.add_step(message, False, 'Fail detected on serial', '')
            logging.critical(message)
            # Track error/fatal messages in the statistics.
            entry = (time.strftime('%H:%M:%S'), message)
            stats = self._serial_stats
            if is_error:
                # A message carrying both markers is counted as an error only.
                stats['error_messages'] += 1
                stats['error_details'].append(entry)
            else:
                stats['fatal_messages'] += 1
                stats['fatal_details'].append(entry)
            # RECYCLE is only looked for inside error/fatal messages.
            if self._RECYCLE_KEYWORD in message.upper():
                stats['recycle_count'] += 1
                stats['recycle_details'].append(entry)
        else:
            # Ordinary traffic goes to the log file only, at debug level.
            logging.debug(message)

        # Count all valid (non-empty) messages.
        self._serial_stats['total_messages'] += 1

    def get_serial_statistics(self):
        """
        Get the serial message statistics for the current run.

        Returns:
            dict: Dictionary containing:
                - total_messages: Total serial messages received
                - error_messages: Count of %%E messages
                - fatal_messages: Count of %%F messages
                - recycle_count: Count of RECYCLE events detected
                - error_details: List of (timestamp, message) for %%E
                - fatal_details: List of (timestamp, message) for %%F
                - recycle_details: List of (timestamp, message) for RECYCLE
        """
        # Return copies of the lists so callers cannot modify internal state.
        return {
            key: value.copy() if isinstance(value, list) else value
            for key, value in self._serial_stats.items()
        }

    def reset_serial_statistics(self):
        """
        Reset the serial statistics counters for a new test run.

        Should be called at the beginning of each test repetition.
        """
        self._serial_stats = self._new_serial_stats()
        logging.debug("Serial statistics reset for new run")
# ---------------------------------------------------------------------------------------------------------------------
if __name__ == '__main__':
    # Manual run: connect, stay connected for 300 seconds, then disconnect.
    theTerminal = GrifoSerialTerminal()
    theTerminal.connect()
    try:
        time.sleep(300)
    finally:
        # Always disconnect, even if the wait is interrupted (e.g. Ctrl+C).
        theTerminal.disconnect()