"""
serial_aux.py

Utility script that provides `send_serial_sequence()` to open a (real or mock)
serial port, send a sequence of characters with a per-character delay, log all
traffic to a separate log file and close the port.

The configuration section below makes it easy for an operator to change
parameters.

Usage from a test script:

    from serial_aux import send_serial_sequence
    send_serial_sequence()

In simulation mode set `PORT` to a value starting with `MOCK:` (for example
`MOCK:aux1`) to route the traffic to the mock adapter obtained from
`GRIFO_M_PBIT_mock.register_mock_serial_port()`.
"""
from __future__ import annotations

import importlib
import logging
import os
import time
from datetime import datetime
from typing import Any, Optional

# ---------------------------------------------------------------------------
# Operator-editable configuration (top of file)
# ---------------------------------------------------------------------------
# When `GRIFO_M_PBIT_mock` is importable (simulation), a call that leaves the
# port at this default is routed to the mock adapter instead of real hardware.
# Set this to a real COM port (e.g. "COM5") when running against hardware, or
# to "MOCK:aux1" to request the simulation explicitly.
PORT = "COM5"
BAUDRATE = 115200
BYTESIZE = 8
PARITY = 'N'
STOPBITS = 1

# Delay between single characters (seconds)
CHAR_DELAY_SEC = 0.05

# Default sequence: CTRL+X, three spaces, ENTER (CR).
# The spaces are spelled out as a repeat count so that the number of spaces
# cannot be silently changed by whitespace normalisation of this file.
SEQUENCE = b"\x18" + b" " * 3 + b"\r"

LOG_FOLDER = os.path.join(os.path.dirname(__file__), '..', 'LOG')
LOG_FILENAME_TEMPLATE = 'serial_aux_{port}_{ts}.log'

# Characters that must not end up in a log file name: both path separators
# plus the other characters Windows rejects in file names. Each is replaced
# by "_" when the port name is embedded in the log file name.
_UNSAFE_FILENAME_CHARS = str.maketrans({ch: '_' for ch in '<>:"/\\|?*'})


def _ensure_log_folder(path: str) -> None:
    """Create the log directory `path` (and missing parents) if needed."""
    os.makedirs(path, exist_ok=True)


def _load_mock_module() -> Optional[Any]:
    """Return the `GRIFO_M_PBIT_mock` module, or None if it cannot be imported."""
    try:
        return importlib.import_module('GRIFO_M_PBIT_mock')
    except Exception:
        # Any failure while importing the mock just means "no simulation
        # support available"; the caller decides whether that is fatal.
        return None


def _open_port(port: str, baudrate: int, log_path: str,
               logger: logging.Logger) -> Any:
    """Open `port` and return the serial object (real pyserial or mock adapter).

    The mock adapter is used when the port name starts with "MOCK:"
    (case-insensitive), or when the mock module is importable and the port is
    either the configured default `PORT` or does not start with "COM".
    Otherwise pyserial is used; if opening the real port fails and the mock
    module is importable, the function falls back to the mock "MOCK:aux1".

    Raises:
        ImportError: a mock port was selected but the mock module (or its
            `register_mock_serial_port` function) is not available, or
            pyserial cannot be imported for a real port.
        Exception: whatever `serial.Serial(...)` raised, when no mock
            fallback is available.
    """
    mock_mod = _load_mock_module()
    port_upper = port.upper()

    explicit_mock = port_upper.startswith('MOCK:')
    # NOTE(review): with the mock module importable, the default port and any
    # non-"COM" name (e.g. "/dev/ttyUSB0") are routed to the mock, so real
    # hardware on such ports is unreachable in that environment - confirm
    # this is the intended simulation behaviour.
    implicit_mock = mock_mod is not None and (
        port == PORT or not port_upper.startswith('COM')
    )

    if explicit_mock or implicit_mock:
        if mock_mod is None:
            raise ImportError('Mock module not available to provide mock serial port')
        register = getattr(mock_mod, 'register_mock_serial_port', None)
        if register is None:
            raise ImportError('Mock module does not provide register_mock_serial_port')
        adapter = register(port, log_path=log_path)
        logger.info(f"Using mock serial adapter for {port}")
        return adapter

    # Real hardware: pyserial is imported lazily so that simulation-only
    # environments do not need it installed.
    try:
        import serial
    except Exception:
        logger.exception('pyserial not available; cannot open real serial port')
        raise

    try:
        ser = serial.Serial(port=port, baudrate=baudrate, bytesize=BYTESIZE,
                            parity=PARITY, stopbits=STOPBITS, timeout=0.2)
    except Exception as e:
        logger.warning(f"Could not open real port {port}: {e}")
        if mock_mod is None:
            raise
        # Fallback to mock if available
        logger.info('Falling back to mock serial adapter')
        register = getattr(mock_mod, 'register_mock_serial_port')
        return register('MOCK:aux1', log_path=log_path)

    logger.info(f"Opened real serial port {port}")
    return ser


def _send_bytes(ser: Any, sequence: bytes, char_delay: float,
                logger: logging.Logger) -> None:
    """Write `sequence` to `ser` one byte at a time, logging each byte sent.

    If the adapter exposes no `write` attribute nothing can be transmitted;
    this is reported once as a warning instead of logging bytes as sent.
    Exceptions raised by `write()` itself propagate to the caller.
    """
    write = getattr(ser, 'write', None)
    if write is None:
        logger.warning('Serial adapter has no write() method; nothing was transmitted')
        return

    for value in sequence:
        data = bytes([value])
        write(data)
        logger.debug(f"TX {data!r}")
        time.sleep(char_delay)


def _log_pending_input(ser: Any, logger: logging.Logger) -> None:
    """Read whatever `ser` reports as waiting and log it (best effort)."""
    try:
        # `in_waiting` is read as an attribute; if the adapter exposes it as
        # a method instead, call it to get the count.
        pending = getattr(ser, 'in_waiting', 0)
        if callable(pending):
            pending = pending()
    except Exception:
        pending = 0

    if not pending:
        return

    try:
        received = ser.read(pending)
        logger.debug(f"RX {received!r}")
    except Exception:
        logger.exception('Error while reading from serial')


def send_serial_sequence(port: Optional[str] = None,
                         baudrate: Optional[int] = None,
                         char_delay: Optional[float] = None,
                         sequence: Optional[bytes] = None,
                         log_folder: Optional[str] = None) -> None:
    """
    Open a serial port, send `sequence` one byte at a time with `char_delay`
    seconds between bytes, log all sent/received traffic to a file, then close.

    If the provided port string starts with "MOCK:", the function uses the
    mock adapter provided by `GRIFO_M_PBIT_mock` (simulation).

    Args:
        port: Port name; falls back to `PORT` when None or empty.
        baudrate: Baud rate; falls back to `BAUDRATE` when None or 0.
        char_delay: Seconds to wait after each byte; `CHAR_DELAY_SEC` when
            None (an explicit 0 is honoured).
        sequence: Bytes to send; falls back to `SEQUENCE` when None or empty.
        log_folder: Directory for the session log file (created if missing);
            falls back to `LOG_FOLDER` when None or empty.

    Raises:
        ImportError: a mock port was selected but the mock module is not
            usable, or pyserial is missing for a real port.
        Exception: any error while opening the port or writing to it is
            logged to the session log and re-raised. Errors while reading
            the response or closing the port after a successful send are
            logged but not raised.
    """
    port = port or PORT
    baudrate = baudrate or BAUDRATE
    char_delay = CHAR_DELAY_SEC if char_delay is None else char_delay
    sequence = sequence or SEQUENCE
    log_folder = log_folder or LOG_FOLDER

    _ensure_log_folder(log_folder)

    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    # Device paths such as r"\\.\COM10" or "/dev/ttyUSB0" contain separators
    # that would otherwise turn the log file name into a nested path.
    safe_port = port.translate(_UNSAFE_FILENAME_CHARS)
    log_path = os.path.join(
        log_folder, LOG_FILENAME_TEMPLATE.format(port=safe_port, ts=ts)
    )

    # Prepare a dedicated logger (and file) for this session
    logger = logging.getLogger(f"serial_aux.{safe_port}.{ts}")
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler(log_path, encoding='utf-8')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(fh)

    ser = None
    try:
        logger.info(f"Opening serial port: {port} @ {baudrate}bps")
        ser = _open_port(port, baudrate, log_path, logger)

        logger.info(f"Sending sequence ({len(sequence)} bytes) with {char_delay}s char delay")
        _send_bytes(ser, sequence, char_delay, logger)

        # A short wait to allow the device to respond
        time.sleep(0.2)
        _log_pending_input(ser, logger)

        # Close the connection; a failure here is logged, not raised,
        # because the sequence has already been sent.
        try:
            ser.close()
            logger.info('Serial port closed')
        except Exception:
            logger.exception('Error closing serial')
    except Exception as exc:
        logger.exception(f"Error in send_serial_sequence: {exc}")
        # Ensure the connection is closed on error, without masking the
        # original exception.
        if ser is not None and hasattr(ser, 'close'):
            try:
                ser.close()
            except Exception:
                logger.exception('Error closing serial')
        raise
    finally:
        # Remove and close handlers so the log file is released and repeated
        # calls do not produce duplicate log lines.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            handler.close()


if __name__ == '__main__':
    # Allow quick local test (will use the mock by default if module present)
    print('serial_aux: test run')
    try:
        send_serial_sequence()
        print('Done')
    except Exception as e:
        print('Error:', e)