SXXXXXXX_PyBusMonitor1553/doc/ARTOS-Integration-Guide.md
2025-12-22 10:44:32 +01:00

9.5 KiB

BusMonitor1553 - ARTOS Integration Guide

Panoramica

Questo documento descrive come il progetto PyBusMonitor1553 è strutturato per operare sia come applicazione standalone sia come modulo ARTOS.

Strategia Dual-Mode

┌─────────────────────────────────────────────────────────────┐
│                   PyBusMonitor1553 Project                    │
│                                                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │              BusMonitorCore (BaseModule)             │   │
│  │  - Implements ARTOS interface                        │   │
│  │  - Protocol-agnostic business logic                  │   │
│  │  - Recording & Replay support                        │   │
│  └─────────────┬────────────────────────────────────────┘   │
│                │                                              │
│       ┌────────┴─────────┐                                   │
│       │                  │                                   │
│  ┌────▼─────┐      ┌─────▼──────┐                          │
│  │   GUI    │      │  ARTOS     │                          │
│  │ Tkinter  │      │ Collector  │                          │
│  │ (Local)  │      │ (Remote)   │                          │
│  └──────────┘      └────────────┘                          │
│                                                              │
│  Standalone Mode     Plugin Mode                            │
└──────────────────────────────────────────────────────────────┘

Architettura dei Moduli

Core Components

1. base_module.py - ARTOS Interface

Definisce il contratto che tutti i moduli ARTOS devono rispettare:

class BaseModule(abc.ABC):
    @abc.abstractmethod
    def initialize(self, config: Dict[str, Any]) -> bool
    
    @abc.abstractmethod
    def start_session(self) -> None
    
    @abc.abstractmethod
    def stop_session(self) -> None
    
    @abc.abstractmethod
    def get_status(self) -> Dict[str, Any]

2. bus_monitor_core.py - Business Logic

Implementa BaseModule e fornisce:

  • Gestione connessioni UDP 1553
  • Sistema di notifiche via callback
  • Recording & Replay dei messaggi
  • Timestamp normalization per sincronizzazione multi-modulo

3. data_recorder.py - Recording System

Gestisce la cattura e riproduzione di sequenze di test:

  • Salvataggio su file JSON
  • Filtraggio eventi per tipo/label
  • Time-window queries per correlazione temporale

Workflow di Utilizzo

Modalità Standalone (Sviluppo/Debug)

# Run with GUI
python -m pybusmonitor1553

# Run in CLI mode
python -m pybusmonitor1553 --cli

La GUI usa internamente BusMonitorCore ma con interfaccia Tkinter locale.

Modalità ARTOS (Integrazione)

from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore

# ARTOS Collector instantiates the module
bus_monitor = BusMonitorCore()

# Configure network parameters
config = {
    'ip': '192.168.1.100',
    'send_port': 5001,
    'recv_port': 5002
}

# Initialize and start
if bus_monitor.initialize(config):
    bus_monitor.start_session()
    
    # Register callbacks for test validation
    bus_monitor.register_callback("msg_a2", validate_radar_mode)
    
    # Wait for expected messages
    if bus_monitor.wait_for_message("msg_b1", timeout=5.0):
        print("Target detected!")

Vedi examples/artos_integration_example.py per un esempio completo.


Time Normalization (Sincronizzazione Multi-Modulo)

Uno dei requisiti chiave di ARTOS è la capacità di correlare eventi provenienti da sorgenti diverse (Bus 1553, Video SFP, Target Simulator).

Strategia Implementata

  1. System Receipt Timestamp: Ogni evento viene marcato con datetime.now().timestamp() al momento della ricezione
  2. Hardware Timestamp: Se disponibile, viene preservato il timestamp hardware del bus 1553
  3. Recording Format: Entrambi i timestamp vengono salvati nel file di recording
{
  "session_start_time": 1703257821.456789,
  "events": [
    {
      "timestamp": 1703257822.123456,      // System time
      "hw_timestamp": 0.123456,            // Hardware time (optional)
      "event_type": "msg_received",
      "label": "msg_a2",
      "data": { ... }
    }
  ]
}

Come ARTOS Collector Usa i Timestamp

Il Collector può:

  • Ordinare eventi globalmente usando timestamp
  • Calcolare latenze confrontando timestamp tra moduli
  • Rilevare anomalie temporali (es. target iniettato ma non ricevuto entro Nms)

Esempio di correlazione:

# Get all events in a 100ms window
window_events = bus_monitor.recorder.get_time_window(
    start_time=target_injection_time,
    end_time=target_injection_time + 0.1  # 100ms window
)

# Check if radar responded within window
radar_responses = [e for e in window_events if e.label == "msg_b1"]
if radar_responses:
    latency = radar_responses[0].timestamp - target_injection_time
    print(f"Radar response latency: {latency*1000:.2f}ms")

Recording & Replay

Recording Workflow

from pathlib import Path

# Start recording
bus_monitor.start_recording(filepath=Path("recordings/test_001.json"))
bus_monitor.start_session()

# ... test execution ...

# Stop and save
bus_monitor.stop_recording(save=True)
bus_monitor.stop_session()

Replay Workflow

# Load baseline recording
bus_monitor.load_recording(Path("recordings/baseline.json"))

# Compare with new live data
for event in bus_monitor.recorder.events:
    if event.label == "msg_a2":
        # Validate current msg_a2 matches recorded baseline
        current_msg = bus_monitor.get_message("msg_a2")
        if not validate_match(current_msg, event.data):
            print("REGRESSION DETECTED!")

Callback System (Test Hooks)

Il sistema di callback permette al Collector di reagire in tempo reale agli eventi del bus:

def validate_radar_mode_change(message):
    """Called automatically when msg_a2 arrives"""
    expected_mode = test_scenario.expected_radar_mode
    actual_mode = message.radar_mode
    
    if actual_mode != expected_mode:
        test_result.add_failure(
            f"Mode mismatch: expected {expected_mode}, got {actual_mode}"
        )

bus_monitor.register_callback("msg_a2", validate_radar_mode_change)

Status Monitoring

Il metodo get_status() fornisce al Collector informazioni in tempo reale:

status = bus_monitor.get_status()

# Example output:
{
    "is_running": True,
    "connected": True,
    "recording": True,
    "recorded_events": 1247,
    "errors": []  # or list of error strings
}

Il Collector può usare queste informazioni per:

  • Verificare la salute del modulo durante l'esecuzione
  • Diagnosticare problemi di connettività
  • Decidere se interrompere un test fallito

Packaging e Distribuzione

Per Sviluppo Standalone

Il progetto attuale rimane pienamente funzionale come applicazione standalone:

pip install -e .
python -m pybusmonitor1553

Per Integrazione ARTOS

Il Collector può importare il modulo come libreria:

# In ARTOS Collector project
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore

class ARTOSCollector:
    def __init__(self):
        self.bus_monitor = BusMonitorCore()
        self.video_receiver = VideoReceiverCore()
        self.target_sim = TargetSimulatorCore()

Nessuna modifica necessaria al codice di BusMonitor - funziona in entrambi i contesti!


Vantaggi di Questa Architettura

✓ Separazione dei Concern

  • Core: Logica di business indipendente dall'interfaccia
  • GUI: Layer di presentazione opzionale
  • ARTOS: Interfaccia standardizzata per l'orchestratore

✓ Testabilità

  • Sviluppo standalone permette test approfonditi
  • Recording/Replay fornisce test deterministici
  • Callbacks permettono validazioni automatiche

✓ Riusabilità

  • Stesso core usato da GUI e ARTOS
  • Nessuna duplicazione di codice
  • Facile manutenzione

✓ Flessibilità

  • Modifica dei parametri via config dict
  • Callback system estensibile
  • Recording format aperto (JSON)

Roadmap Futura

Short-term (Integrazione MessageDB)

  • Collegare MessageDB al core
  • Implementare get_message() completo
  • Serializzazione corretta dei messaggi 1553

Mid-term (Funzionalità ARTOS)

  • Replay engine automatico
  • Diff tool per confrontare recordings
  • Metriche di performance (latency histograms, etc.)

Long-term (Multi-module Sync)

  • Cross-module timestamp correlation
  • Unified event timeline visualization
  • Automated anomaly detection

Riferimenti

  • ARTOS Design Doc: doc/ARTOS_design.md
  • Example Integration: examples/artos_integration_example.py
  • Message Protocol: doc/ICD-Implementation-Notes.md

Per domande o supporto, vedere la documentazione nel folder doc/.