152 lines
4.7 KiB
Python
152 lines
4.7 KiB
Python
"""
|
|
Example: Using BusMonitor1553 as an ARTOS Module
|
|
|
|
This demonstrates how the ARTOS Collector would integrate and use
|
|
the BusMonitor1553 module during automated test execution.
|
|
"""
|
|
import time
|
|
from pathlib import Path
|
|
from pybusmonitor1553.core.bus_monitor_core import BusMonitorCore
|
|
|
|
|
|
def example_artos_collector_workflow():
|
|
"""
|
|
Simulates the ARTOS Collector orchestrating a test with BusMonitor1553.
|
|
|
|
This shows:
|
|
1. Module initialization
|
|
2. Session start/stop
|
|
3. Message monitoring with callbacks
|
|
4. Recording for deterministic replay
|
|
5. Status monitoring
|
|
"""
|
|
|
|
# 1. INITIALIZATION PHASE
|
|
print("=" * 60)
|
|
print("ARTOS Collector: Initializing BusMonitor1553 module...")
|
|
print("=" * 60)
|
|
|
|
bus_monitor = BusMonitorCore()
|
|
|
|
# Configure the module with network parameters
|
|
config = {
|
|
'ip': '127.0.0.1',
|
|
'send_port': 5001,
|
|
'recv_port': 5002
|
|
}
|
|
|
|
success = bus_monitor.initialize(config)
|
|
if not success:
|
|
print("ERROR: Failed to initialize BusMonitor1553")
|
|
return
|
|
|
|
print(f"✓ BusMonitor1553 initialized: {bus_monitor.get_status()}\n")
|
|
|
|
# 2. CALLBACK REGISTRATION
|
|
print("=" * 60)
|
|
print("ARTOS Collector: Registering test validation callbacks...")
|
|
print("=" * 60)
|
|
|
|
def on_radar_mode_change(message):
|
|
"""Called when A2 message (radar mode) is received."""
|
|
print(f" → TEST HOOK: Radar mode changed to: {message}")
|
|
|
|
def on_target_detected(message):
|
|
"""Called when B1 message (target data) is received."""
|
|
print(f" → TEST HOOK: Target detected: {message}")
|
|
|
|
bus_monitor.register_callback("msg_a2", on_radar_mode_change)
|
|
bus_monitor.register_callback("msg_b1", on_target_detected)
|
|
print("✓ Callbacks registered\n")
|
|
|
|
# 3. START SESSION WITH RECORDING
|
|
print("=" * 60)
|
|
print("ARTOS Collector: Starting test session with recording...")
|
|
print("=" * 60)
|
|
|
|
recording_path = Path("test_recordings/session_001.json")
|
|
recording_path.parent.mkdir(exist_ok=True)
|
|
|
|
bus_monitor.start_recording(filepath=recording_path)
|
|
bus_monitor.start_session()
|
|
|
|
print(f"✓ Session started, recording to: {recording_path}\n")
|
|
|
|
# 4. TEST EXECUTION PHASE
|
|
print("=" * 60)
|
|
print("ARTOS Collector: Executing test scenario...")
|
|
print("=" * 60)
|
|
|
|
# Simulate waiting for specific messages during test
|
|
print(" Waiting for msg_a2 (radar initialization)...")
|
|
if bus_monitor.wait_for_message("msg_a2", timeout=5.0):
|
|
print(" ✓ msg_a2 received within timeout")
|
|
else:
|
|
print(" ✗ TIMEOUT: msg_a2 not received")
|
|
|
|
print(" Waiting for msg_b1 (first target)...")
|
|
if bus_monitor.wait_for_message("msg_b1", timeout=10.0):
|
|
print(" ✓ msg_b1 received within timeout")
|
|
else:
|
|
print(" ✗ TIMEOUT: msg_b1 not received")
|
|
|
|
# Let the test run for a bit
|
|
print("\n Test running... (collecting data for 3 seconds)")
|
|
time.sleep(3)
|
|
|
|
# 5. STATUS MONITORING
|
|
print("\n" + "=" * 60)
|
|
print("ARTOS Collector: Checking module status...")
|
|
print("=" * 60)
|
|
|
|
status = bus_monitor.get_status()
|
|
print(f" Module status: {status}")
|
|
|
|
if not status['connected']:
|
|
print(" ⚠ WARNING: Module shows disconnected state!")
|
|
|
|
if status['errors']:
|
|
print(f" ⚠ ERRORS detected: {status['errors']}")
|
|
|
|
print(f" Recorded events: {status['recorded_events']}")
|
|
|
|
# 6. CLEANUP PHASE
|
|
print("\n" + "=" * 60)
|
|
print("ARTOS Collector: Stopping session and saving results...")
|
|
print("=" * 60)
|
|
|
|
saved_path = bus_monitor.stop_recording(save=True)
|
|
bus_monitor.stop_session()
|
|
|
|
print(f"✓ Session stopped")
|
|
print(f"✓ Recording saved to: {saved_path}")
|
|
|
|
# 7. REPLAY DEMONSTRATION
|
|
print("\n" + "=" * 60)
|
|
print("ARTOS Collector: Loading recording for analysis...")
|
|
print("=" * 60)
|
|
|
|
# In a real scenario, this would be done in a separate test run
|
|
# to validate against recorded baseline
|
|
bus_monitor.load_recording(saved_path)
|
|
|
|
# Access recorded events
|
|
print(f"✓ Loaded {len(bus_monitor.recorder.events)} events")
|
|
|
|
# Filter events by type
|
|
received_messages = bus_monitor.recorder.get_events_by_type('msg_received')
|
|
print(f" - Messages received: {len(received_messages)}")
|
|
|
|
# Show first few events
|
|
print("\n First 3 recorded events:")
|
|
for i, event in enumerate(bus_monitor.recorder.events[:3]):
|
|
print(f" {i+1}. [{event.event_type}] {event.label} @ {event.timestamp:.6f}")
|
|
|
|
print("\n" + "=" * 60)
|
|
print("ARTOS Collector: Test complete!")
|
|
print("=" * 60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
example_artos_collector_workflow()
|