# Source: PlatSim_Genova/TestEnvironment/scripts/GRIFO_M_PBIT_mock.py
# (1470 lines, 65 KiB, Python)

"""
GRIFO_M_PBIT_mock.py - Simulation Mode for Test Execution Without Hardware
This module provides mock implementations of hardware interfaces to allow
test execution without physical radar system or 1553 bus interface.
Usage:
python GRIFO_M_PBIT.py --simulate
Features:
- Simulates 1553 message reception (B6, B8, B9)
- Configurable BIT timing and pass/fail scenarios
- Simulates serial terminal messages (%%E, %%F, RECYCLE)
- Simulates power control (no-op)
- Zero modification to production test code
Author: Test Automation Team
Date: 2026-01-29
"""
import time
import random
import logging
from typing import Any, AnyStr
# Optional dependency: the real-time monitor GUI. If its module cannot be
# imported the mock keeps working and GUI_AVAILABLE tells callers to stay in
# console-only mode.
try:
    from GRIFO_M_PBIT_gui import TestMonitorGUI
except ImportError:
    TestMonitorGUI = None
    GUI_AVAILABLE = False
else:
    GUI_AVAILABLE = True
# ====================
# SIMULATION CONFIGURATION
# ====================
# BIT completion timing (seconds).
# start() schedules the simulated BIT-available time at a uniformly random
# delay between PBIT_TIME_MIN and PBIT_TIME_MAX after the interface is started.
# Fast mode for simulation testing (use 15-25 for realistic timing)
PBIT_TIME_MIN = 2.0
PBIT_TIME_MAX = 5.0
# Simulation scenarios for B6/B8 field values
# (field values use inverse logic: "false" = pass, "true" = fail)
# 'normal': No failures injected (pedestal_status still reports fail, see 'pedestal_fail')
# 'pedestal_fail': Only pedestal fails (expected HW setup limitation)
# 'processor_fail': Processor status fails
# 'transmitter_fail': Transmitter status fails
# 'receiver_fail': Receiver status fails
# 'random_failures': Random failures to test drill-down logic
# 'recycle_recovery_1': Processor fail → 1 RECYCLE → auto-recovery (self-healing)
# 'recycle_recovery_2': Processor fail → 2 RECYCLEs → auto-recovery (slower self-healing)
# 'recycle_fail_3': Processor fail → 3+ RECYCLEs → NO recovery (requires power cycle)
# 'sp_tests_fail': Signal Processor individual tests fail (SP1, SP2, SP3)
# 'tx_tests_fail': Transmitter individual tests fail (TX10, TX11, TX12)
# 'agc_tests_fail': AGC individual tests fail (AGC1, AGC2, AGC5)
# 'mixed_b8_failures': Random B8 test failures (demonstrates test failure analysis)
# 'mixed': Each run takes its scenario from a randomly generated per-run list
SIMULATION_SCENARIO = 'mixed'  # Options: 'normal', 'pedestal_fail', 'processor_fail', 'transmitter_fail', 'receiver_fail', 'random_failures', 'recycle_recovery_1', 'recycle_recovery_2', 'recycle_fail_3', 'sp_tests_fail', 'tx_tests_fail', 'agc_tests_fail', 'mixed_b8_failures', 'mixed'
# Number of runs for mixed mode (can be changed freely).
# Only used to size the scenario list when no user-requested run count is set.
NUM_RUNS = 10  # Set this to any number (e.g., 50, 100, etc.)
# Template scenarios available for random generation.
# These are the building blocks used to create the dynamic scenario list.
# Each entry is keyed by the scenario identifier and carries only descriptive
# text that is logged (and forwarded to the GUI monitor) at the start of a run:
#   'name'              - short display title
#   'description'       - one-line summary of the simulated condition
#   'expected_failures' - human-readable list of failures the run should show
#   'expected_passes'   - human-readable list of checks that should still pass
#   'notes'             - free-form remarks
AVAILABLE_SCENARIOS_INFO = {
    'normal': {
        'name': 'Normal Operation',
        'description': 'All systems operational - no failures',
        'expected_failures': [],
        'expected_passes': ['All B6 status fields (except pedestal)', 'All B8 diagnostic fields', 'Target generation'],
        'notes': 'Pedestal always fails due to HW test setup limitation (unit not present)'
    },
    'processor_fail': {
        'name': 'Processor Failure',
        'description': 'Processor subsystem failure detected',
        'expected_failures': ['B6: processor_status = FAIL', 'B6: radar_fail_status = RDR_FAIL'],
        'expected_passes': ['Other B6 fields', 'B8 diagnostic fields', 'Target generation'],
        'notes': 'Test should detect processor failure and trigger B8 drill-down'
    },
    'transmitter_fail': {
        'name': 'Transmitter Failure',
        'description': 'Transmitter subsystem failure detected',
        'expected_failures': ['B6: trasmitter_status = FAIL', 'B6: radar_fail_status = RDR_FAIL'],
        'expected_passes': ['Other B6 fields', 'B8 diagnostic fields', 'Target generation'],
        'notes': 'Test should detect transmitter failure and trigger B8 drill-down'
    },
    'receiver_fail': {
        'name': 'Receiver Failure',
        'description': 'Receiver subsystem failure detected',
        'expected_failures': ['B6: receiver_status = FAIL', 'B6: radar_fail_status = RDR_FAIL'],
        'expected_passes': ['Other B6 fields', 'B8 diagnostic fields', 'Target generation'],
        'notes': 'Test should detect receiver failure and trigger B8 drill-down'
    },
    'random_failures': {
        'name': 'Random Component Failures',
        'description': 'Random selection of component failures (30% proc, 20% tx, 15% rx)',
        'expected_failures': ['Variable - one or more of: processor_status, trasmitter_status, receiver_status'],
        'expected_passes': ['Non-failed components', 'B8 diagnostic fields', 'Target generation'],
        'notes': 'Validates test handling of multiple simultaneous failures'
    },
    'sp_tests_fail': {
        'name': 'Signal Processor Test Failures',
        'description': 'Individual SP tests fail (SP1, SP2, SP3)',
        'expected_failures': ['B8: test_sp1_timer1_up = FAIL', 'B8: test_sp2_timer_dma_pxc_if = FAIL', 'B8: test_sp3_timer_internal = FAIL'],
        'expected_passes': ['B6 status fields (except processor)', 'Other B8 tests'],
        'notes': 'Demonstrates granular test failure analysis - specific SP component failures'
    },
    'tx_tests_fail': {
        'name': 'Transmitter Test Failures',
        'description': 'Individual TX tests fail (TX10, TX11, TX12)',
        'expected_failures': ['B8: test_tx10_hv_ps_over_temperature_warning = FAIL', 'B8: test_tx11_twt_helix_over_current = FAIL', 'B8: test_tx12_cathode_to_helix_arc = FAIL'],
        'expected_passes': ['B6 status fields (except transmitter)', 'Other B8 tests'],
        'notes': 'Demonstrates granular test failure analysis - specific TX high-voltage issues'
    },
    'agc_tests_fail': {
        'name': 'AGC Test Failures',
        'description': 'Individual AGC tests fail (AGC1, AGC2, AGC5)',
        'expected_failures': ['B8: test_agc1_internal_xyp_ram = FAIL', 'B8: test_agc2_external_xyp_ram = FAIL', 'B8: test_agc5_dual_port_ram = FAIL'],
        'expected_passes': ['B6 status fields', 'Other B8 tests'],
        'notes': 'Demonstrates granular test failure analysis - AGC memory subsystem failures'
    },
    'mixed_b8_failures': {
        'name': 'Mixed B8 Test Failures',
        'description': 'Random selection of individual B8 test failures across subsystems',
        'expected_failures': ['Variable - random B8 tests from SP, TX, AGC, DP, IS categories'],
        'expected_passes': ['B6 status fields (may show degradation)', 'Non-failed B8 tests'],
        'notes': 'Best scenario to demonstrate test failure analysis with diverse test IDs'
    },
    'recycle_recovery_1': {
        'name': 'RECYCLE Self-Healing (1 cycle)',
        'description': 'Processor fails, then auto-recovers after 1 RECYCLE event',
        'expected_failures': ['Initial: processor_status = FAIL', 'Serial: 1x RECYCLE event detected'],
        'expected_passes': ['After RECYCLE: processor_status = PASS', 'Final: All systems OK'],
        'notes': 'Simulates transient failure with fast self-healing. Test should detect RECYCLE and eventual recovery'
    },
    'recycle_recovery_2': {
        'name': 'RECYCLE Self-Healing (2 cycles)',
        'description': 'Processor fails, then auto-recovers after 2 RECYCLE events',
        'expected_failures': ['Initial: processor_status = FAIL', 'Serial: 2x RECYCLE events detected'],
        'expected_passes': ['After 2nd RECYCLE: processor_status = PASS', 'Final: All systems OK'],
        'notes': 'Simulates persistent failure requiring multiple recovery attempts'
    },
    'recycle_fail_3': {
        'name': 'RECYCLE No Recovery (3+ cycles)',
        'description': 'Processor fails and NEVER recovers despite 3 RECYCLE events',
        'expected_failures': ['Persistent: processor_status = FAIL', 'Serial: 3x RECYCLE events detected', 'No recovery achieved'],
        'expected_passes': ['None - system remains in failed state'],
        'notes': 'Simulates hard failure requiring power cycle. Test should fail and report persistent failure'
    },
    'pedestal_fail': {
        'name': 'Pedestal Unit Failure',
        'description': 'Pedestal subsystem failure (expected in current HW setup)',
        'expected_failures': ['B6: pedestal_status = FAIL'],
        'expected_passes': ['All other B6 fields', 'B8 diagnostic fields', 'Target generation'],
        'notes': 'Known limitation: pedestal unit not present in test setup (expected failure)'
    },
}
# Scenario identifiers only (same order as the dict above); this is the pool
# the random scenario generator draws from. Kept for backward compatibility.
AVAILABLE_SCENARIOS = list(AVAILABLE_SCENARIOS_INFO.keys())
# Minimum share of 'normal' runs (no failures) guaranteed in a generated
# scenario list, expressed as a fraction of the total run count.
MIN_NORMAL_PERCENTAGE = 0.10 # 10% minimum
# Serial message simulation switches.
# NOTE(review): these flags are not read in the visible part of this file -
# presumably consumed by the mock serial terminal; confirm.
SIMULATE_SERIAL_ERRORS = True # Generate occasional %%E messages
SIMULATE_SERIAL_FATAL = True # Generate occasional %%F messages
SIMULATE_RECYCLE_EVENTS = True # Generate RECYCLE at power-on and occasionally during operation
# Global reference to terminal for RECYCLE coordination
# (the 1553 mock calls its _simulate_recycle_message() during RECYCLE scenarios)
_global_terminal_ref = None
# Global scenario list (generated dynamically, once, in mixed mode)
_scenario_list = None
# Global variable to store user-requested number of runs
# (takes precedence over NUM_RUNS when the scenario list is generated)
_requested_runs = None
# Global reference to GUI (if enabled)
_gui_monitor = None
# Set by ask_num_runs() from the user's answer to the GUI prompt
_gui_enabled = False
# Target acquisition simulation config
# B9 target fields are only populated when _run_on_target is True.
# NOTE(review): _target_distance and the cycle bounds are not read in the
# visible part of this file - presumably used by the B9 simulation; confirm.
_run_on_target = False
_target_distance = 1180
_target_min_cycles = 3
_target_max_cycles = 12
# Target detection simulation: per-run success/fail pattern
# Controls whether target appears for each run to simulate realistic scenarios
# True = target visible (PASS), False = target not visible (FAIL)
_target_visibility_pattern = [
    True, # Run 1: Target visible - PASS
    False, # Run 2: Target not visible - FAIL
    True, # Run 3: Target visible - PASS
    # Pattern repeats for additional runs (indexed modulo its length)
]
def ask_num_runs() -> int:
    """
    Interactively configure a simulation session.

    Prints the simulation banner, asks whether the real-time GUI monitor
    should be enabled (only when the GUI module could be imported; the answer
    is stored in the module-level ``_gui_enabled`` flag) and then prompts
    until a valid number of test runs has been entered.

    Returns:
        Number of runs requested by user (minimum 1). When standard input is
        closed (EOF) the configured default ``NUM_RUNS`` is returned instead.

    Raises:
        SystemExit: if the user interrupts the run-count prompt (Ctrl+C).
    """
    import sys

    global _gui_enabled

    def announce(text: str, log: bool = True) -> None:
        """Echo *text* on the console and, unless disabled, mirror it to the log."""
        print(text)
        if log:
            logging.info(text)

    banner = "=" * 80
    for line in ("", banner, "GRIFO PBIT - SIMULATION MODE", banner, ""):
        announce(line)

    # Ask about GUI first
    if GUI_AVAILABLE:
        while True:
            try:
                gui_input = input("Enable real-time GUI monitor? (y/n) [y]: ").strip().lower()
                if gui_input in ('', 'y', 'yes'):
                    _gui_enabled = True
                    announce("✓ GUI monitor will be enabled")
                    break
                if gui_input in ('n', 'no'):
                    _gui_enabled = False
                    announce("✓ GUI monitor disabled (console only)")
                    break
                # Invalid answers are shown on the console only, never logged.
                announce("Please enter 'y' or 'n'", log=False)
            except (KeyboardInterrupt, EOFError):
                # No usable answer: fall back to console-only and carry on.
                _gui_enabled = False
                break
        announce("")
    else:
        announce("[INFO] GUI monitor not available (tkinter import failed)")
        announce("")

    while True:
        try:
            user_input = input("How many test runs do you want to execute? (minimum 1): ")
            num_runs = int(user_input)
            if num_runs < 1:
                # Input errors are shown on the console only, never logged.
                announce(f"Error: Number of runs must be at least 1. You entered: {num_runs}", log=False)
                continue
            announce("")
            announce(f"✓ Configured for {num_runs} test run(s)")
            announce("")
            announce(f"Generating randomized scenario list for {num_runs} runs...")
            if num_runs > 20:
                # Only large sessions get the guaranteed-'normal' preview line.
                announce(f"(At least {int(num_runs * MIN_NORMAL_PERCENTAGE)} runs will be 'normal' with no failures)")
            announce("")
            return num_runs
        except ValueError:
            announce(f"Error: Please enter a valid number. You entered: '{user_input}'", log=False)
        except KeyboardInterrupt:
            msg = "\n\nOperation cancelled by user."
            print(msg)
            logging.warning(msg)
            sys.exit(0)
        except EOFError:
            # Non-interactive stdin: use the configured default rather than a
            # second hard-coded copy of it.
            default_runs = max(1, NUM_RUNS)
            announce(f"\n\nNo input received. Using default: {default_runs} runs")
            return default_runs
def _log_scenario_info(scenario_name: str, run_number: int, total_runs: int):
    """
    Write a prominent banner describing the scenario of the current run.

    Unknown scenario names only produce a warning. For known scenarios the
    banner lists name, description, expected failures/passes and notes, and
    the same information is pushed to the GUI monitor when one is attached.

    Args:
        scenario_name: Key of the scenario in AVAILABLE_SCENARIOS_INFO
        run_number: Current run number (1-based)
        total_runs: Total number of runs
    """
    details = AVAILABLE_SCENARIOS_INFO.get(scenario_name)
    if details is None:
        logging.warning(f"[MOCK] Unknown scenario: {scenario_name}")
        return

    separator = "#" * 100

    # Banner header
    logging.info("")
    logging.info(separator)
    logging.info(separator)
    logging.info(f"### RUN {run_number}/{total_runs}: {details['name'].upper()}")
    logging.info(separator)
    logging.info("")
    logging.info(f"SCENARIO: {scenario_name}")
    logging.info(f"DESCRIPTION: {details['description']}")
    logging.info("")

    # Both expectation lists share the same layout: title, entries, blank line.
    for heading, key in (
        ("EXPECTED FAILURES:", 'expected_failures'),
        ("EXPECTED PASSES:", 'expected_passes'),
    ):
        logging.info(heading)
        entries = details[key]
        if entries:
            for entry in entries:
                logging.info(f"{entry}")
        else:
            logging.info(" (none)")
        logging.info("")

    remarks = details['notes']
    if remarks:
        logging.info(f"NOTES: {remarks}")
        logging.info("")

    # Banner footer
    logging.info(separator)
    logging.info(separator)
    logging.info("")

    # Mirror the scenario to the GUI monitor when one is attached
    if not _gui_monitor:
        return
    _gui_monitor.update_scenario(
        name=details['name'],
        description=details['description'],
        expected_failures=details['expected_failures'],
        expected_passes=details['expected_passes'],
        notes=details['notes']
    )
    _gui_monitor.log_event('info', f"RUN {run_number}/{total_runs}: {details['name']}")
def _generate_scenario_list(num_runs: int, min_normal_pct: float = 0.10) -> list:
    """
    Generate a dynamic list of scenarios for the specified number of runs.

    A guaranteed share of 'normal' runs is placed first, the remaining slots
    are drawn at random from AVAILABLE_SCENARIOS (which may add further
    'normal' runs) and the result is shuffled. The resulting distribution is
    written to the log.

    Args:
        num_runs: Total number of runs to generate scenarios for. Values
            below 1 are treated as 1 so callers always get a non-empty list.
        min_normal_pct: Minimum fraction of 'normal' scenarios (default 10%)

    Returns:
        List of exactly ``num_runs`` scenario names, one per run.
    """
    if num_runs < 1:
        # Callers index the list modulo its length, so it must never be empty.
        logging.warning(f"[MOCK] Invalid number of runs ({num_runs}); generating a single run")
        num_runs = 1

    # At least one guaranteed 'normal' run, but never more than the run count
    # (an oversized percentage must not lengthen the list).
    min_normal = min(num_runs, max(1, int(num_runs * min_normal_pct)))

    # Start with guaranteed 'normal' scenarios, then fill the remaining slots
    # with random picks (including more 'normal' if random picks it)
    scenarios = ['normal'] * min_normal
    scenarios.extend(random.choice(AVAILABLE_SCENARIOS) for _ in range(num_runs - min_normal))

    # Shuffle to randomize order
    random.shuffle(scenarios)

    # Log scenario distribution (single counting pass)
    scenario_counts = {}
    for scenario in scenarios:
        scenario_counts[scenario] = scenario_counts.get(scenario, 0) + 1
    logging.info(f"[MOCK] Generated {num_runs} scenarios:")
    for scenario, count in sorted(scenario_counts.items()):
        pct = (count / num_runs) * 100
        logging.info(f" - {scenario}: {count} runs ({pct:.1f}%)")
    return scenarios
# ====================
# MOCK 1553 INTERFACE
# ====================
class MockGrifo1553Interface:
"""
Mock implementation of the 1553 interface that simulates message reception
and field value responses.
Simulates the behavior of the real interpreter.PyInterfaceManager interface
without requiring hardware or native libraries.
"""
def __init__(self):
    """
    Initialize mock interface with simulated state.

    The interface is created in a stopped state; the per-run scenario is only
    applied by start(). In 'mixed' mode the module-level scenario list is
    generated here the first time an instance is created. A minimal set of
    field values is installed so fields can be queried before the first
    start().
    """
    global _scenario_list

    self._message_counters = {
        'B6_MsgRdrSettingsAndParametersTellback': 0,
        'B8_MsgBitReport': 0,
        'B9': 0,
    }
    # Do not auto-start here; start() will be invoked on the first power-on
    self._started = False
    self._bit_available = False
    self._start_time = None
    self._bit_available_time = None
    self._run_count = 0  # Track how many times start() has been called (0 until first start)
    self._recycle_scenario = None  # Track if this is a RECYCLE recovery scenario
    self._recycle_count = 0  # Count RECYCLEs in current run
    # Defined up front so it can be read before the first start(); it is
    # (re)assigned for every run when the field values are initialized.
    self._last_simulated_target = None

    # Generate dynamic scenario list if in mixed mode and not already generated
    # Only generate once - if _scenario_list is already set, use it
    if SIMULATION_SCENARIO == 'mixed' and _scenario_list is None:
        # Use user-requested runs if available, otherwise fall back to NUM_RUNS
        num_runs_to_use = _requested_runs if _requested_runs is not None else NUM_RUNS
        _scenario_list = _generate_scenario_list(num_runs_to_use, MIN_NORMAL_PERCENTAGE)
        # Update GUI with total runs (only on first initialization)
        if _gui_monitor:
            _gui_monitor.update_status(run_total=len(_scenario_list))
            logging.info(f"[MOCK] GUI updated with {len(_scenario_list)} total runs")

    # Provide a minimal, safe initialization of field_values so other modules
    # can query fields before the first start() without raising AttributeError.
    # Full scenario-specific initialization happens in start().
    b6_prefix = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_"
    b6_status_fields = (
        "array_status",
        "pedestal_status",
        "pressurization_status",
        "processor_over_temperature_alarm",
        "processor_status",
        "receiver_status",
        "rx_front_end_status",
        "servoloop_over_temperature_alarm",
        "servoloop_status",
        "trasmitter_over_temperature_alarm",
        "trasmitter_status",
    )
    # Inverse logic: "false" = pass, "true" = fail
    self._field_values = {f"{b6_prefix}{field}": "false" for field in b6_status_fields}
    # Pedestal is a known missing unit
    self._field_values[f"{b6_prefix}pedestal_status"] = "true"
    self._field_values[f"{b6_prefix}radar_fail_status"] = "RDR_OK"
    # B9 fields: Initialize with default values (will be set properly in _initialize_field_values)
    # Target visibility is determined per-run based on _target_visibility_pattern
    self._field_values["b9_t_num"] = 0
    self._field_values["b9_t1_rng"] = 0

    logging.info("[MOCK] 1553 Interface initialized (awaiting start)")
    # Update GUI
    if _gui_monitor:
        _gui_monitor.log_event('info', 'Mock 1553 interface initialized')
def _initialize_field_values(self):
    """
    Rebuild the simulated field values for the current run.

    Resolves the scenario for this run (in 'mixed' mode the entry of the
    generated scenario list selected by ``self._run_count``), logs it, resets
    every B6 status field to its passing value and then injects the
    scenario-specific B6/B8 failures. Finally the B9 target fields are set
    from ``_target_visibility_pattern`` when target simulation is enabled.

    Field values use inverse logic: "false" means pass, "true" means fail.

    Side effects:
        Replaces ``self._field_values``; sets ``self._recycle_scenario`` and
        ``self._last_simulated_target``; may populate the module-level
        ``_scenario_list`` when it has not been generated yet.
    """
    global _scenario_list

    def trace(message):
        """Mirror a debug message to both the console and the log."""
        print(message)
        logging.info(message)

    # Determine scenario for this run
    current_scenario = SIMULATION_SCENARIO

    # A RECYCLE scenario selected for an earlier run must not leak into this
    # one: start() triggers the RECYCLE simulation whenever this is set.
    self._recycle_scenario = None

    # CRITICAL DEBUG: Log entry to this function
    rule = '=' * 100
    for line in (
        f"\n{rule}\n[CRITICAL DEBUG] _initialize_field_values() CALLED\n{rule}",
        f"run_count = {self._run_count}",
        f"SIMULATION_SCENARIO = '{SIMULATION_SCENARIO}'",
        f"current_scenario = '{current_scenario}'",
        f"_scenario_list = {_scenario_list}",
    ):
        trace(line)

    # If mixed mode, use dynamically generated scenario list
    if current_scenario == 'mixed':
        trace("[DEBUG] Entering MIXED MODE branch")
        if _scenario_list is None:
            # Same sizing rule as __init__: user request first, then NUM_RUNS
            fallback_runs = _requested_runs if _requested_runs is not None else NUM_RUNS
            _scenario_list = _generate_scenario_list(fallback_runs, MIN_NORMAL_PERCENTAGE)
        total_runs = len(_scenario_list)
        # Use modulo to cycle through list if runs exceed list length
        # _run_count is 1-based (starts at 1), so subtract 1 for 0-based array indexing
        scenario_index = (self._run_count - 1) % total_runs
        current_scenario = _scenario_list[scenario_index]
        trace(f"[DEBUG] Mixed mode: scenario_index={scenario_index}, selected scenario='{current_scenario}'")
        trace(f"[DEBUG] About to call _log_scenario_info('{current_scenario}', {self._run_count}, {total_runs})")
    else:
        trace("[DEBUG] Entering SINGLE MODE branch (not mixed)")
        total_runs = 1
    # Log detailed scenario information in highly visible format
    _log_scenario_info(current_scenario, self._run_count, total_runs)
    trace("[DEBUG] _log_scenario_info() completed")

    # === B6 LRU Status Fields (inverse logic) ===
    # Default: All fields pass (inverse logic: false = pass, true = fail)
    b6 = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_"
    values = {
        f"{b6}{field}": "false"
        for field in (
            "array_status",
            "pedestal_status",
            "pressurization_status",
            "processor_over_temperature_alarm",
            "processor_status",
            "receiver_status",
            "rx_front_end_status",
            "servoloop_over_temperature_alarm",
            "servoloop_status",
            "trasmitter_over_temperature_alarm",
            "trasmitter_status",
        )
    }
    # Simulate HW test setup limitation: pedestal unit not present (ALWAYS fail)
    # This is a known limitation tracked in KNOWN_FAILURES list in GRIFO_M_PBIT.py
    values[f"{b6}pedestal_status"] = "true"
    # radar_fail_status is enum: RDR_OK = pass, RDR_FAIL = fail
    values[f"{b6}radar_fail_status"] = "RDR_OK"
    self._field_values = values

    # Apply scenario-specific failures
    single_lru_failures = {
        'processor_fail': "processor_status",
        'transmitter_fail': "trasmitter_status",
        'receiver_fail': "receiver_status",
    }
    if current_scenario == 'pedestal_fail':
        # Simulate known HW limitation: pedestal not present
        values[f"{b6}pedestal_status"] = "true"
        logging.info("[MOCK APPLY] ✗ Setting pedestal_status = FAIL")
    elif current_scenario in single_lru_failures:
        # One LRU fails and raises the aggregate radar-fail flag
        field = single_lru_failures[current_scenario]
        values[f"{b6}{field}"] = "true"
        values[f"{b6}radar_fail_status"] = "RDR_FAIL"  # Aggregate flag
        logging.info(f"[MOCK APPLY] ✗ Setting {field} = FAIL")
        logging.info("[MOCK APPLY] ✗ Setting radar_fail_status = RDR_FAIL")
    elif current_scenario == 'random_failures':
        # Randomly fail some B6 fields to trigger B8 drill-down
        failures = []
        for probability, field in (
            (0.3, "processor_status"),
            (0.2, "trasmitter_status"),
            (0.15, "receiver_status"),
        ):
            if random.random() < probability:
                values[f"{b6}{field}"] = "true"
                failures.append(field)
        if failures:
            values[f"{b6}radar_fail_status"] = "RDR_FAIL"  # Aggregate flag
            logging.info(f"[MOCK APPLY] ✗ Random failures applied: {', '.join(failures)}")
            logging.info("[MOCK APPLY] ✗ Setting radar_fail_status = RDR_FAIL")
        else:
            logging.info("[MOCK APPLY] ✓ No random failures triggered (all passed)")
    elif current_scenario == 'recycle_recovery_1':
        # Processor fails initially, then auto-recovers after 1 RECYCLE
        # Simulates self-healing behavior (transient failure)
        self._recycle_scenario = 'recovery_1'
        logging.info("[MOCK APPLY] ⚠ RECYCLE scenario enabled: 1 cycle to recovery")
    elif current_scenario == 'recycle_recovery_2':
        # Processor fails initially, then auto-recovers after 2 RECYCLEs
        # Simulates slower self-healing (needs multiple attempts)
        self._recycle_scenario = 'recovery_2'
        logging.info("[MOCK APPLY] ⚠ RECYCLE scenario enabled: 2 cycles to recovery")
    elif current_scenario == 'recycle_fail_3':
        # Processor fails and does NOT recover even after 3 RECYCLEs
        # Requires power cycle (run will fail)
        self._recycle_scenario = 'fail_3'
        logging.info("[MOCK APPLY] ⚠ RECYCLE scenario enabled: NO recovery (persistent failure)")
    elif current_scenario == 'sp_tests_fail':
        # Signal Processor individual tests fail
        values[f"{b6}processor_status"] = "true"
        values[f"{b6}radar_fail_status"] = "RDR_FAIL"
        # Specific B8 SP test failures (inverse logic: true = fail)
        values["signal_processor_test_results_SignalProcessorTestResults_test_sp1_timer1_up"] = "true"
        values["signal_processor_test_results_SignalProcessorTestResults_test_sp2_timer_dma_pxc_if"] = "true"
        values["signal_processor_test_results_SignalProcessorTestResults_test_sp3_timer_internal"] = "true"
        logging.info("[MOCK APPLY] ✗ Setting processor_status = FAIL")
        logging.info("[MOCK APPLY] ✗ B8 failures: SP1 (timer1_up), SP2 (timer_dma_pxc_if), SP3 (timer_internal)")
    elif current_scenario == 'tx_tests_fail':
        # Transmitter individual tests fail
        values[f"{b6}trasmitter_status"] = "true"
        values[f"{b6}radar_fail_status"] = "RDR_FAIL"
        # Specific B8 TX test failures (inverse logic: true = fail)
        values["transmitter_test_results_w1_TransmitterTestResultsW1_test_tx10_hv_ps_over_temperature_warning"] = "true"
        values["transmitter_test_results_w1_TransmitterTestResultsW1_test_tx11_twt_helix_over_current"] = "true"
        values["transmitter_test_results_w1_TransmitterTestResultsW1_test_tx12_cathode_to_helix_arc"] = "true"
        logging.info("[MOCK APPLY] ✗ Setting trasmitter_status = FAIL")
        logging.info("[MOCK APPLY] ✗ B8 failures: TX10 (hv_ps_over_temp), TX11 (twt_helix_overcurrent), TX12 (cathode_arc)")
    elif current_scenario == 'agc_tests_fail':
        # AGC individual tests fail (no category-level failure, just B8 tests)
        # This shows that B8 can fail without B6 category failure
        # Specific B8 AGC test failures (inverse logic: true = fail)
        values["agc_test_results_AGCTestResults_test_agc1_internal_xyp_ram"] = "true"
        values["agc_test_results_AGCTestResults_test_agc2_external_xyp_ram"] = "true"
        values["agc_test_results_AGCTestResults_test_agc5_dual_port_ram"] = "true"
        logging.info("[MOCK APPLY] ✗ B8 failures only (no B6 category failure): AGC1, AGC2, AGC5 (memory tests)")
    elif current_scenario == 'mixed_b8_failures':
        # Random B8 test failures across subsystems; each candidate is drawn
        # independently, in this order: (probability, field name, test id)
        b8_candidates = (
            # Signal Processor tests (30% chance each)
            (0.3, "signal_processor_test_results_SignalProcessorTestResults_test_sp1_timer1_up", "SP1"),
            (0.3, "signal_processor_test_results_SignalProcessorTestResults_test_sp5_video1_without_ad", "SP5"),
            # Transmitter tests (25% chance each)
            (0.25, "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx10_hv_ps_over_temperature_warning", "TX10"),
            (0.25, "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx5_tx_rf_output_level", "TX5"),
            # AGC tests (20% chance each)
            (0.2, "agc_test_results_AGCTestResults_test_agc1_internal_xyp_ram", "AGC1"),
            (0.2, "agc_test_results_AGCTestResults_test_agc6_agc_machine", "AGC6"),
            # Data Processor tests (15% chance each)
            (0.15, "data_processor_test_results_DataProcessorTestResults_test_dp1_486_cpu_tests", "DP1"),
            # Integrated System tests (15% chance each)
            (0.15, "integrated_system_test_results_IntegratedSystemTestResults_test_is1_upconverter_chain_levels", "IS1"),
        )
        failures = []
        for probability, field_name, test_id in b8_candidates:
            if random.random() < probability:
                values[field_name] = "true"
                failures.append(test_id)
        if failures:
            # Set category failures if subsystem affected
            if any(test_id.startswith('SP') for test_id in failures):
                values[f"{b6}processor_status"] = "true"
            if any(test_id.startswith('TX') for test_id in failures):
                values[f"{b6}trasmitter_status"] = "true"
            # Any failure triggers radar fail
            values[f"{b6}radar_fail_status"] = "RDR_FAIL"
            logging.info(f"[MOCK APPLY] ✗ Mixed B8 test failures: {', '.join(failures)}")
        else:
            logging.info("[MOCK APPLY] ✓ No random B8 failures triggered (all passed)")
    else:  # 'normal'
        logging.info("[MOCK APPLY] ✓ All systems nominal (no failures injected)")

    # === B8 Diagnostic Fields (inverse logic for test results) ===
    # B8 fields not set above are left undefined here.
    # BIT report control fields
    values[f"{b6}bit_report_available"] = "false"  # Initially not available

    # === B9 Target Message Fields (for tgt_gen test) ===
    # These are numeric fields, not boolean
    # Use _target_visibility_pattern to determine if target is visible for this run
    # BUT only if run_on_target is True
    if _target_visibility_pattern:
        run_index = (self._run_count - 1) % len(_target_visibility_pattern)
        target_visible = _target_visibility_pattern[run_index]
    else:
        # An empty pattern means the target is never shown
        run_index = 0
        target_visible = False

    # Only simulate target if run_on_target is enabled
    if _run_on_target and target_visible:
        # NOTE(review): the range is hard-coded although the module defines
        # _target_distance with the same value - confirm which one is intended.
        values["b9_t_num"] = 1  # Number of targets (integer) - 1 target visible
        values["b9_t1_rng"] = 1180  # Target 1 range (integer) - in expected range 1179-1186
        # Set _last_simulated_target for statistics reporting
        self._last_simulated_target = {
            'distance': 1180,
            'appeared_after_cycles': run_index + 1  # Use run_index as cycle indicator
        }
        logging.info(f"[MOCK] Run {self._run_count}: Target will be VISIBLE (pattern index {run_index})")
    else:
        values["b9_t_num"] = 0  # Number of targets (integer) - no targets visible
        values["b9_t1_rng"] = 0  # No range when no targets
        # Clear _last_simulated_target when target not visible or test disabled
        self._last_simulated_target = None
        logging.info(f"[MOCK] Run {self._run_count}: Target will be NOT VISIBLE (pattern index {run_index})")
def start(self):
    """
    Begin a new simulated run (bus monitoring / power-on).

    Schedules the moment the BIT report becomes available, advances the run
    counter, rebuilds the field values for the run's scenario and, when that
    scenario involves RECYCLE events, plays them out. Message counters are
    deliberately left untouched so they keep increasing across power cycles.
    """
    # Mark the interface as running and schedule BIT completion
    self._started = True
    now = time.perf_counter()
    self._start_time = now
    self._bit_available = False
    self._bit_available_time = now + random.uniform(PBIT_TIME_MIN, PBIT_TIME_MAX)

    # Fresh RECYCLE tally for this run
    self._recycle_count = 0

    # The run counter must advance before the field values are rebuilt,
    # because the scenario is selected from the run number.
    self._run_count += 1
    self._initialize_field_values()

    # Play out RECYCLE recovery scenarios, if one is active
    if self._recycle_scenario:
        self._simulate_recycle_scenario()

    run_number = self._run_count
    logging.info(f"[MOCK] 1553 Interface started - simulating BIT execution (Run {run_number})")

    # Update GUI
    if not _gui_monitor:
        return
    _gui_monitor.update_status(run_current=run_number, power_on=True, pbit_time=0.0)
    _gui_monitor.log_event('info', f'Starting run {run_number}')
def _simulate_recycle_scenario(self):
"""
Simulate RECYCLE recovery scenarios with serial terminal messages.
Three scenarios:
1. recovery_1: Processor fails, 1 RECYCLE, then recovers
2. recovery_2: Processor fails, 2 RECYCLEs, then recovers
3. fail_3: Processor fails, 3+ RECYCLEs, never recovers (requires power cycle)
"""
global _global_terminal_ref
if self._recycle_scenario == 'recovery_1':
# Processor fails initially
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"] = "true"
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"] = "RDR_FAIL"
logging.info("[MOCK RECYCLE] Initial processor failure detected")
# Simulate 1 RECYCLE via serial terminal
if _global_terminal_ref:
time.sleep(0.5) # Delay before RECYCLE
_global_terminal_ref._simulate_recycle_message()
self._recycle_count += 1
# After RECYCLE, system recovers
time.sleep(0.3)
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"] = "false"
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"] = "RDR_OK"
logging.info("[MOCK RECYCLE] System recovered after 1 RECYCLE (self-healing)")
elif self._recycle_scenario == 'recovery_2':
# Processor fails initially
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"] = "true"
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"] = "RDR_FAIL"
logging.info("[MOCK RECYCLE] Initial processor failure detected")
# Simulate 2 RECYCLEs via serial terminal
if _global_terminal_ref:
for i in range(2):
time.sleep(0.5) # Delay before each RECYCLE
_global_terminal_ref._simulate_recycle_message()
self._recycle_count += 1
logging.info(f"[MOCK RECYCLE] RECYCLE {i+1}/2")
# After 2nd RECYCLE, system recovers
time.sleep(0.3)
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"] = "false"
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"] = "RDR_OK"
logging.info("[MOCK RECYCLE] System recovered after 2 RECYCLEs (slower self-healing)")
elif self._recycle_scenario == 'fail_3':
# Processor fails initially and stays failed
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"] = "true"
self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"] = "RDR_FAIL"
logging.info("[MOCK RECYCLE] Initial processor failure detected")
# Simulate 3 RECYCLEs via serial terminal (all fail to recover)
if _global_terminal_ref:
for i in range(3):
time.sleep(0.5) # Delay before each RECYCLE
_global_terminal_ref._simulate_recycle_message()
self._recycle_count += 1
logging.info(f"[MOCK RECYCLE] RECYCLE {i+1}/3 - NO RECOVERY")
# System NEVER recovers - stays in fail state
logging.warning("[MOCK RECYCLE] System did NOT recover after 3 RECYCLEs - requires power cycle")
def stop(self):
    """Mark the mock interface as no longer started and log the event."""
    self._started = False
    logging.info("[MOCK] 1553 Interface stopped")
def getSingleMessageReceivedSz(self, msg_name: str) -> int:
    """
    Return the simulated reception counter for a 1553 message.

    Each call counts as one more received message. The call also drives the
    BIT timeline: once the scheduled availability time has passed, the BIT
    report is flagged as available (once per run) and the GUI monitor, when
    present, is refreshed with the elapsed PBIT time.

    Args:
        msg_name: Message name (e.g., 'B6_MsgRdrSettingsAndParametersTellback').
            Names that have no counter yet start counting from zero instead
            of raising KeyError.

    Returns:
        The updated counter for ``msg_name``, or 0 when the interface has
        not been started.
    """
    if not self._started:
        logging.warning("[MOCK] getSingleMessageReceivedSz called but interface not started! Returning 0")
        return 0

    # .get() makes the increment safe for plain dicts and defaultdicts alike,
    # matching the tolerant lookup this method always used for its result.
    current_count = self._message_counters.get(msg_name, 0) + 1
    self._message_counters[msg_name] = current_count

    # Log periodically for debugging
    if current_count % 100 == 0:
        logging.debug(f"[MOCK] Message {msg_name}: counter at {current_count}")

    # Read the clock once so the availability check and the reported
    # durations all refer to the same instant.
    now = time.perf_counter()

    # Simulate BIT becoming available after the configured time
    if not self._bit_available and now >= self._bit_available_time:
        self._bit_available = True
        self._field_values["radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_bit_report_available"] = "true"
        bit_time = now - self._start_time
        logging.info(f"[MOCK] BIT report available after {bit_time:.1f}s")
        if _gui_monitor:
            _gui_monitor.update_status(pbit_available=True, pbit_time=bit_time)
            _gui_monitor.log_event('success', f'BIT completed in {bit_time:.1f}s')

    # Keep the GUI's running PBIT timer up to date (the interface is known
    # to be started at this point because of the early return above).
    if _gui_monitor:
        _gui_monitor.update_status(pbit_time=now - self._start_time)

    return current_count
def getMessageFieldValue(self, msg_name: str, field_name: str):
    """
    Look up the simulated value of a message field.

    The BIT-report-available flag is derived from the interface state
    rather than from the stored values. For every other field the stored
    value is returned; a field with no stored value (or a stored ``None``)
    falls back to 0 when its name contains '_num', '_rng', '_count' or
    '_id', and to "false" otherwise.

    Args:
        msg_name: Message name (not used for the lookup)
        field_name: Field name

    Returns:
        Simulated field value (str or int for the built-in defaults,
        otherwise whatever was stored for the field)
    """
    bit_flag_field = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_bit_report_available"
    if field_name == bit_flag_field:
        if self._bit_available:
            return "true"
        return "false"

    stored = self._field_values.get(field_name)
    if stored is not None:
        result = stored
    else:
        # Name-based default: numeric-looking fields get 0, the rest "false" (pass)
        result = "false"
        for marker in ('_num', '_rng', '_count', '_id'):
            if marker in field_name:
                result = 0
                break

    # Occasionally trace non-default values for visibility during testing;
    # the random draw only happens for non-default values.
    if result not in [0, "false", "RDR_OK"]:
        if random.random() < 0.1:
            logging.debug(f"[MOCK] Field {field_name.split('_')[-1]}: {result}")

    return result
def isMessageReadOnly(self, msg_name: str) -> bool:
    """
    Tell whether a message is read-only.

    Any message whose name starts with 'B' (B6, B7, B8, B9 are the RX
    messages coming from the radar) is reported as read-only.
    """
    is_radar_rx = msg_name.startswith('B')
    return is_radar_rx
def assignMessageFieldValue(self, msg_name: str, field_name: str, value: Any) -> bool:
    """
    Pretend to assign a value to a TX message field.

    Nothing is stored: the request is only traced at debug level and
    reported as successful.
    """
    logging.debug(f"[MOCK] assignMessageFieldValue: {msg_name}.{field_name} = {value}")
    return True
def commitChanges(self):
    """Pretend to commit pending TX message changes; nothing to do in the mock."""
    return None
def sendLastCommittedMessage(self, msg_name: str) -> bool:
    """Pretend to send a TX message; the mock always reports success."""
    sent = True
    return sent
def getMessageErrorSz(self) -> int:
    """Return the simulated bus error count, which is always zero."""
    error_count = 0
    return error_count
def logStart(self, level: int, directory: str):
    """
    Pretend to start bus logging.

    No log is produced in simulation; the requested level and directory
    are only reported through the Python logger.
    """
    logging.info(f"[MOCK] Log started at level {level} in directory: {directory}")
def logStop(self):
    """Pretend to stop bus logging; only reports the event through the logger."""
    logging.info("[MOCK] Log stopped")
# ====================
# MOCK SERIAL TERMINAL
# ====================
class MockSerialTerminal:
    """
    Stand-in for the serial terminal that needs no physical serial port.

    It keeps the same statistics structure as the real terminal (message
    counters plus per-category detail lists of ``(timestamp, message)``
    tuples) and fills it with simulated RECYCLE, error and fatal messages.
    """

    @staticmethod
    def _blank_stats():
        """Build a fresh, zeroed statistics dictionary."""
        return {
            'total_messages': 0,
            'error_messages': 0,
            'fatal_messages': 0,
            'recycle_count': 0,
            'error_details': [],
            'fatal_details': [],
            'recycle_details': [],
        }

    def __init__(self):
        """Create a disconnected terminal with empty statistics."""
        self._serial_stats = self._blank_stats()
        self._connected = False
        logging.info("[MOCK] Serial terminal initialized")

    def _record(self, timestamp, message, counters, detail_lists):
        """Bump the given counters and append the message to the given detail lists."""
        stats = self._serial_stats
        for name in counters:
            stats[name] += 1
        for name in detail_lists:
            stats[name].append((timestamp, message))

    def connect(self):
        """
        Simulate opening the serial connection.

        When RECYCLE simulation is enabled, one RECYCLE event is emitted at
        connection time (power-on) and, with a 30% chance, a second one
        shortly after to imitate an unstable startup.
        """
        self._connected = True
        self._message_timer = time.time()  # Track time for periodic messages
        logging.info("[MOCK] Serial terminal connected")

        if not SIMULATE_RECYCLE_EVENTS:
            return

        self._simulate_recycle_message()
        if random.random() < 0.3:
            time.sleep(0.1)
            self._simulate_recycle_message()

    def disconnect(self):
        """Simulate closing the serial connection."""
        self._connected = False
        logging.info("[MOCK] Serial terminal disconnected")

    def _simulate_recycle_message(self):
        """Record one RECYCLE event; it counts both as a fatal message and as a recycle."""
        timestamp = time.strftime('%H:%M:%S')
        message = "%%F-10747-SP*: ***** RECYCLE!******"
        self._record(
            timestamp,
            message,
            counters=('fatal_messages', 'recycle_count', 'total_messages'),
            detail_lists=('fatal_details', 'recycle_details'),
        )
        logging.info(f"[MOCK] Simulated RECYCLE event: {message}")

    def _simulate_periodic_messages(self):
        """
        Occasionally inject serial traffic while connected.

        Each call has a 5% chance of adding an error message (when serial
        errors are enabled) and a 1% chance of adding a fatal message (when
        serial fatals are enabled). Nothing happens while disconnected.
        """
        if not self._connected:
            return

        if SIMULATE_SERIAL_ERRORS and random.random() < 0.05:
            timestamp = time.strftime('%H:%M:%S')
            message = random.choice([
                "%%E-INFO: System status check OK",
                "%%E-WARN: Temperature sensor reading nominal",
                "%%E-DEBUG: Communication link active",
            ])
            self._record(
                timestamp,
                message,
                counters=('error_messages', 'total_messages'),
                detail_lists=('error_details',),
            )
            logging.debug(f"[MOCK] Simulated error message: {message}")

        if SIMULATE_SERIAL_FATAL and random.random() < 0.01:
            timestamp = time.strftime('%H:%M:%S')
            message = random.choice([
                "%%F-CRITICAL: Subsystem test failed",
                "%%F-ALARM: Hardware diagnostic issue",
            ])
            self._record(
                timestamp,
                message,
                counters=('fatal_messages', 'total_messages'),
                detail_lists=('fatal_details',),
            )
            logging.warning(f"[MOCK] Simulated fatal message: {message}")

    def get_serial_statistics(self):
        """
        Return a snapshot of the serial statistics.

        Calling this also gives the periodic-message simulation a chance to
        run. The detail lists in the returned dictionary are copies, so the
        caller may modify them freely.
        """
        self._simulate_periodic_messages()

        stats = self._serial_stats
        snapshot = {
            name: stats[name]
            for name in ('total_messages', 'error_messages', 'fatal_messages', 'recycle_count')
        }
        for name in ('error_details', 'fatal_details', 'recycle_details'):
            snapshot[name] = list(stats[name])
        return snapshot

    def reset_serial_statistics(self):
        """Discard all collected statistics ahead of a new test run."""
        self._serial_stats = self._blank_stats()
        logging.debug("[MOCK] Serial statistics reset")
# ====================
# MOCK POWER CONTROL
# ====================
class MockBrainBox:
    """
    Mock of the BrainBox power control interface.

    Only the 'MAIN_POWER' field is understood. Switching it also starts or
    stops the mock 1553 interface when one has been injected into
    ``sys.modules`` under the name 'leo_grifo_1553'.
    """

    def __init__(self):
        self._power_state = False
        self._default_timeout = 5.0  # Default timeout for operations in seconds
        self.timeout = self._default_timeout  # Exposed timeout attribute
        logging.info("[MOCK] BrainBox initialized")

    def default_timeout(self):
        """Return default timeout for operations."""
        return self._default_timeout

    def check(self, expected_result, *fields, **kwargs):
        """
        Compare the simulated power state with an expected value.

        Returns:
            ``(success, value, error)``; for any field other than
            'MAIN_POWER' the result is ``(False, None, "Unknown field")``.
        """
        if 'MAIN_POWER' not in fields:
            return False, None, "Unknown field"

        current = self._power_state
        matched = (current == expected_result)
        message = f"Expected {expected_result}, got {current}" if not matched else None
        return matched, current, message

    def _notify_mock_1553(self, powered):
        """
        Start (power on) or stop (power off) the mock 1553 interface, if any.

        Strictly best effort: every failure is only traced at debug level so
        that power control itself never fails because of the bus mock.
        """
        try:
            import sys
            bus_module = sys.modules.get('leo_grifo_1553')
            singleton = getattr(bus_module, 'theGrifo1553', None)
            if singleton is None:
                return
            try:
                iface = singleton.getInterface()
                # Power ON -> start a new run; power OFF -> stop the interface
                action = 'start' if powered else 'stop'
                if hasattr(iface, action):
                    getattr(iface, action)()
            except Exception:
                logging.debug('[MOCK] Could not start/stop mock 1553 interface', exc_info=True)
        except Exception:
            logging.debug('[MOCK] Error while attempting to control mock 1553 interface', exc_info=True)

    def set(self, value, *fields, **kwargs):
        """
        Simulate switching the main power.

        Returns:
            ``(True, None)`` when 'MAIN_POWER' is among the fields,
            ``(False, "Unknown field")`` otherwise.
        """
        if 'MAIN_POWER' not in fields:
            return False, "Unknown field"

        self._power_state = value
        state = ("OFF", "ON")[bool(value)]
        logging.info(f"[MOCK] Power {state}")
        self._notify_mock_1553(value)
        return True, None
# ====================
# MOCK GRIFO INTERFACE WRAPPER
# ====================
class MockGrifoInstrumentInterface:
    """
    Mock wrapper that mimics GrifoInstrumentInterface behavior.

    It offers the check/set/get/getInterface surface used through the
    theGrifo1553 singleton and delegates field look-ups to a
    MockGrifo1553Interface instance.
    """

    def __init__(self):
        self._interface = MockGrifo1553Interface()
        self._timeout = 0.2
        self.timeout = 0.2  # Add timeout attribute for compatibility

    def default_timeout(self):
        """Return default timeout value (method, not attribute)."""
        return self._timeout

    def getInterface(self):
        """Return the mock interface object."""
        return self._interface

    def check(self, expected_result, *fields, **kwargs):
        """
        Compare a simulated field value with an expected value or range.

        Args:
            expected_result: Expected value, or a ``(min, max)`` tuple for an
                inclusive range check
            fields: Exactly two positional items: message name and field name

        Returns:
            Tuple of (success: bool, value: Any, error: str or None)
        """
        if len(fields) != 2:
            return False, None, 'message and field expected as argument'

        msg, field = fields
        value = self._interface.getMessageFieldValue(msg, field)

        if isinstance(expected_result, tuple) and len(expected_result) == 2:
            # Range check: (min, max)
            min_val, max_val = expected_result
            try:
                success = (min_val <= value <= max_val)
            except TypeError:
                # The mock can hand back a string such as "false" for a
                # field checked against a numeric range; a value that
                # cannot be ordered is a failed check, not a crash.
                success = False
        else:
            # Strings and every other expected value use plain equality
            success = (value == expected_result)

        error = None if success else f"Expected {expected_result}, got {value}"
        return success, value, error

    def set(self, value, *fields, **kwargs):
        """Simulate set operation (not used in PBIT test); always succeeds."""
        return True, None

    def get(self, *fields, **kwargs):
        """
        Read a simulated field value.

        Returns:
            ``(value, None)`` on success, or ``(None, error_message)`` when
            the message/field pair is not supplied as two positional items.
        """
        if len(fields) != 2:
            return None, 'message and field expected as argument'

        msg, field = fields
        value = self._interface.getMessageFieldValue(msg, field)
        return value, None
# ====================
# SIMULATION SETUP
# ====================
def initialize_simulation():
    """
    Collect the interactive simulation settings and store them in globals.

    This function should be called ONCE at the beginning, before
    setup_simulation(). It decides the number of runs (asking the user only
    in 'mixed' scenario mode, otherwise using NUM_RUNS) and then asks
    whether target acquisition should be simulated, including the target
    distance and the scan-cycle range. Later calls return immediately once
    the number of runs has been set.

    An interrupted or closed input stream at the first question disables
    target simulation. Invalid numeric answers keep the current defaults.
    """
    global _requested_runs
    global _run_on_target, _target_distance, _target_min_cycles, _target_max_cycles

    # Only ask if not already initialized
    if _requested_runs is not None:
        logging.info("[MOCK] Simulation already initialized, skipping user input")
        return

    # Ask user for the number of runs only in mixed mode
    if SIMULATION_SCENARIO == 'mixed':
        _requested_runs = ask_num_runs()
    else:
        _requested_runs = NUM_RUNS

    # Ask whether to simulate target acquisition during runs
    while True:
        try:
            tgt_input = input("Simulate target acquisition during runs? (y/n) [n]: ").strip().lower()
            if tgt_input in ('', 'n', 'no'):
                _run_on_target = False
                break
            if tgt_input not in ('y', 'yes'):
                print("Please answer 'y' or 'n'.")
                continue

            _run_on_target = True

            # Fixed target distance; an empty answer keeps the current value
            try:
                d_in = input(f"Target distance to simulate (meters) [{_target_distance}]: ").strip()
                if d_in != '':
                    _target_distance = int(d_in)
            except Exception:
                logging.info('[MOCK] Invalid distance input, using default')

            # Scan-cycle range; empty answers keep the current values
            try:
                min_in = input(f"Min scan cycles before target appears [{_target_min_cycles}]: ").strip()
                if min_in != '':
                    _target_min_cycles = max(1, int(min_in))
                max_in = input(f"Max scan cycles before target appears [{_target_max_cycles}]: ").strip()
                if max_in != '':
                    _target_max_cycles = max(_target_min_cycles, int(max_in))
            except Exception:
                logging.info('[MOCK] Invalid cycles input, using defaults')

            # A new minimum combined with a blank or invalid maximum could
            # leave min > max, which makes random.randint() raise later on;
            # keep the pair ordered whatever was typed.
            _target_max_cycles = max(_target_min_cycles, _target_max_cycles)
            break
        except (KeyboardInterrupt, EOFError):
            _run_on_target = False
            break
def setup_simulation():
    """
    Activate simulation mode by injecting mock modules into sys.modules.

    After this call, importing the hardware modules yields the mocks
    instead of the real implementations:
    - interpreter: empty placeholder for the SWIG binding
    - leo_grifo_1553: exposes the mock theGrifo1553 singleton
    - leo_grifo_io_box: exposes the mock theBrainBox singleton

    The GUI monitor is started first when it is enabled, available and not
    already running.

    This must be called BEFORE any code tries to import the hardware
    modules, and after the user interaction (ask_num_runs) has been done.
    """
    import sys
    import types
    global _gui_monitor, _gui_enabled

    # Bring up the GUI monitor once, if requested and importable
    if _gui_enabled and GUI_AVAILABLE and _gui_monitor is None:
        logging.info("[MOCK] Starting GUI monitor...")
        _gui_monitor = TestMonitorGUI()
        _gui_monitor.start()
        _gui_monitor.log_event('info', 'Simulation mode activated')
        logging.info("[MOCK] GUI monitor started successfully")

    # Configuration banner
    separator = "="*80
    for line in (separator, "SIMULATION MODE ACTIVATED", separator, f"Scenario: {SIMULATION_SCENARIO}"):
        logging.info(line)
    if SIMULATION_SCENARIO == 'mixed':
        logging.info(f"Number of Runs: {_requested_runs}")
    for line in (
        f"BIT Time Range: {PBIT_TIME_MIN}-{PBIT_TIME_MAX}s",
        f"Serial Errors: {SIMULATE_SERIAL_ERRORS}",
        f"Serial Fatal: {SIMULATE_SERIAL_FATAL}",
        f"Recycle Events: {SIMULATE_RECYCLE_EVENTS}",
        separator,
    ):
        logging.info(line)

    # Fake SWIG binding: an empty module is enough to satisfy the import
    sys.modules['interpreter'] = types.ModuleType('interpreter')
    logging.info("[MOCK] Injected fake 'interpreter' module (SWIG binding)")

    # leo_grifo_1553 with the theGrifo1553 singleton and its class
    bus_module = types.ModuleType('leo_grifo_1553')
    bus_singleton = MockGrifoInstrumentInterface()
    bus_module.theGrifo1553 = bus_singleton
    bus_module.GrifoInstrumentInterface = type(bus_singleton)
    sys.modules['leo_grifo_1553'] = bus_module
    logging.info("[MOCK] Injected 'leo_grifo_1553' module with mock theGrifo1553")

    # leo_grifo_io_box with the theBrainBox singleton and its class
    io_module = types.ModuleType('leo_grifo_io_box')
    power_singleton = MockBrainBox()
    io_module.theBrainBox = power_singleton
    io_module.BrainBox = type(power_singleton)
    sys.modules['leo_grifo_io_box'] = io_module
    logging.info("[MOCK] Injected 'leo_grifo_io_box' module with mock theBrainBox")

    # The serial terminal is created locally in test_proc(), so it cannot be
    # patched from here; the test has to call create_mock_terminal() itself
    # when --simulate is active.
    logging.info("[MOCK] Simulation setup complete - test can now run without hardware")
def create_mock_terminal():
    """
    Build the mock serial terminal used in simulation mode.

    Call this instead of leo_grifo_terminal.GrifoSerialTerminal() when the
    --simulate flag is active. The created terminal is also remembered in
    ``_global_terminal_ref`` so the RECYCLE simulation can reach it.

    Returns:
        MockSerialTerminal instance
    """
    global _global_terminal_ref
    _global_terminal_ref = MockSerialTerminal()
    return _global_terminal_ref
# ====================
# CONFIGURATION HELPERS
# ====================
def set_simulation_scenario(scenario: str):
    """
    Change the simulation scenario at runtime.

    The value is stored as-is in SIMULATION_SCENARIO without validation.

    Args:
        scenario: Scenario name, e.g. 'normal', 'pedestal_fail',
            'random_failures' (see the scenario list in the configuration
            section at the top of this module)
    """
    global SIMULATION_SCENARIO
    SIMULATION_SCENARIO = scenario
    logging.info(f"[MOCK] Simulation scenario changed to: {scenario}")
def set_bit_timing(min_time: float, max_time: float):
    """
    Set the window from which the simulated BIT completion time is drawn.

    Args:
        min_time: Minimum BIT time in seconds
        max_time: Maximum BIT time in seconds
    """
    global PBIT_TIME_MIN, PBIT_TIME_MAX
    PBIT_TIME_MIN, PBIT_TIME_MAX = min_time, max_time
    logging.info(f"[MOCK] BIT timing set to {min_time}-{max_time}s")
def set_simulation_target(run_on_target: bool, distance: int = 1180, min_cycles: int = 3, max_cycles: int = 12):
    """
    Set the parameters of the simulated target acquisition.

    The cycle range is normalised: the minimum is at least 1 and the
    maximum is never below the minimum.

    Args:
        run_on_target: Enable/disable simulated target acquisition
        distance: Fixed target range to present when target appears
        min_cycles: Minimum scan cycles before target appears
        max_cycles: Maximum scan cycles before target appears
    """
    global _run_on_target, _target_distance, _target_min_cycles, _target_max_cycles

    _run_on_target = bool(run_on_target)
    _target_distance = int(distance)
    lower_bound = max(1, int(min_cycles))
    _target_min_cycles = lower_bound
    _target_max_cycles = max(lower_bound, int(max_cycles))

    logging.info(f"[MOCK] Target simulation configured: enabled={_run_on_target}, distance={_target_distance}, cycles={_target_min_cycles}-{_target_max_cycles}")
def mock_tgt_gen_alone(interface) -> bool:
    """
    Simulate a target acquisition, standing in for the production `tgt_gen_alone()`.

    When target simulation is enabled, the mock waits for a random number
    of scan cycles (between the configured min and max, 50 ms each) and
    then tries to make the B9 fields report one target at the configured
    distance. The injection is best effort: failures while polling or
    injecting are swallowed and do not change the result.

    Args:
        interface: Object to inject the target into. Its `_field_values`
            dict is updated when it has one; otherwise
            `assignMessageFieldValue`/`commitChanges` are used if present.

    Returns:
        True once the simulated acquisition has run, False when target
        simulation is disabled or an unexpected error occurred
    """
    global _run_on_target, _target_distance, _target_min_cycles, _target_max_cycles

    if not _run_on_target:
        logging.info("[MOCK] Target simulation not enabled")
        return False

    try:
        cycles = random.randint(_target_min_cycles, _target_max_cycles)
        logging.info(f"[MOCK] Simulating target acquisition: will appear after {cycles} scan cycles")

        # Keep the bus "alive" while waiting: poke the message counter on
        # every cycle when the interface supports it.
        for _scan in range(cycles):
            try:
                interface.getSingleMessageReceivedSz("B9")
            except Exception:
                pass
            time.sleep(0.05)

        # Inject the target into the B9 fields
        try:
            if hasattr(interface, '_field_values'):
                # Direct route: write into the mock's field table
                interface._field_values['b9_t_num'] = 1
                interface._field_values['b9_t1_rng'] = int(_target_distance)
                # Remember what was simulated, for reporting
                try:
                    interface._last_simulated_target = {
                        'distance': int(_target_distance),
                        'appeared_after_cycles': cycles
                    }
                except Exception:
                    pass
            elif hasattr(interface, 'assignMessageFieldValue'):
                # Fallback route: go through the message assignment API
                try:
                    interface.assignMessageFieldValue('B9', 'b9_t_num', 1)
                    interface.assignMessageFieldValue('B9', 'b9_t1_rng', int(_target_distance))
                    if hasattr(interface, 'commitChanges'):
                        interface.commitChanges()
                except Exception:
                    pass
        except Exception:
            logging.debug('[MOCK] Error injecting target fields', exc_info=True)

        logging.info(f"[MOCK] Target simulated at distance {_target_distance}")
        return True
    except Exception:
        logging.exception('[MOCK] Exception during mock target acquisition')
        return False
if __name__ == '__main__':
    # Running the mock module directly only prints its documentation and a
    # reminder of the expected call order.
    print(__doc__)
    for usage_line in (
        "\nThis module should not be run directly.",
        "Usage: python GRIFO_M_PBIT.py --simulate",
        "\nIMPORTANT: Call order for proper initialization:",
        " 1. initialize_simulation() - Ask user for config (ONCE)",
        " 2. setup_simulation() - Inject mock modules",
        " 3. Run test code - Uses mock infrastructure",
    ):
        print(usage_line)