from serial import * import logging import threading import time ESCAPE_CHAR = 13 class RxFromSerial(threading.Thread): def __init__(self, serial_obj=None, thread_name='Terminal2Serial'): super().__init__(name=thread_name) self.terminal = serial_obj def run(self): if self.terminal.ready: logging.info('Terminal connected. Rx thread started') while self.terminal.ready: try: if self.terminal.mode == 'EXPECT_LF': # blocking until a linefeed is discovered or timeout exceeds read_bytes = self.terminal.read_until(b'\r') else: read_bytes = self.terminal.read() #print(read_bytes[-1]) if read_bytes == b'' or read_bytes[-1] != ESCAPE_CHAR: #logging.info('Terminal timeout. Flush bytes') pass if read_bytes != b'' and read_bytes[-1] == ESCAPE_CHAR: self.terminal.read_action(read_bytes, len(read_bytes)) except Exception as e: logging.error(e) break logging.info('Terminal disconnected. Rx thread stopped') else: logging.warning('Terminal connect failed. Rx thread not started') class Terminal2Serial(Serial): def __init__(self, port='COM1', baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=0.5, xonxoff=False, rtscts=False, write_timeout=None, dsrdtr=False, inter_byte_timeout=None, exclusive=None, mode='EXPECT_LF'): try: super().__init__(port, baudrate, bytesize, parity, stopbits, timeout, xonxoff, rtscts, write_timeout, dsrdtr, inter_byte_timeout, exclusive) except ValueError as e: logging.error(e) except SerialException as e: logging.error(e) self.ready = True self.mode = mode self.rxTask = RxFromSerial(serial_obj=self) def __del__(self): try: self.close() super().__del__() except ValueError as e: logging.error(e) except SerialException as e: logging.error(e) def _check_status(self): try: if not self.ready: self.open() self.ready = True except FileNotFoundError as e: logging.warning(e) except SerialException as e: logging.warning(e) self.ready = False return self.ready def write2terminal(self, data, **kvargs): timeout = kvargs.get('timeout', 0.1) if self._check_status() and data: data = data + '\n' try: ret = self.write(data.encode('utf_8')) if timeout: time.sleep(timeout) return ret except Exception as e: if timeout: time.sleep(timeout) logging.warning(e) return -1 else: if timeout: time.sleep(timeout) logging.warning('Terminal is disabled or no data provided') return -1 def disconnect(self): self.ready = False try: self.rxTask.join(self.timeout+1.0) if self.rxTask.is_alive(): logging.error('Terminal Rx thread is still alive !!!') except RuntimeError as e: logging.error(e) pass try: self.close() except SerialException as e: logging.error(e) def connect(self): if self._check_status(): self.rxTask.start() else: logging.error('Cannot connect Terminal') # define the action to performed after read data, actually simply print to console def read_action(self, data, length): if length > 0: if self.mode == 'EXPECT_LF' and data: dummy = (data.decode('utf_8','ignore')).rstrip() print(dummy) else: print(data.decode('utf_8')) if __name__ == '__main__': theTerminal = Terminal2Serial() theTerminal.connect() theTerminal.write2terminal('ls') time.sleep(300) theTerminal.disconnect()