# SXXXXXXX_PyBusMonitor1553/examples/artos_integration_example.py
# 2025-12-22 10:44:32 +01:00
#
# 152 lines
# 4.7 KiB
# Python

"""
Example: Using BusMonitor1553 as an ARTOS Module
This demonstrates how the ARTOS Collector would integrate and use
the BusMonitor1553 module during automated test execution.
"""
import time
from pathlib import Path
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
def _print_banner(title, *, leading_blank=False):
    """Print *title* framed above and below by a 60-character ``=`` rule.

    Args:
        title: Text printed between the two rules.
        leading_blank: When true, a newline is emitted before the top rule
            (used to visually separate later phases from preceding output).
    """
    rule = "=" * 60
    print("\n" + rule if leading_blank else rule)
    print(title)
    print(rule)


def _wait_and_report(bus_monitor, label, timeout, description):
    """Wait for one message label and print whether it arrived in time.

    Args:
        bus_monitor: Object exposing ``wait_for_message(label, timeout=...)``.
        label: Message label to wait for (e.g. ``"msg_a2"``).
        timeout: Timeout value forwarded to ``wait_for_message``.
        description: Human-readable note shown in the "Waiting for" line.

    Returns:
        True if ``wait_for_message`` returned a truthy value, else False.
    """
    print(f" Waiting for {label} ({description})...")
    received = bool(bus_monitor.wait_for_message(label, timeout=timeout))
    if received:
        print(f" ✓ {label} received within timeout")
    else:
        print(f" ✗ TIMEOUT: {label} not received")
    return received


def example_artos_collector_workflow():
    """
    Simulates the ARTOS Collector orchestrating a test with BusMonitor1553.

    This shows:
    1. Module initialization
    2. Session start/stop
    3. Message monitoring with callbacks
    4. Recording for deterministic replay
    5. Status monitoring

    Once the session has been started, the recording and the session are
    always stopped (in that order) via ``try``/``finally``, even if the test
    or status phase raises. Returns ``None``; returns early if the module
    fails to initialize or if no recording path is available for replay.
    """
    # 1. INITIALIZATION PHASE
    _print_banner("ARTOS Collector: Initializing BusMonitor1553 module...")
    bus_monitor = BusMonitorCore()

    # Configure the module with network parameters
    config = {
        'ip': '127.0.0.1',
        'send_port': 5001,
        'recv_port': 5002,
    }
    if not bus_monitor.initialize(config):
        print("ERROR: Failed to initialize BusMonitor1553")
        return
    print(f"✓ BusMonitor1553 initialized: {bus_monitor.get_status()}\n")

    # 2. CALLBACK REGISTRATION
    _print_banner("ARTOS Collector: Registering test validation callbacks...")

    def on_radar_mode_change(message):
        """Called when A2 message (radar mode) is received."""
        print(f" → TEST HOOK: Radar mode changed to: {message}")

    def on_target_detected(message):
        """Called when B1 message (target data) is received."""
        print(f" → TEST HOOK: Target detected: {message}")

    bus_monitor.register_callback("msg_a2", on_radar_mode_change)
    bus_monitor.register_callback("msg_b1", on_target_detected)
    print("✓ Callbacks registered\n")

    # 3. START SESSION WITH RECORDING
    _print_banner("ARTOS Collector: Starting test session with recording...")
    recording_path = Path("test_recordings/session_001.json")
    # parents=True keeps this working if the path is later nested deeper.
    recording_path.parent.mkdir(parents=True, exist_ok=True)
    bus_monitor.start_recording(filepath=recording_path)
    bus_monitor.start_session()
    print(f"✓ Session started, recording to: {recording_path}\n")

    saved_path = None
    try:
        # 4. TEST EXECUTION PHASE
        _print_banner("ARTOS Collector: Executing test scenario...")

        # Simulate waiting for specific messages during test
        _wait_and_report(bus_monitor, "msg_a2", 5.0, "radar initialization")
        _wait_and_report(bus_monitor, "msg_b1", 10.0, "first target")

        # Let the test run for a bit
        print("\n Test running... (collecting data for 3 seconds)")
        time.sleep(3)

        # 5. STATUS MONITORING
        _print_banner("ARTOS Collector: Checking module status...",
                      leading_blank=True)
        status = bus_monitor.get_status()
        print(f" Module status: {status}")
        if not status['connected']:
            print(" ⚠ WARNING: Module shows disconnected state!")
        if status['errors']:
            print(f" ⚠ ERRORS detected: {status['errors']}")
        print(f" Recorded events: {status['recorded_events']}")
    finally:
        # 6. CLEANUP PHASE
        # Runs on both success and failure so a started session is never
        # left running; the stop order matches the original example.
        _print_banner("ARTOS Collector: Stopping session and saving results...",
                      leading_blank=True)
        try:
            saved_path = bus_monitor.stop_recording(save=True)
        finally:
            # Stop the session even if saving the recording raised.
            bus_monitor.stop_session()
        print("✓ Session stopped")
        print(f"✓ Recording saved to: {saved_path}")

    # 7. REPLAY DEMONSTRATION
    _print_banner("ARTOS Collector: Loading recording for analysis...",
                  leading_blank=True)

    # NOTE(review): stop_recording's return contract is not visible here;
    # guard against an empty result rather than passing it to load_recording.
    if not saved_path:
        print(" ⚠ WARNING: No recording path returned; skipping replay")
        return

    # In a real scenario, this would be done in a separate test run
    # to validate against recorded baseline
    bus_monitor.load_recording(saved_path)

    # Access recorded events
    events = bus_monitor.recorder.events
    print(f"✓ Loaded {len(events)} events")

    # Filter events by type
    received_messages = bus_monitor.recorder.get_events_by_type('msg_received')
    print(f" - Messages received: {len(received_messages)}")

    # Show first few events
    print("\n First 3 recorded events:")
    for index, event in enumerate(events[:3], start=1):
        print(f" {index}. [{event.event_type}] {event.label} @ {event.timestamp:.6f}")

    _print_banner("ARTOS Collector: Test complete!", leading_blank=True)
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    example_artos_collector_workflow()