11 KiB
Integrazione di pybusmonitor1553 con ARTOS / uso come modulo
Questo documento descrive come caricare, configurare e utilizzare il modulo pybusmonitor1553 all'interno di un'applicazione esterna (es. ARTOS Collector). Fornisce istruzioni chiare per l'inizializzazione, il controllo, la ricezione/invio di messaggi e le pratiche consigliate per l'integrazione.
Scopo
- Consentire a un'applicazione esterna di usare
pybusmonitor1553come modulo aggiuntivo. - Fornire esempi pratici per inizializzazione, registrazione callback, invio comandi, registrazione/replay e diagnostica.
Componenti chiave
BusMonitorCore— controller principale e interfaccia di alto livello per integrazione.ConnectionManager— gestisce import lazy e loop di invio/ricezione.- Libreria dei messaggi:
Grifo_E_1553lib(inclusa nel package). - Esempio di riferimento:
examples/artos_integration_example.py.
Requisiti
- Python compatibile con il repository.
- Le dipendenze del repository devono essere installate (usare l'ambiente del progetto).
- Accesso di rete UDP alle porte usate per invio/ricezione.
- Variabili d'ambiente utili:
PYBM_RX_IP(bind, default127.0.0.1)PYBM_RX_PORT(porta ricezione, default5002)PYBM_TX_PORT(porta invio, default5001)
Installazione e avvio rapido
Eseguire il monitor standalone (GUI):
python -m pybusmonitor1553
Per usare il modulo da un'altra applicazione, importare e istanziare BusMonitorCore (vedi esempi sotto).
Pattern di integrazione consigliato
Sequenza tipica quando si usa pybusmonitor1553 come modulo:
- Creare istanza:
bm = BusMonitorCore() - Inizializzare:
bm.initialize(config)(config opzionale, altrimenti usa env) - Registrare callback:
bm.register_callback(label, callback) - Avviare sessione:
bm.start_session() - Fermare sessione:
bm.stop_session() - Usare:
get_status(),get_message(label),wait_for_message(label, timeout)per interrogazioni e sincronizzazioni
API principali (sintesi)
BusMonitorCore()— crea l'istanza del modulo.initialize(config: Dict) -> bool— importa la libreria dei messaggi e prepara i wrapper.configpuò contenere'ip','send_port','recv_port'.
start_session()/stop_session()— avvia/ferma i loop di invio/ricezione.register_callback(label: str, callback: Callable)— callback chiamata al ricevimento di messaggi (argomento: oggetto messaggio).get_status() -> Dict— restituisce stato corrente:is_running,connected,messages_count,recording,recorded_events,errors.get_message(label: str) -> Optional[Any]— ottiene il wrapper del messaggio.get_all_messages() -> Dict[str, Any]— lista wrapper disponibili.wait_for_message(label: str, timeout: float) -> bool— blocca fino al messaggio o timeout.start_recording(filepath: Optional[Path]),stop_recording(save: bool=True),load_recording(filepath: Path)— registrazione e replay.
Note importanti sulle callback
- I callback vengono invocati dal thread di ricezione: devono essere non-bloccanti. Per elaborazioni pesanti, spostare il lavoro su un worker thread o una coda.
Come inviare comandi / modificare messaggi
- Recuperare il wrapper:
msg = bm.get_message("A2")
- Modificare campi usando l'API dei field (varia per messaggio):
try:
msg.message.rdr_mode_command.set_master_mode(1)
except Exception:
msg.message.rdr_mode_command.raw = 1
# Invio esplicito se supportato
if hasattr(msg, 'send'):
msg.send()
- Se il wrapper non espone
.send(), assicurarsi che ilMajorFrame/send-loop sia attivo constart_session().
Esempio di integrazione (snippet)
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
bm = BusMonitorCore()
config = {'ip': '127.0.0.1', 'send_port': 5001, 'recv_port': 5002}
if not bm.initialize(config):
raise RuntimeError("Impossibile inizializzare BusMonitor1553")
def on_a2(msg):
print("A2 ricevuto:", msg)
bm.register_callback("msg_a2", on_a2)
bm.start_recording()
bm.start_session()
if bm.wait_for_message("msg_a2", timeout=5.0):
print("A2 arrivato")
print(bm.get_status())
bm.stop_recording(save=True)
bm.stop_session()
Per un esempio completo e commentato vedere: examples/artos_integration_example.py.
Best practices
- Non bloccare i callback: usare code/worker.
- Verificare
initialize()emanager.import_errorper diagnosi di import. - Usare
wait_for_messageper sincronizzare test automatizzati. - Usare
start_recording/stop_recordingper acquisire eventi di test e riprodurli. - Per modifiche ai messaggi, consultare le definizioni in
Grifo_E_1553lib.messages.
Logging e diagnostica
- Abilitare logging di debug:
import logging
logging.basicConfig(level=logging.DEBUG)
- Controllare
bm.get_status()['errors']ebm.manager.import_errorper problemi di import/bind.
Testing e validazione
- Usare
examples/artos_integration_example.pycome caso di test. - Controlli chiave:
initialize()ritornaTrue.start_session()attivais_runningeconnected.wait_for_message()e le callback vengono eseguite entro i timeout previsti.
Troubleshooting rapido
- ImportError libreria messaggi: controllare
bm.manager.import_error. - Nessun messaggio ricevuto: verificare bind IP/porte e che
init_recv_sock()non fallisca. - Callback non chiamata: verificare label usata (es.
"msg_a2"vs"A2") conget_all_messages(). - Invio non funzionante: assicurarsi che il send-loop sia attivo (
start_session()) e che il wrapper supporti.send().
Riferimenti nel repository
- Codice core: pybusmonitor1553/core/bus_monitor_core.py
- Gestione connessioni: pybusmonitor1553/core/connection_manager.py
- Esempio ARTOS: examples/artos_integration_example.py
- Entrypoint GUI: pybusmonitor1553/main.py
Se vuoi, posso aggiungere al documento esempi specifici per l'invio di A2/A1 con i nomi dei campi letti dalle definizioni nel pacchetto Grifo_E_1553lib.
Esempi dettagliati: A1/A2 (comandi) e B6/B7 (stato)
Qui sotto trovi esempi pratici completi che mostrano come impostare comandi su A1/A2 e come leggere stati/tellback su B6/B7. Gli snippet assumono che bm sia un'istanza inizializzata di BusMonitorCore e che la sessione sia avviata (start_session()).
1) Esempio: inviare comando A1 (impostazioni / parametri)
Questo esempio mostra come ottenere il wrapper A1, modificare un campo e inviare il messaggio (se il wrapper espone .send()). Si forniscono fallback per l'accesso ai campi.
from pathlib import Path
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
bm = BusMonitorCore()
bm.initialize({'ip': '127.0.0.1'})
bm.start_session()
# Recupera wrapper A1
msg_a1 = bm.get_message('A1')
if msg_a1 is None:
raise RuntimeError('Wrapper A1 non trovato')
# Esempio: impostare un parametro nelle impostazioni operative (campo `settings`)
try:
# API field-specific: usare i metodi esposti da RDROperationalSettings se disponibili
# Qui usiamo il campo `settings` presente in MsgRdrSettingsAndParameters
if hasattr(msg_a1.message.settings, 'set_history_level'):
msg_a1.message.settings.set_history_level(4)
else:
msg_a1.message.settings.raw = 4
except Exception:
raise
# Invio esplicito se disponibile
if hasattr(msg_a1, 'send'):
msg_a1.send()
else:
# Se non c'è send, MajorFrame invierà A1 al prossimo ciclo se configurato
print('Invio A1: il wrapper non espone .send(); MajorFrame invierà il valore al prossimo frame')
print('A1 inviato / aggiornato')
bm.stop_session()
2) Esempio: inviare comando A2 (modalità / comandi radar)
L'esempio mostra come cambiare la modalità principale (campo rdr_mode_command) in A2.
bm = BusMonitorCore()
bm.initialize({})
bm.start_session()
msg_a2 = bm.get_message('A2')
if msg_a2 is None:
raise RuntimeError('Wrapper A2 non trovato')
# Impostare modalità radar (campo `rdr_mode_command` presente in MsgRdrOperationCommand)
try:
if hasattr(msg_a2.message.rdr_mode_command, 'set_master_mode'):
msg_a2.message.rdr_mode_command.set_master_mode(1) # valore ICD di esempio
else:
msg_a2.message.rdr_mode_command.raw = 1
except Exception:
raise
if hasattr(msg_a2, 'send'):
msg_a2.send()
else:
print('A2 aggiornato (MajorFrame invierà il dato)')
bm.stop_session()
3) Esempio: leggere stato B6 (RdrSettingsAndParametersTellback)
B6 viene usato come tellback per ottenere impostazioni e parametri dal radar. Questo snippet mostra come registrare una callback per lettura asincrona e come leggere il valore al volo.
bm = BusMonitorCore()
bm.initialize({})
bm.start_session()
def on_b6(msg):
try:
# I campi nel tellback usano suffissi _tellback (es. settings_tellback)
settings_tb = getattr(msg.message, 'settings_tellback', None)
freq_tb = getattr(msg.message, 'frequency_tellback', None)
print('B6 ricevuto: settings_tellback=', settings_tb, ' frequency_tellback=', freq_tb)
except Exception as e:
print('Errore nella callback B6:', e)
bm.register_callback('msg_b6', on_b6)
# Alternativa: lettura sincrona via get_message + accesso a campi tellback
msg_b6 = bm.get_message('B6')
if msg_b6:
try:
print('B6 - settings_tellback.raw:', getattr(msg_b6.message, 'settings_tellback').raw)
except Exception:
pass
import time
time.sleep(2)
bm.stop_session()
4) Esempio: leggere stato B7 (RdrStatusTellback)
B7 contiene lo stato operativo; l'esempio mostra lettura e callback.
bm = BusMonitorCore()
bm.initialize({})
bm.start_session()
def on_b7(msg):
try:
# Il tellback per lo stato usa il campo `rdr_mode_tellback` e param*_tellback
rdr_status = getattr(msg.message, 'rdr_mode_tellback', None)
print('B7 ricevuto: rdr_mode_tellback=', rdr_status)
except Exception as e:
print('Errore nella callback B7:', e)
bm.register_callback('msg_b7', on_b7)
# Lettura diretta
msg_b7 = bm.get_message('B7')
if msg_b7:
try:
print('B7 - rdr_mode_tellback:', getattr(msg_b7.message, 'rdr_mode_tellback'))
except Exception:
pass
time.sleep(2)
bm.stop_session()
Note sui nomi dei campi
- I nomi dei campi usati negli esempi (
settings,rdr_mode_command,status, ecc.) sono quelli comunemente esposti dai wrappers interni ma possono variare a seconda della versione delle classi inGrifo_E_1553lib.messages. - Per conoscere i nomi esatti, usare
print(sorted(dir(msg.message)))sul wrapper ottenuto daget_message()oppure consultare i file inpybusmonitor1553/Grifo_E_1553lib/messages/.
Raccomandazioni finali
- Testare gli snippet in un ambiente di laboratorio prima di usarli in produzione.
- Proteggere i percorsi critici (callback, invio) con try/except e logging.