9.5 KiB
BusMonitor1553 - ARTOS Integration Guide
Panoramica
Questo documento descrive come il progetto PyBusMonitor1553 è strutturato per operare sia come applicazione standalone sia come modulo ARTOS.
Strategia Dual-Mode
┌─────────────────────────────────────────────────────────────┐
│ PyBusMonitor1553 Project │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ BusMonitorCore (BaseModule) │ │
│ │ - Implements ARTOS interface │ │
│ │ - Protocol-agnostic business logic │ │
│ │ - Recording & Replay support │ │
│ └─────────────┬────────────────────────────────────────┘ │
│ │ │
│ ┌────────┴─────────┐ │
│ │ │ │
│ ┌────▼─────┐ ┌─────▼──────┐ │
│ │ GUI │ │ ARTOS │ │
│ │ Tkinter │ │ Collector │ │
│ │ (Local) │ │ (Remote) │ │
│ └──────────┘ └────────────┘ │
│ │
│ Standalone Mode Plugin Mode │
└──────────────────────────────────────────────────────────────┘
Architettura dei Moduli
Core Components
1. base_module.py - ARTOS Interface
Definisce il contratto che tutti i moduli ARTOS devono rispettare:
class BaseModule(abc.ABC):
@abc.abstractmethod
def initialize(self, config: Dict[str, Any]) -> bool
@abc.abstractmethod
def start_session(self) -> None
@abc.abstractmethod
def stop_session(self) -> None
@abc.abstractmethod
def get_status(self) -> Dict[str, Any]
2. bus_monitor_core.py - Business Logic
Implementa BaseModule e fornisce:
- Gestione connessioni UDP 1553
- Sistema di notifiche via callback
- Recording & Replay dei messaggi
- Timestamp normalization per sincronizzazione multi-modulo
3. data_recorder.py - Recording System
Gestisce la cattura e riproduzione di sequenze di test:
- Salvataggio su file JSON
- Filtraggio eventi per tipo/label
- Time-window queries per correlazione temporale
Workflow di Utilizzo
Modalità Standalone (Sviluppo/Debug)
# Run with GUI
python -m pybusmonitor1553
# Run in CLI mode
python -m pybusmonitor1553 --cli
La GUI usa internamente BusMonitorCore ma con interfaccia Tkinter locale.
Modalità ARTOS (Integrazione)
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
# ARTOS Collector instantiates the module
bus_monitor = BusMonitorCore()
# Configure network parameters
config = {
'ip': '192.168.1.100',
'send_port': 5001,
'recv_port': 5002
}
# Initialize and start
if bus_monitor.initialize(config):
bus_monitor.start_session()
# Register callbacks for test validation
bus_monitor.register_callback("msg_a2", validate_radar_mode)
# Wait for expected messages
if bus_monitor.wait_for_message("msg_b1", timeout=5.0):
print("Target detected!")
Vedi examples/artos_integration_example.py per un esempio completo.
Time Normalization (Sincronizzazione Multi-Modulo)
Uno dei requisiti chiave di ARTOS è la capacità di correlare eventi provenienti da sorgenti diverse (Bus 1553, Video SFP, Target Simulator).
Strategia Implementata
- System Receipt Timestamp: Ogni evento viene marcato con
datetime.now().timestamp()al momento della ricezione - Hardware Timestamp: Se disponibile, viene preservato il timestamp hardware del bus 1553
- Recording Format: Entrambi i timestamp vengono salvati nel file di recording
{
"session_start_time": 1703257821.456789,
"events": [
{
"timestamp": 1703257822.123456, // System time
"hw_timestamp": 0.123456, // Hardware time (optional)
"event_type": "msg_received",
"label": "msg_a2",
"data": { ... }
}
]
}
Come ARTOS Collector Usa i Timestamp
Il Collector può:
- Ordinare eventi globalmente usando
timestamp - Calcolare latenze confrontando timestamp tra moduli
- Rilevare anomalie temporali (es. target iniettato ma non ricevuto entro Nms)
Esempio di correlazione:
# Get all events in a 100ms window
window_events = bus_monitor.recorder.get_time_window(
start_time=target_injection_time,
end_time=target_injection_time + 0.1 # 100ms window
)
# Check if radar responded within window
radar_responses = [e for e in window_events if e.label == "msg_b1"]
if radar_responses:
latency = radar_responses[0].timestamp - target_injection_time
print(f"Radar response latency: {latency*1000:.2f}ms")
Recording & Replay
Recording Workflow
from pathlib import Path
# Start recording
bus_monitor.start_recording(filepath=Path("recordings/test_001.json"))
bus_monitor.start_session()
# ... test execution ...
# Stop and save
bus_monitor.stop_recording(save=True)
bus_monitor.stop_session()
Replay Workflow
# Load baseline recording
bus_monitor.load_recording(Path("recordings/baseline.json"))
# Compare with new live data
for event in bus_monitor.recorder.events:
if event.label == "msg_a2":
# Validate current msg_a2 matches recorded baseline
current_msg = bus_monitor.get_message("msg_a2")
if not validate_match(current_msg, event.data):
print("REGRESSION DETECTED!")
Callback System (Test Hooks)
Il sistema di callback permette al Collector di reagire in tempo reale agli eventi del bus:
def validate_radar_mode_change(message):
"""Called automatically when msg_a2 arrives"""
expected_mode = test_scenario.expected_radar_mode
actual_mode = message.radar_mode
if actual_mode != expected_mode:
test_result.add_failure(
f"Mode mismatch: expected {expected_mode}, got {actual_mode}"
)
bus_monitor.register_callback("msg_a2", validate_radar_mode_change)
Status Monitoring
Il metodo get_status() fornisce al Collector informazioni in tempo reale:
status = bus_monitor.get_status()
# Example output:
{
"is_running": True,
"connected": True,
"recording": True,
"recorded_events": 1247,
"errors": [] # or list of error strings
}
Il Collector può usare queste informazioni per:
- Verificare la salute del modulo durante l'esecuzione
- Diagnosticare problemi di connettività
- Decidere se interrompere un test fallito
Packaging e Distribuzione
Per Sviluppo Standalone
Il progetto attuale rimane pienamente funzionale come applicazione standalone:
pip install -e .
python -m pybusmonitor1553
Per Integrazione ARTOS
Il Collector può importare il modulo come libreria:
# In ARTOS Collector project
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
class ARTOSCollector:
def __init__(self):
self.bus_monitor = BusMonitorCore()
self.video_receiver = VideoReceiverCore()
self.target_sim = TargetSimulatorCore()
Nessuna modifica necessaria al codice di BusMonitor - funziona in entrambi i contesti!
Vantaggi di Questa Architettura
✓ Separazione dei Concern
- Core: Logica di business indipendente dall'interfaccia
- GUI: Layer di presentazione opzionale
- ARTOS: Interfaccia standardizzata per l'orchestratore
✓ Testabilità
- Sviluppo standalone permette test approfonditi
- Recording/Replay fornisce test deterministici
- Callbacks permettono validazioni automatiche
✓ Riusabilità
- Stesso core usato da GUI e ARTOS
- Nessuna duplicazione di codice
- Facile manutenzione
✓ Flessibilità
- Modifica dei parametri via
configdict - Callback system estensibile
- Recording format aperto (JSON)
Roadmap Futura
Short-term (Integrazione MessageDB)
- Collegare
MessageDBal core - Implementare
get_message()completo - Serializzazione corretta dei messaggi 1553
Mid-term (Funzionalità ARTOS)
- Replay engine automatico
- Diff tool per confrontare recordings
- Metriche di performance (latency histograms, etc.)
Long-term (Multi-module Sync)
- Cross-module timestamp correlation
- Unified event timeline visualization
- Automated anomaly detection
Riferimenti
- ARTOS Design Doc:
doc/ARTOS_design.md - Example Integration:
examples/artos_integration_example.py - Message Protocol:
doc/ICD-Implementation-Notes.md
Per domande o supporto, vedere la documentazione nel folder doc/.