SXXXXXXX_PyBusMonitor1553/examples/artos_integration_commands.py

105 lines
3.5 KiB
Python

"""
Esempio eseguibile: comandi A1/A2 e letture B6/B7 usando BusMonitorCore.
Usare per testare l'integrazione: avviare il file come script Python.
"""
import time
from pathlib import Path
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
def main():
bm = BusMonitorCore()
# Config (opzionale) - lasciare vuoto per usare le env
config = {
'ip': '127.0.0.1',
'send_port': 5001,
'recv_port': 5002,
}
print('Inizializzo BusMonitor...')
if not bm.initialize(config):
print('Errore: impossibile inizializzare:', getattr(bm.manager, 'import_error', None))
return
# Callback di esempio per A2, B6, B7
def cb_a2(msg):
print('[CB] A2 ricevuto:', type(msg), getattr(msg, 'message', None))
def cb_b6(msg):
# Show available attributes for quick debugging
print('[CB] B6 ricevuto: fields =', sorted([n for n in dir(msg.message) if not n.startswith('_')]))
settings_tb = getattr(msg.message, 'settings_tellback', None)
if settings_tb is not None:
print(' settings_tellback (raw if present):', getattr(settings_tb, 'raw', settings_tb))
def cb_b7(msg):
print('[CB] B7 ricevuto: fields =', sorted([n for n in dir(msg.message) if not n.startswith('_')]))
rdr_tb = getattr(msg.message, 'rdr_mode_tellback', None)
print(' rdr_mode_tellback =', rdr_tb)
bm.register_callback('msg_a2', cb_a2)
bm.register_callback('msg_b6', cb_b6)
bm.register_callback('msg_b7', cb_b7)
# Avvia sessione
print('Avvio sessione...')
bm.start_session()
# Esempio: impostare A1 (settings)
print('Aggiorno A1.settings...')
msg_a1 = bm.get_message('A1')
if msg_a1 is None:
print('Wrapper A1 non trovato')
else:
try:
if hasattr(msg_a1.message.settings, 'set_history_level'):
msg_a1.message.settings.set_history_level(4)
else:
setattr(msg_a1.message.settings, 'raw', getattr(msg_a1.message.settings, 'raw', 0) | 0x4)
if hasattr(msg_a1, 'send'):
msg_a1.send()
print('A1 aggiornato')
except Exception as e:
print('Errore aggiornando A1:', e)
# Esempio: impostare A2 (rdr_mode_command)
print('Aggiorno A2.rdr_mode_command...')
msg_a2 = bm.get_message('A2')
if msg_a2 is None:
print('Wrapper A2 non trovato')
else:
try:
if hasattr(msg_a2.message.rdr_mode_command, 'set_master_mode'):
msg_a2.message.rdr_mode_command.set_master_mode(1)
else:
setattr(msg_a2.message.rdr_mode_command, 'raw', 1)
if hasattr(msg_a2, 'send'):
msg_a2.send()
print('A2 aggiornato')
except Exception as e:
print('Errore aggiornando A2:', e)
# Registrazione opzionale
rec_path = Path('test_recordings/examples_session.json')
rec_path.parent.mkdir(exist_ok=True)
bm.start_recording(filepath=rec_path)
# Attendi ricezioni e stampa status periodico
for i in range(6):
st = bm.get_status()
print(f'status (iter {i}): is_running={st.get("is_running")}, connected={st.get("connected")}, messages_count={st.get("messages_count")}, recorded_events={st.get("recorded_events")}')
time.sleep(1)
# Stop recording and session
saved = bm.stop_recording(save=True)
print('Recording saved to', saved)
bm.stop_session()
print('Sessione fermata')
if __name__ == '__main__':
main()