204 lines
7.4 KiB
Python
204 lines
7.4 KiB
Python
"""
serial_aux.py

Utility script that provides `send_serial_sequence()` to open a (real or
mock) serial port, send a sequence of bytes with a per-byte delay, log all
traffic to a separate log file and close the port.

The configuration section below makes it easy for an operator to change
parameters.

Usage from a test script:
    from serial_aux import send_serial_sequence
    send_serial_sequence()

In simulation mode set `PORT` to a value starting with `MOCK:` (for example
`MOCK:aux1`) to route the traffic to the mock adapter registered through
`GRIFO_M_PBIT_mock.register_mock_serial_port()`.
"""
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
# Operator-editable configuration (top of file)
# These module-level constants are the defaults that send_serial_sequence()
# falls back to when the corresponding argument is omitted.
#
# Mock selection: when `GRIFO_M_PBIT_mock` is importable (simulation), any call
# whose effective port equals PORT, or does not start with "COM", is routed to
# the mock adapter instead of real hardware. To reach a real COM port while
# the mock module is importable, pass a `port` argument that starts with "COM"
# and differs from PORT. Without the mock module, PORT is opened via pyserial.
PORT = "COM5" # Serial port (or use "MOCK:aux1" for simulation)
# Line settings handed to serial.Serial() for real ports.
BAUDRATE = 115200
BYTESIZE = 8
PARITY = 'N'
STOPBITS = 1
# Delay between single characters (seconds)
CHAR_DELAY_SEC = 0.05
# Default sequence: CTRL+X, three spaces, ENTER (CR)
# NOTE(review): the literal below shows a single space between \x18 and \r,
# while the comment above says three -- confirm which one is intended.
SEQUENCE = b"\x18 \r"
# Session logs go to a LOG directory that is a sibling of this file's folder.
LOG_FOLDER = os.path.join(os.path.dirname(__file__), '..', 'LOG')
# {port} is the sanitized port name, {ts} a %Y%m%d_%H%M%S timestamp.
LOG_FILENAME_TEMPLATE = 'serial_aux_{port}_{ts}.log'
|
|
|
|
|
|
def _ensure_log_folder(path: str) -> None:
    """Create the log directory `path`, including any missing parents.

    Calling it for a directory that already exists is a no-op.

    Raises:
        FileExistsError: if `path` exists but is not a directory (previously
            this was silently accepted and only failed later, when the log
            file was opened inside it).
    """
    # exist_ok=True already tolerates an existing directory, so a separate
    # os.path.exists() pre-check is redundant and open to a check/create race.
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def _open_port(port: str, baudrate: int, log_path: str, logger: logging.Logger):
    """Return an open serial-like object for `port` (mock adapter or pyserial)."""
    import importlib

    # The mock module only exists in the simulation environment; its absence
    # simply means "real hardware only".
    try:
        mock_mod = importlib.import_module('GRIFO_M_PBIT_mock')
    except Exception:
        mock_mod = None

    upper_port = port.upper()
    if upper_port.startswith('MOCK:'):
        # Explicit request for the mock adapter.
        use_mock = True
    else:
        # In simulation, prefer the mock when the default port was left
        # unchanged or the name does not look like a COM port.
        use_mock = mock_mod is not None and (
            port == PORT or not upper_port.startswith('COM'))

    if use_mock:
        if mock_mod is None:
            raise ImportError('Mock module not available to provide mock serial port')
        register = getattr(mock_mod, 'register_mock_serial_port', None)
        if register is None:
            raise ImportError('Mock module does not provide register_mock_serial_port')
        ser = register(port, log_path=log_path)
        logger.info("Using mock serial adapter for %s", port)
        return ser

    # Real hardware: pyserial is imported lazily so simulation runs do not
    # need it installed.
    try:
        import serial
    except Exception:
        logger.exception('pyserial not available; cannot open real serial port')
        raise

    try:
        ser = serial.Serial(port=port, baudrate=baudrate, bytesize=BYTESIZE,
                            parity=PARITY, stopbits=STOPBITS, timeout=0.2)
    except Exception as e:
        logger.warning("Could not open real port %s: %s", port, e)
        if mock_mod is None:
            raise
        # Fallback to mock if available
        logger.info('Falling back to mock serial adapter')
        register = getattr(mock_mod, 'register_mock_serial_port')
        return register('MOCK:aux1', log_path=log_path)

    logger.info("Opened real serial port %s", port)
    return ser


def send_serial_sequence(port: Optional[str] = None,
                         baudrate: Optional[int] = None,
                         char_delay: Optional[float] = None,
                         sequence: Optional[bytes] = None,
                         log_folder: Optional[str] = None) -> None:
    """
    Open a serial port, send `sequence` one byte at a time with `char_delay`
    seconds between bytes, log the sent/received traffic to a per-session
    file, then close the port.

    Port selection:
      * a port starting with "MOCK:" (case-insensitive) always uses the mock
        adapter returned by `GRIFO_M_PBIT_mock.register_mock_serial_port`;
      * if `GRIFO_M_PBIT_mock` is importable, the mock is also used when the
        port equals the default PORT or does not start with "COM";
      * otherwise the port is opened with pyserial; if that fails and the
        mock module is importable, the mock port "MOCK:aux1" is used instead.

    Args:
        port: Port name; PORT is used when omitted or empty.
        baudrate: Baud rate for a real port; BAUDRATE when omitted or 0.
        char_delay: Seconds to sleep after each byte; CHAR_DELAY_SEC when None.
        sequence: Bytes to send; SEQUENCE when None. An explicitly passed
            empty sequence sends nothing.
        log_folder: Directory for the session log (created if missing);
            LOG_FOLDER when omitted or empty.

    Raises:
        ImportError: the mock was selected but is unavailable or lacks
            `register_mock_serial_port`, or pyserial is missing for a real port.
        Exception: any other error while opening or writing is written to the
            session log and re-raised.
    """
    import re

    port = port or PORT
    baudrate = baudrate or BAUDRATE
    char_delay = CHAR_DELAY_SEC if char_delay is None else char_delay
    # `is None` rather than truthiness, so b"" means "send nothing" instead of
    # silently sending the default sequence.
    sequence = SEQUENCE if sequence is None else sequence
    log_folder = log_folder or LOG_FOLDER

    _ensure_log_folder(log_folder)
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Keep only filename-safe characters: port names such as "MOCK:aux1",
    # "/dev/ttyUSB0" or "\\.\COM10" contain separators that would otherwise
    # corrupt the log path.
    safe_port = re.sub(r'[^A-Za-z0-9._-]', '_', port)
    log_path = os.path.join(log_folder, LOG_FILENAME_TEMPLATE.format(port=safe_port, ts=ts))

    # Prepare logger for this session
    logger = logging.getLogger(f"serial_aux.{safe_port}.{ts}")
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler(log_path, encoding='utf-8')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(fh)

    logger.info("Opening serial port: %s @ %sbps", port, baudrate)

    ser = None
    try:
        ser = _open_port(port, baudrate, log_path, logger)

        logger.info("Sending sequence (%d bytes) with %ss char delay",
                    len(sequence), char_delay)

        # Send bytes one by one
        for b in sequence:
            data = bytes([b])
            try:
                ser.write(data)
            except Exception:
                if hasattr(ser, 'write'):
                    raise
                # Adapter without write(): stay best-effort, but do not claim
                # in the log that the byte went out.
                logger.warning("Adapter has no write(); byte %r was NOT sent", data)
            else:
                logger.debug("TX %r", data)
            time.sleep(char_delay)

        # A short wait to allow device to respond
        time.sleep(0.2)

        # `in_waiting` may be a plain attribute/property or a method,
        # depending on the adapter; treat any failure as "nothing to read".
        try:
            avail = getattr(ser, 'in_waiting', 0)
            if callable(avail):
                avail = avail()
        except Exception:
            avail = 0

        if avail:
            try:
                received = ser.read(avail)
                logger.debug("RX %r", received)
            except Exception:
                logger.exception('Error while reading from serial')

        # Close the connection
        try:
            ser.close()
            logger.info('Serial port closed')
        except Exception:
            logger.exception('Error closing serial')

    except Exception as exc:
        logger.exception("Error in send_serial_sequence: %s", exc)
        # Ensure connection closed on error; a failure here must not mask the
        # original exception, so it is only recorded.
        try:
            if ser is not None and hasattr(ser, 'close'):
                ser.close()
        except Exception:
            logger.debug('Error closing serial after failure', exc_info=True)
        raise
    finally:
        # Remove handler to avoid duplicate logs on repeated calls and to
        # release the log file.
        for h in list(logger.handlers):
            logger.removeHandler(h)
            h.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Allow quick local test (will use MOCK: by default if module present)
    print('serial_aux: test run')
    try:
        send_serial_sequence()
    except Exception as e:
        print('Error:', e)
        # Exit non-zero so a shell or calling script can detect the failure;
        # previously the process ended with status 0 even on error.
        raise SystemExit(1) from e
    else:
        print('Done')
|