""" Esempio eseguibile: comandi A1/A2 e letture B6/B7 usando BusMonitorCore. Usare per testare l'integrazione: avviare il file come script Python. """ import time from pathlib import Path from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore def main(): bm = BusMonitorCore() # Config (opzionale) - lasciare vuoto per usare le env config = { 'ip': '127.0.0.1', 'send_port': 5001, 'recv_port': 5002, } print('Inizializzo BusMonitor...') if not bm.initialize(config): print('Errore: impossibile inizializzare:', getattr(bm.manager, 'import_error', None)) return # Callback di esempio per A2, B6, B7 def cb_a2(msg): print('[CB] A2 ricevuto:', type(msg), getattr(msg, 'message', None)) def cb_b6(msg): # Show available attributes for quick debugging print('[CB] B6 ricevuto: fields =', sorted([n for n in dir(msg.message) if not n.startswith('_')])) settings_tb = getattr(msg.message, 'settings_tellback', None) if settings_tb is not None: print(' settings_tellback (raw if present):', getattr(settings_tb, 'raw', settings_tb)) def cb_b7(msg): print('[CB] B7 ricevuto: fields =', sorted([n for n in dir(msg.message) if not n.startswith('_')])) rdr_tb = getattr(msg.message, 'rdr_mode_tellback', None) print(' rdr_mode_tellback =', rdr_tb) bm.register_callback('msg_a2', cb_a2) bm.register_callback('msg_b6', cb_b6) bm.register_callback('msg_b7', cb_b7) # Avvia sessione print('Avvio sessione...') bm.start_session() # Esempio: impostare A1 (settings) print('Aggiorno A1.settings...') msg_a1 = bm.get_message('A1') if msg_a1 is None: print('Wrapper A1 non trovato') else: try: if hasattr(msg_a1.message.settings, 'set_history_level'): msg_a1.message.settings.set_history_level(4) else: setattr(msg_a1.message.settings, 'raw', getattr(msg_a1.message.settings, 'raw', 0) | 0x4) if hasattr(msg_a1, 'send'): msg_a1.send() print('A1 aggiornato') except Exception as e: print('Errore aggiornando A1:', e) # Esempio: impostare A2 (rdr_mode_command) print('Aggiorno A2.rdr_mode_command...') msg_a2 = bm.get_message('A2') if msg_a2 is None: print('Wrapper A2 non trovato') else: try: if hasattr(msg_a2.message.rdr_mode_command, 'set_master_mode'): msg_a2.message.rdr_mode_command.set_master_mode(1) else: setattr(msg_a2.message.rdr_mode_command, 'raw', 1) if hasattr(msg_a2, 'send'): msg_a2.send() print('A2 aggiornato') except Exception as e: print('Errore aggiornando A2:', e) # Registrazione opzionale rec_path = Path('test_recordings/examples_session.json') rec_path.parent.mkdir(exist_ok=True) bm.start_recording(filepath=rec_path) # Attendi ricezioni e stampa status periodico for i in range(6): st = bm.get_status() print(f'status (iter {i}): is_running={st.get("is_running")}, connected={st.get("connected")}, messages_count={st.get("messages_count")}, recorded_events={st.get("recorded_events")}') time.sleep(1) # Stop recording and session saved = bm.stop_recording(save=True) print('Recording saved to', saved) bm.stop_session() print('Sessione fermata') if __name__ == '__main__': main()