PlatSim_Genova/TestEnvironment/env/leo_terminal_serial.py
2026-01-30 16:38:33 +01:00

130 lines
4.4 KiB
Python

from serial import *
import logging
import threading
import time
ESCAPE_CHAR = 13
class RxFromSerial(threading.Thread):
def __init__(self, serial_obj=None, thread_name='Terminal2Serial'):
super().__init__(name=thread_name)
self.terminal = serial_obj
def run(self):
if self.terminal.ready:
logging.info('Terminal connected. Rx thread started')
while self.terminal.ready:
try:
if self.terminal.mode == 'EXPECT_LF':
# blocking until a linefeed is discovered or timeout exceeds
read_bytes = self.terminal.read_until(b'\r')
else:
read_bytes = self.terminal.read()
#print(read_bytes[-1])
if read_bytes == b'' or read_bytes[-1] != ESCAPE_CHAR:
#logging.info('Terminal timeout. Flush bytes')
pass
if read_bytes != b'' and read_bytes[-1] == ESCAPE_CHAR:
self.terminal.read_action(read_bytes, len(read_bytes))
except Exception as e:
logging.error(e)
break
logging.info('Terminal disconnected. Rx thread stopped')
else:
logging.warning('Terminal connect failed. Rx thread not started')
class Terminal2Serial(Serial):
def __init__(self, port='COM1', baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE,
timeout=0.5, xonxoff=False, rtscts=False, write_timeout=None, dsrdtr=False, inter_byte_timeout=None,
exclusive=None, mode='EXPECT_LF'):
try:
super().__init__(port, baudrate, bytesize, parity, stopbits, timeout, xonxoff, rtscts, write_timeout, dsrdtr,
inter_byte_timeout, exclusive)
except ValueError as e:
logging.error(e)
except SerialException as e:
logging.error(e)
self.ready = True
self.mode = mode
self.rxTask = RxFromSerial(serial_obj=self)
def __del__(self):
try:
self.close()
super().__del__()
except ValueError as e:
logging.error(e)
except SerialException as e:
logging.error(e)
def _check_status(self):
try:
if not self.ready:
self.open()
self.ready = True
except FileNotFoundError as e:
logging.warning(e)
except SerialException as e:
logging.warning(e)
self.ready = False
return self.ready
def write2terminal(self, data, **kvargs):
timeout = kvargs.get('timeout', 0.1)
if self._check_status() and data:
data = data + '\n'
try:
ret = self.write(data.encode('utf_8'))
if timeout:
time.sleep(timeout)
return ret
except Exception as e:
if timeout:
time.sleep(timeout)
logging.warning(e)
return -1
else:
if timeout:
time.sleep(timeout)
logging.warning('Terminal is disabled or no data provided')
return -1
def disconnect(self):
self.ready = False
try:
self.rxTask.join(self.timeout+1.0)
if self.rxTask.is_alive():
logging.error('Terminal Rx thread is still alive !!!')
except RuntimeError as e:
logging.error(e)
pass
try:
self.close()
except SerialException as e:
logging.error(e)
def connect(self):
if self._check_status():
self.rxTask.start()
else:
logging.error('Cannot connect Terminal')
# define the action to performed after read data, actually simply print to console
def read_action(self, data, length):
if length > 0:
if self.mode == 'EXPECT_LF' and data:
dummy = (data.decode('utf_8','ignore')).rstrip()
print(dummy)
else:
print(data.decode('utf_8'))
if __name__ == '__main__':
theTerminal = Terminal2Serial()
theTerminal.connect()
theTerminal.write2terminal('ls')
time.sleep(300)
theTerminal.disconnect()