PlatSim_Genova/TestEnvironment/scripts/serial_aux.py
2026-02-03 10:39:54 +01:00

204 lines
7.4 KiB
Python

"""
serial_aux.py
Utility script that provides `send_serial_sequence()` to open a (real or
mock) serial port, send a sequence of characters with a per-character delay,
log all traffic to a separate log file and close the port.
Configuration section below makes it easy for an operator to change params.
Usage from test script:
from serial_aux import send_serial_sequence, CONF
send_serial_sequence()
In simulation mode set the `PORT` to a value starting with `MOCK:` (for example
`MOCK:aux1`) to route the traffic to the mock provided by
`GRIFO_M_PBIT_mock.create_mock_terminal()`.
"""
from __future__ import annotations
import time
import os
import logging
from datetime import datetime
from typing import Optional
# Operator-editable configuration (top of file)
# Default PORT will be overridden to a mock port when running in simulation
# (i.e. when `GRIFO_M_PBIT_mock` is importable). The operator can still set
# this to a real COM port (e.g. "COM5") when running against real hardware.
PORT = "COM5" # Serial port (or use "MOCK:aux1" for simulation)
BAUDRATE = 115200
BYTESIZE = 8
PARITY = 'N'
STOPBITS = 1
# Delay between single characters (seconds)
CHAR_DELAY_SEC = 0.05
# Default sequence: CTRL+X, three spaces, ENTER (CR)
SEQUENCE = b"\x18 \r"
LOG_FOLDER = os.path.join(os.path.dirname(__file__), '..', 'LOG')
LOG_FILENAME_TEMPLATE = 'serial_aux_{port}_{ts}.log'
def _ensure_log_folder(path: str) -> None:
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
def send_serial_sequence(port: Optional[str] = None,
baudrate: Optional[int] = None,
char_delay: Optional[float] = None,
sequence: Optional[bytes] = None,
log_folder: Optional[str] = None) -> None:
"""
Open serial port, send `sequence` one byte at a time with `char_delay`
seconds between bytes, log all sent/received traffic to a file, then close.
If the provided port string starts with "MOCK:", the function will try to
register and use the mock adapter provided by `GRIFO_M_PBIT_mock` (simulation).
"""
import importlib
port = port or PORT
baudrate = baudrate or BAUDRATE
char_delay = char_delay if char_delay is not None else CHAR_DELAY_SEC
sequence = sequence or SEQUENCE
log_folder = log_folder or LOG_FOLDER
_ensure_log_folder(log_folder)
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
safe_port = port.replace(':', '_').replace('/', '_')
log_path = os.path.join(log_folder, LOG_FILENAME_TEMPLATE.format(port=safe_port, ts=ts))
# Prepare logger for this session
logger = logging.getLogger(f"serial_aux.{safe_port}.{ts}")
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_path, encoding='utf-8')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.info(f"Opening serial port: {port} @ {baudrate}bps")
# If using mock port (simulation), ask the mock module to register an adapter
ser = None
try:
# If user requested a MOCK: port explicitly, use that. Otherwise try
# to detect simulation environment: if GRIFO_M_PBIT_mock is importable
# we prefer a mock adapter so the helper works during --simulate runs.
mock_available = False
try:
import importlib
mock_mod = importlib.import_module('GRIFO_M_PBIT_mock')
mock_available = True
except Exception:
mock_mod = None
use_mock = False
if isinstance(port, str) and port.upper().startswith('MOCK:'):
use_mock = True
elif mock_available and (port is None or port == PORT or not str(port).upper().startswith('COM')):
# When running simulation prefer mock if default port left unchanged
use_mock = True
if use_mock:
if mock_mod is None:
raise ImportError('Mock module not available to provide mock serial port')
register = getattr(mock_mod, 'register_mock_serial_port', None)
if register is None:
raise ImportError('Mock module does not provide register_mock_serial_port')
ser = register(port or ('MOCK:aux1'), log_path=log_path)
logger.info(f"Using mock serial adapter for {port or 'MOCK:aux1'}")
else:
# Try using pyserial for real hardware
try:
import serial
except Exception:
logger.exception('pyserial not available; cannot open real serial port')
raise
try:
ser = serial.Serial(port=port, baudrate=baudrate, bytesize=BYTESIZE,
parity=PARITY, stopbits=STOPBITS, timeout=0.2)
logger.info(f"Opened real serial port {port}")
except Exception as e:
logger.warning(f"Could not open real port {port}: {e}")
# Fallback to mock if available
if mock_available and mock_mod is not None:
logger.info('Falling back to mock serial adapter')
register = getattr(mock_mod, 'register_mock_serial_port')
ser = register('MOCK:aux1', log_path=log_path)
else:
raise
# Start reader loop in-line: poll for inbound data and log it while sending
start_time = time.time()
logger.info(f"Sending sequence ({len(sequence)} bytes) with {char_delay}s char delay")
# Send bytes one by one
for b in sequence:
data = bytes([b])
try:
ser.write(data)
except Exception:
# Some mock adapters may expose write() under a different name
if hasattr(ser, 'write'):
raise
logger.debug(f"TX {data!r}")
time.sleep(char_delay)
# A short wait to allow device to respond
time.sleep(0.2)
# Poll for available inbound data and log a few reads
try:
if hasattr(ser, 'in_waiting'):
avail = getattr(ser, 'in_waiting')
# if callable property
if callable(avail):
avail = avail()
else:
avail = 0
except Exception:
avail = 0
if avail:
try:
r = ser.read(avail)
logger.debug(f"RX {r!r}")
except Exception:
logger.exception('Error while reading from serial')
# Close the connection
try:
ser.close()
logger.info('Serial port closed')
except Exception:
logger.exception('Error closing serial')
except Exception as exc:
logger.exception(f"Error in send_serial_sequence: {exc}")
# Ensure connection closed on error
try:
if ser and hasattr(ser, 'close'):
ser.close()
except Exception:
pass
raise
finally:
# Remove handler to avoid duplicate logs on repeated calls
for h in list(logger.handlers):
logger.removeHandler(h)
h.close()
if __name__ == '__main__':
# Allow quick local test (will use MOCK: by default if module present)
print('serial_aux: test run')
try:
send_serial_sequence()
print('Done')
except Exception as e:
print('Error:', e)