"""
GRIFO_M_PBIT.py - Automated Power-On BIT Test for GRIFO-F/TH Radar

This script performs comprehensive Built-In Test (BIT) verification with power cycling:
- Executes configurable number of test repetitions (default: 10)
- Power cycles radar between runs to simulate cold-start conditions
- Monitors B6 LRU (Line Replaceable Unit) status fields
- Performs detailed B8 diagnostic drill-down on real failures
- Detects 1553 bus communication loss for fast-fail behavior
- Generates comprehensive statistics report with timing analysis

Test Flow:
    1. Power off radar (wait_before=1s, wait_after=4s for stabilization)
    2. Power on radar (wait_after=100ms) and wait for initialization
    3. Execute BIT and wait for completion (timeout: 182s)
    4. Monitor 1553 bus health continuously during BIT execution
    5. Verify all 12 B6 LRU status fields
    6. If real failures detected, drill-down into 185 B8 diagnostic fields
    7. Track statistics (timing, pass/fail counts, failure details)
    8. Repeat for configured number of cycles
    9. Generate final comprehensive report with aggregate statistics

Configuration Options:
    NUMBER_OF_REPETITIONS: Number of test cycles to execute (default: 10)
    PBIT_SEC_TIME: BIT completion timeout in seconds (default: 182s, matches target2)
    COMM_LOSS_THRESHOLD: 1553 comm loss detection threshold in iterations (default: 20)
    EXPORT_STATISTICS_CSV: Export statistics to CSV file (default: True)
    FORCE_B8_DRILL_DOWN: If True, always perform B8 drill-down even when only known
        failures are detected (matches target2 behavior for complete SW requirement
        verification). This file sets it to True.
    KNOWN_FAILURES: List of expected failures due to HW setup limitations

Power Cycling Timing (aligned with target2):
    - Power OFF: wait_before=1s (stabilization before action), wait_after=4s (settle time)
    - Power ON: wait_after=100ms (initialization delay)

Author: Test Automation Team
Date: 2026-01-29
Last Updated: 2026-02-02 (aligned with target2 timing and behavior)
"""

import __init__
import signal
import time,sys,os
import logging
import csv
import json
from datetime import datetime
from leo_grifo_common import *
from test_common_function import *
from leo_grifo_test_report import testReport
from leo_grifo_1553 import theGrifo1553
#import leo_grifo_serial_maintnance
from logger import logger_setup
import leo_grifo_terminal
import pdb
import traceback

# Optional auxiliary serial helper (external script).
# When the helper cannot be imported, send_serial_sequence is None and callers
# must check for that before using it.
try:
    from serial_aux import send_serial_sequence
except Exception:
    send_serial_sequence = None

NUMBER_OF_REPETITIONS = 10  # Total test cycles to execute (e.g. 4 perfect runs out of 10 = 40%)
PBIT_SEC_TIME = 182  # BIT completion timeout in seconds (target2 uses 182s)
COMM_LOSS_THRESHOLD = 20  # 1553 bus comm loss detection: iterations without msg count increase
EXPORT_STATISTICS_CSV = True  # Export statistics to CSV file for Excel/external analysis

# ====================
# TARGET DETECTION CONFIGURATION
# ====================
# Target detection test runs AFTER PBIT completion (if PBIT passed).
# Enable via ask_production_config() or simulation configuration.
# This verifies radar can detect simulated targets using B9 message monitoring.
TARGET_DETECTION_TIMEOUT_SEC = 10.0  # Target detection timeout: max seconds to wait for target visibility
TARGET_EXPECTED_RANGE = 2536  # Expected target range in ICD units (from target3)
TARGET_RANGE_TOLERANCE_LOW = 1000  # Range tolerance: lower bound (default: -1000 units)
TARGET_RANGE_TOLERANCE_HIGH = 200  # Range tolerance: upper bound (default: +200 units)
TARGET_HIT_THRESHOLD = 2  # Number of successful range validations required (at least 2 in 10s)

# ====================
# AUX SERIAL CONFIGURATION
# ====================
# Switch for the auxiliary serial helper (can be overridden by operator).
# Currently False: auxiliary serial communication from tests is disabled.
ENABLE_AUX_SERIAL = False

# ====================
# TIMING CONSTANTS
# ====================
TELLBACK_VERIFY_TIMEOUT_SEC = 5.0  # Default timeout for verifying tellbacks (STBY/SILENCE)
TELLBACK_VERIFY_STEP_SEC = 0.5  # Poll step for tellback verification
TELLBACK_POST_SET_DELAY_SEC = 2.0  # Delay after setting STBY/SILENCE to allow radar to update tellbacks
STBY_POST_UNSET_DELAY_SEC = 2.0  # Delay after removing STBY to allow radar to exit standby

# ====================
# TELLBACK VERIFICATION CONFIGURATION
# ====================
# Enable or disable tellback verification on B7 message.
#
# When ENABLED (True):
#   - Verifies STANDBY/SILENCE commands via B7 tellbacks
#   - Adds 2-5 seconds of verification time
#   - More robust but slower
#
# When DISABLED (False):
#   - Uses a "fire and forget" approach
#   - Minimal delays (~1.5s total)
#   - Faster but assumes commands are accepted
#
# Default: False (matches proven implementation)
ENABLE_TELLBACK_VERIFICATION = False

# ====================
# B8 DRILL-DOWN CONFIGURATION
# ====================
# FORCE_B8_DRILL_DOWN: If True, always perform B8 drill-down even when only known failures
# are detected. This matches target2 behavior where B8 is checked unconditionally to verify
# SW requirements and obtain complete step fail statistics.
#
# False: optimized behavior - B8 only on real failures
# True:  replicate target2 behavior for complete SW requirement verification
# Current setting: True
FORCE_B8_DRILL_DOWN = True

# ====================
# KNOWN FAILURES CONFIGURATION
# ====================
# List of field names that are expected to fail due to HW test setup limitations.
# These failures are tracked but do not trigger B8 drill-down or test failure.
#
# Use case: When test HW setup lacks physical components (e.g., pedestal unit),
# certain status checks will always fail. Adding them here prevents false negatives.
#
# Format: Full field name from bit_fields tuple
# Note: Known failures are reported separately in statistics but don't affect test verdict
# Note: radar_fail_status is NOT in this list - it's an aggregate flag checked contextually
KNOWN_FAILURES = [
    "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pedestal_status",
    # Add more known HW setup limitations here as needed
]

interruptRequest = False  # Global flag for graceful Ctrl-C handling

# ====================
# TEST STATISTICS TRACKING
# ====================
# Global dictionary to track statistics across all test repetitions.
# Populated during test execution and used to generate final comprehensive report.
#
# Structure:
#   repetitions: List of dicts, one per run, containing:
#     - repetition: Run number (1-based)
#     - pbit_time: BIT completion time in seconds
#     - bit_available: Boolean, True if BIT completed
#     - success: Boolean, overall run result (pass/fail)
#     - b6_total/pass/fail/known_fail: B6 LRU status check counts
#     - b8_checked/pass/fail: B8 diagnostic check counts
#     - failures: List of (field_name, value) tuples for real failures
#     - known_failures: List of (field_name, value) tuples for expected failures
#   total_runs: Counter for completed test runs
#   successful_runs: Counter for runs with no real failures
#   failed_runs: Counter for runs with real failures detected
test_statistics = {
    'repetitions': [],       # List of per-run statistics dictionaries
    'total_runs': 0,         # Total number of completed runs
    'successful_runs': 0,    # Number of runs that passed (only known failures allowed)
    'failed_runs': 0,        # Number of runs with real failures detected
}


def signal_handler(sig, frame):
    """Handle Ctrl-C signal for graceful test termination."""
    global interruptRequest
    logging.info("Ctrl-C detected, exiting gracefully...")
    interruptRequest = True


def analyze_test_failures(stats):
    """
    Analyze all test failures across runs and build a failure frequency table.

    Test identifiers (e.g. SP1, TX10) are extracted from field names matching
    ``test_<1-3 letters><1-2 digits>_<description>``. Both real failures and
    known failures of every run are considered. Fields that do not match the
    pattern are ignored.

    Args:
        stats: test_statistics dictionary; reads 'total_runs' and 'repetitions'
            (each run provides 'repetition' and optionally 'failures' and
            'known_failures').

    Returns:
        list: List of dicts sorted by percentage (descending) then test_id, with keys:
            - test_id: Test identifier (e.g., "SP1", "TX10", "AGC5")
            - description: Human-readable test description
            - full_field: Complete field name for reference
            - occurrences: Number of failure entries recorded for this test
            - percentage: Share of total runs in which the test failed at least
              once (never above 100 as long as total_runs covers all runs)
            - runs_failed: Sorted unique run numbers where this test failed
    """
    import re

    # Compiled once per call instead of once per failure entry.
    # Pattern: "test_XX##_description" where XX is 1-3 letters, ## is 1-2 digits
    # Examples: "test_sp1_", "test_tx10_", "test_agc5_", "test_is1_"
    test_pattern = re.compile(r'test_([a-z]{1,3})(\d{1,2})_(.+)', re.IGNORECASE)

    test_failure_counts = {}  # Key: (test_id, description, full_field), Value: list of run numbers
    total_runs = stats['total_runs']

    for run in stats['repetitions']:
        run_num = run['repetition']

        # Analyze both real failures and known failures (for complete picture)
        all_failures = list(run.get('failures', [])) + list(run.get('known_failures', []))

        for failure_item in all_failures:
            # Only sequence entries carry a field name; anything else is skipped
            # instead of raising on len()/indexing.
            if not isinstance(failure_item, (list, tuple)) or len(failure_item) < 2:
                continue

            # Handle both tuple formats: (field, value) or (category, field, value)
            field_name = failure_item[0] if len(failure_item) == 2 else failure_item[1]
            if not isinstance(field_name, str):
                continue

            test_match = test_pattern.search(field_name)
            if not test_match:
                continue

            prefix = test_match.group(1).upper()   # SP, TX, AGC, etc.
            number = test_match.group(2)           # 1, 10, 5, etc.
            description = test_match.group(3)      # timer1_up, hv_ps_over_temperature_warning, etc.
            test_id = f"{prefix}{number}"          # "SP1", "TX10", etc.

            # Clean up description: replace underscores with spaces, capitalize
            clean_desc = description.replace('_', ' ').title()

            key = (test_id, clean_desc, field_name)
            test_failure_counts.setdefault(key, []).append(run_num)

    # Build result list
    failure_analysis = []
    for (test_id, description, full_field), run_list in test_failure_counts.items():
        unique_runs = sorted(set(run_list))
        # The rate is based on distinct runs so a field logged twice in the same
        # run cannot push the percentage above 100%.
        percentage = (len(unique_runs) / total_runs * 100) if total_runs > 0 else 0
        failure_analysis.append({
            'test_id': test_id,
            'description': description,
            'full_field': full_field,
            'occurrences': len(run_list),
            'percentage': percentage,
            'runs_failed': unique_runs,
        })

    # Sort by percentage (descending), then by test_id (ascending) for ties
    failure_analysis.sort(key=lambda x: (-x['percentage'], x['test_id']))
    return failure_analysis
def generate_final_statistics_report(report, stats):
    """
    Generate comprehensive final statistics report with professional PDF formatting.

    Instead of using add_comment() which registers text as steps, this function
    prepares structured data and passes it to the report object via
    set_custom_statistics() so it can be rendered as dedicated sections.
    A short summary is also written to the log.

    Args:
        report: testReport object for PDF generation (only set_custom_statistics
            is called on it here)
        stats: test_statistics dictionary containing all run data

    Report Sections (data prepared here):
        1. Per-Run Summary: per-run dictionaries passed through unchanged
        2. Global Aggregate: Combined statistics from all runs
        3. Timing Analysis: PBIT performance metrics
        4. Target detection counters and timing
        5. Test Failure Analysis: Detailed frequency table of failed tests

    Returns:
        dict: The structured statistics that were handed to the report, with keys
        'repetitions', 'aggregate' and 'test_failure_analysis' (usable for CSV export).
    """
    runs = stats['repetitions']

    def _sum(key):
        # Missing counters count as 0, consistent with the serial counters.
        return sum(r.get(key, 0) for r in runs)

    def _rate(part, whole):
        # Guard against zero completed runs (e.g. interrupted before the first run ended).
        return (part / whole * 100) if whole else 0.0

    # Calculate aggregate statistics
    total_b6_checks = _sum('b6_total')
    total_b6_pass = _sum('b6_pass')
    total_b6_fail = _sum('b6_fail')
    total_b6_known = _sum('b6_known_fail')
    total_b8_checks = _sum('b8_checked')
    total_b8_pass = _sum('b8_pass')
    total_b8_fail = _sum('b8_fail')
    total_serial_msgs = _sum('serial_total')
    total_serial_errors = _sum('serial_errors')
    total_serial_fatal = _sum('serial_fatal')
    total_serial_recycles = _sum('serial_recycles')

    # Calculate target detection statistics.
    # target_detected is True/False when the target test ran, None (or absent) otherwise.
    target_tests = [r.get('target_detected', None) for r in runs]
    target_detected_count = sum(1 for t in target_tests if t is True)
    target_not_detected_count = sum(1 for t in target_tests if t is False)
    # Pass rate is relative to ALL runs, including runs where the target test did not execute.
    target_pass_rate = (target_detected_count / len(target_tests) * 100) if target_tests else 0.0

    target_times = [r.get('target_test_time', 0.0) for r in runs
                    if r.get('target_detected') is not None]
    if target_times:
        avg_target_time = sum(target_times) / len(target_times)
        min_target_time = min(target_times)
        max_target_time = max(target_times)
    else:
        avg_target_time = min_target_time = max_target_time = 0.0

    # Calculate timing statistics (only runs where BIT became available)
    pbit_times = [r['pbit_time'] for r in runs if r['bit_available']]
    if pbit_times:
        avg_pbit = sum(pbit_times) / len(pbit_times)
        min_pbit = min(pbit_times)
        max_pbit = max(pbit_times)
        # Population variance / standard deviation
        variance = sum((t - avg_pbit) ** 2 for t in pbit_times) / len(pbit_times)
        std_dev = variance ** 0.5
    else:
        avg_pbit = min_pbit = max_pbit = std_dev = None

    # Generate detailed test failure analysis
    test_failure_analysis = analyze_test_failures(stats)

    # Prepare structured data dictionary for PDF rendering
    custom_statistics = {
        'repetitions': runs,  # Per-run data with all metrics
        'aggregate': {
            # Overall test summary
            'total_runs': stats['total_runs'],
            'successful_runs': stats['successful_runs'],
            'failed_runs': stats['failed_runs'],
            # 1553 Bus statistics
            'total_b6_checks': total_b6_checks,
            'total_b6_pass': total_b6_pass,
            'total_b6_fail': total_b6_fail,
            'total_b6_known': total_b6_known,
            'total_b8_checks': total_b8_checks,
            'total_b8_pass': total_b8_pass,
            'total_b8_fail': total_b8_fail,
            # Serial communication statistics
            'total_serial_msgs': total_serial_msgs,
            'total_serial_errors': total_serial_errors,
            'total_serial_fatal': total_serial_fatal,
            'total_serial_recycles': total_serial_recycles,
            # Target detection statistics
            'target_detected_count': target_detected_count,
            'target_not_detected_count': target_not_detected_count,
            'target_pass_rate': target_pass_rate,
            'avg_target_time': avg_target_time,
            'min_target_time': min_target_time,
            'max_target_time': max_target_time,
            # Timing analysis
            'avg_pbit_time': avg_pbit,
            'min_pbit_time': min_pbit,
            'max_pbit_time': max_pbit,
            'std_dev_pbit': std_dev,
        },
        'test_failure_analysis': test_failure_analysis,  # Detailed test failure frequency table
    }

    # Pass structured data to the report object for rendering
    report.set_custom_statistics(custom_statistics)

    # Log summary to console for immediate feedback
    total_runs = stats['total_runs']
    success_pct = _rate(stats['successful_runs'], total_runs)
    failed_pct = _rate(stats['failed_runs'], total_runs)

    logging.info("="*90)
    logging.info(" FINAL TEST STATISTICS SUMMARY")
    logging.info("="*90)
    logging.info(f"Total Runs: {stats['total_runs']}")
    logging.info(f"Successful: {stats['successful_runs']} ({success_pct:.1f}%)")
    logging.info(f"Failed: {stats['failed_runs']} ({failed_pct:.1f}%)")
    logging.info(f"B6 Checks: {total_b6_checks} (Pass: {total_b6_pass}, Fail: {total_b6_fail}, Known: {total_b6_known})")
    logging.info(f"B8 Checks: {total_b8_checks} (Pass: {total_b8_pass}, Fail: {total_b8_fail})")
    logging.info(f"Serial: {total_serial_msgs} messages ({total_serial_errors} errors, {total_serial_fatal} fatal, {total_serial_recycles} recycles)")
    logging.info(f"Target: {target_detected_count} detected, {target_not_detected_count} not detected ({target_pass_rate:.1f}% pass rate)")
    if avg_pbit is not None:
        logging.info(f"PBIT Timing: avg={avg_pbit:.2f}s, min={min_pbit:.2f}s, max={max_pbit:.2f}s, σ={std_dev:.2f}s")
    logging.info("="*90)
    logging.info("Detailed statistics will be available in the PDF report")

    # Return custom_statistics for optional CSV export
    return custom_statistics
def _csv_two_decimals(value):
    """Format a possibly-missing number with two decimals (None is written as 0.00)."""
    return f"{value if value is not None else 0:.2f}"


def _csv_failure_field(item):
    """Return the field name of a (field, value) or (category, field, value) entry, else None."""
    if not isinstance(item, (list, tuple)) or len(item) < 2:
        return None
    field = item[0] if len(item) == 2 else item[1]
    return field if isinstance(field, str) else None


def export_statistics_to_csv(custom_statistics, test_name, output_folder, known_failures=None):
    """
    Export test statistics to CSV file for external analysis (Excel, etc.).

    The CSV file contains five sections:
        1. Per-Run Statistics: Detailed results for each run
        2. Aggregate Statistics: Overall summary metrics
        3. Known Failures: Fields ignored in the verdict
        4. Test Failure Analysis: Per-test failure frequency table
        5. Problem Distribution: Category-level analysis of failure types

    Args:
        custom_statistics: Dictionary with 'repetitions' and 'aggregate' data and
            optionally 'test_failure_analysis'
        test_name: Base name for the CSV file (e.g., "GRIFO_M_PBIT_20260129_153432")
        output_folder: Path to folder where CSV will be saved (created if missing)
        known_failures: Optional list of known-failure field names to list in
            section 3. Defaults to the module-level KNOWN_FAILURES.

    Returns:
        Path to generated CSV file, or None if export failed (the error and
        traceback are logged; no exception propagates).
    """
    try:
        if known_failures is None:
            # Use the constant of this module directly. Importing the module by its
            # file name would execute the script a second time when it runs as
            # __main__ and would break if the file were renamed.
            known_failures = KNOWN_FAILURES

        # Create output folder if it doesn't exist
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
            logging.info(f"Created output folder: {output_folder}")

        # Create CSV filename inside the output folder
        csv_filename = f"{test_name}_statistics.csv"
        csv_path = os.path.join(output_folder, csv_filename)

        logging.info(f"Exporting statistics to CSV: {csv_path}")

        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)

            # ---- Section 1: Per-Run Statistics ----
            writer.writerow(['PER-RUN STATISTICS'])
            writer.writerow([])  # Blank line

            # Headers for per-run data (detailed columns are appended as JSON strings)
            headers = [
                'Run', 'Result', 'PBIT Time (s)', 'Start Time', 'End Time', 'Run Duration (s)',
                'Scenario',
                'B6 Total', 'B6 Pass', 'B6 Fail', 'B6 Known',
                'B8 Checked', 'B8 Pass', 'B8 Fail',
                'Serial Msgs', 'Serial Errors', 'Serial Fatal', 'Serial Recycles',
                'Real Failures', 'Known Failures', 'Target Simulated',
                'Failures Detail (JSON)', 'Known Failures Detail (JSON)', 'Serial Details (JSON)'
            ]
            writer.writerow(headers)

            for run in custom_statistics['repetitions']:
                # Normalize failure entries to lists so they serialize uniformly:
                # tuples/lists are copied, anything else is wrapped as [str(item)].
                failures = [list(item) if isinstance(item, (list, tuple)) else [str(item)]
                            for item in run.get('failures', [])]
                known_detail = [list(item) if isinstance(item, (list, tuple)) else [str(item)]
                                for item in run.get('known_failures', [])]
                serial_details = run.get('serial_details', [])

                row = [
                    run['repetition'],
                    'PASS' if run['success'] else 'FAIL',
                    _csv_two_decimals(run.get('pbit_time', 0)),
                    run.get('start_time', ''),
                    run.get('end_time', ''),
                    _csv_two_decimals(run.get('run_duration', 0)),
                    run.get('scenario', ''),
                    run.get('b6_total', 0),
                    run.get('b6_pass', 0),
                    run.get('b6_fail', 0),
                    run.get('b6_known_fail', 0),
                    run.get('b8_checked', 0),
                    run.get('b8_pass', 0),
                    run.get('b8_fail', 0),
                    run.get('serial_total', 0),
                    run.get('serial_errors', 0),
                    run.get('serial_fatal', 0),
                    run.get('serial_recycles', 0),
                    len(run.get('failures', [])),
                    len(run.get('known_failures', [])),
                    json.dumps(run.get('target_simulated', None), ensure_ascii=False),
                    json.dumps(failures, ensure_ascii=False),
                    json.dumps(known_detail, ensure_ascii=False),
                    json.dumps(serial_details, ensure_ascii=False),
                ]
                writer.writerow(row)

            writer.writerow([])  # Blank line
            writer.writerow([])  # Extra blank line

            # ---- Section 2: Aggregate Statistics ----
            writer.writerow(['AGGREGATE STATISTICS'])
            writer.writerow([])  # Blank line
            writer.writerow(['Metric', 'Value'])

            agg = custom_statistics['aggregate']

            # Overall metrics
            writer.writerow(['Total Runs', agg['total_runs']])
            writer.writerow(['Successful Runs', agg['successful_runs']])
            writer.writerow(['Failed Runs', agg['failed_runs']])
            writer.writerow(['Success Rate (%)', f"{agg['successful_runs']/agg['total_runs']*100:.1f}" if agg['total_runs'] > 0 else "0.0"])
            writer.writerow([])  # Blank line

            # B6 LRU Status
            writer.writerow(['B6 Total Checks', agg['total_b6_checks']])
            writer.writerow(['B6 Pass', agg['total_b6_pass']])
            writer.writerow(['B6 Fail', agg['total_b6_fail']])
            writer.writerow(['B6 Known Fail', agg['total_b6_known']])
            writer.writerow([])  # Blank line

            # B8 Diagnostics
            writer.writerow(['B8 Total Checks', agg['total_b8_checks']])
            writer.writerow(['B8 Pass', agg['total_b8_pass']])
            writer.writerow(['B8 Fail', agg['total_b8_fail']])
            writer.writerow([])  # Blank line

            # Serial Communication
            writer.writerow(['Serial Total Messages', agg['total_serial_msgs']])
            writer.writerow(['Serial Errors', agg['total_serial_errors']])
            writer.writerow(['Serial Fatal', agg['total_serial_fatal']])
            writer.writerow(['Serial Recycles', agg['total_serial_recycles']])
            writer.writerow([])  # Blank line

            # Timing Statistics (None means no run produced a PBIT time)
            for label, key in (
                ('Average PBIT Time (s)', 'avg_pbit_time'),
                ('Min PBIT Time (s)', 'min_pbit_time'),
                ('Max PBIT Time (s)', 'max_pbit_time'),
                ('Std Dev PBIT Time (s)', 'std_dev_pbit'),
            ):
                writer.writerow([label, f"{agg[key]:.2f}" if agg[key] is not None else "N/A"])

            writer.writerow([])  # Blank line
            writer.writerow([])  # Extra blank line

            # ---- Section 3: Known Failures (Ignored) ----
            writer.writerow(['KNOWN FAILURES (IGNORED IN STATISTICS)'])
            writer.writerow([])
            writer.writerow(['These failures are expected due to HW test setup limitations and do not affect test verdict:'])
            writer.writerow([])

            for known_field in known_failures:
                # Extract clean field name: text after the message name when present,
                # otherwise the last underscore-separated token.
                if 'RdrHealthStatusAndBitReport_' in known_field:
                    clean_name = known_field.split('RdrHealthStatusAndBitReport_')[-1]
                else:
                    clean_name = known_field.split('_')[-1] if '_' in known_field else known_field
                clean_name = clean_name.replace('_', ' ').title()
                writer.writerow([f" - {clean_name}"])

            writer.writerow([])  # Blank line
            writer.writerow([])  # Extra blank line

            # ---- Section 4: Test Failure Analysis (test-by-test breakdown) ----
            writer.writerow(['TEST FAILURE ANALYSIS'])
            writer.writerow([])  # Blank line
            writer.writerow(['Detailed breakdown of individual test failures, sorted by failure percentage (highest first)'])
            writer.writerow([])  # Blank line

            test_failure_analysis = custom_statistics.get('test_failure_analysis', [])

            if test_failure_analysis:
                # Table headers
                writer.writerow(['Test ID', 'Description', 'Occurrences', '% of Total Runs', 'Runs Where Failed', 'Full Field Name'])

                for test in test_failure_analysis:
                    runs_str = ', '.join(map(str, test['runs_failed']))
                    writer.writerow([
                        test['test_id'],
                        test['description'],
                        test['occurrences'],
                        f"{test['percentage']:.1f}%",
                        runs_str,
                        test['full_field']
                    ])
            else:
                writer.writerow(['No test failures detected - all tests passed!'])

            writer.writerow([])  # Blank line
            writer.writerow([])  # Extra blank line

            # ---- Section 5: Problem Distribution Analysis (category-level summary) ----
            writer.writerow(['PROBLEM DISTRIBUTION ANALYSIS (Category Level)'])
            writer.writerow([])  # Blank line

            problem_counts = {}
            total_runs = agg['total_runs']
            perfect_runs = agg['successful_runs']

            for run in custom_statistics['repetitions']:
                if not run['success']:
                    for item in run.get('failures', []):
                        # Accept both (field, value) and (category, field, value);
                        # entries without a usable field name are skipped.
                        field = _csv_failure_field(item)
                        if field is None:
                            continue

                        # Remove common prefix but keep full field identifier
                        # Example: "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"
                        #       -> "processor_status"
                        if 'RdrHealthStatusAndBitReport_' in field:
                            test_name_clean = field.split('RdrHealthStatusAndBitReport_')[-1]
                        elif '_' in field and len(field.split('_')) > 3:
                            # For other messages, keep last 4 parts for context
                            test_name_clean = '_'.join(field.split('_')[-4:])
                        else:
                            test_name_clean = field

                        # Clean up for display
                        test_name_clean = test_name_clean.replace('_', ' ').title()
                        problem_counts[test_name_clean] = problem_counts.get(test_name_clean, 0) + 1

                    # Serial problems
                    if run.get('serial_fatal', 0) > 0:
                        problem_counts['Serial Communication (Fatal)'] = problem_counts.get('Serial Communication (Fatal)', 0) + 1

                    if run.get('serial_recycles', 0) > 1:
                        problem_counts['System Instability (Recycles)'] = problem_counts.get('System Instability (Recycles)', 0) + 1

            if problem_counts:
                # Sort by frequency (descending)
                sorted_problems = sorted(problem_counts.items(), key=lambda x: x[1], reverse=True)
                failed_runs = total_runs - perfect_runs

                writer.writerow(['Problem Type', 'Occurrences', '% of Total Runs', '% of Failed Runs'])
                for problem, count in sorted_problems:
                    pct_total = (count / total_runs * 100) if total_runs > 0 else 0
                    pct_failed = (count / failed_runs * 100) if failed_runs > 0 else 0
                    writer.writerow([
                        problem,
                        count,
                        f"{pct_total:.1f}",
                        f"{pct_failed:.1f}"
                    ])
            else:
                writer.writerow(['No problems detected - all runs were successful!'])

        logging.info(f"Statistics exported successfully to: {csv_path}")
        return csv_path

    except Exception as e:
        # Export is best-effort: report the problem and let the test run continue.
        logging.error(f"Failed to export statistics to CSV: {e}")
        logging.error(traceback.format_exc())
        return None
def tgt_gen(interface, timeout_sec=None, expected_range=2536, range_tolerance=(1000, 200), only_bc=True, enable_stim=True, hit_threshold=10):
    """
    Target generation and detection test: stimulates radar with nav data and monitors B9 for target.

    Steps performed by this function:
        1. Optionally write the A2 STBY command field (only_bc)
        2. Stimulate A5 (INU High Speed) and then A4 (Nav Data) with an incrementing timetag
        3. Poll the B9 message (b9_t_num, b9_t1_rng, b9_w12) every 20 ms
        4. On detection, validate the B9 range against expected_range with tolerance

    A detection is flagged when b9_t_num > 0 OR the B9 timetag word (b9_w12)
    differs from the previously stored one. The loop ends on timeout, when
    hit_threshold range validations succeeded, or when interruptRequest is set.

    Based on target3 implementation with improvements:
        - Configurable parameters (range, tolerance, hit threshold)
        - Structured return dict with detailed results
        - Timetag wrapping at 0x70ff
        - Delta message counting for monitoring

    Args:
        interface: Grifo1553 interface object (getInterface() is tried to obtain the
            native object; the wrapper itself is used if that fails)
        timeout_sec: Max seconds to wait for target (default: TARGET_DETECTION_TIMEOUT_SEC)
        expected_range: Expected target range in ICD units (default: 2536)
        range_tolerance: (lower_tol, upper_tol) for range validation (default: (1000, 200));
            accepted window is [expected_range - lower_tol, expected_range + upper_tol]
        only_bc: If True, send A2 STBY command before starting (default: True)
        enable_stim: If True, actively stimulate A4/A5 messages (default: True)
        hit_threshold: Number of successful detections before returning (default: 10)

    Returns:
        dict: {
            'detected': bool,         # True once at least one range validation passed
            'hits': int,              # Number of successful range validations
            'range': float,           # Last detected target range (ICD units)
            'iterations': int,        # Total loop iterations executed
            'time_to_detect': float,  # Seconds until first validated detection (or timeout)
            'message_count': int,     # Last B9 message count read
            'timetag_final': int      # Final timetag value
        }

    Side effects:
        Stores a summary dict on interface._last_target (best effort).
    """
    if timeout_sec is None:
        timeout_sec = TARGET_DETECTION_TIMEOUT_SEC

    logging.info(f'tgt_gen() - timeout: {timeout_sec}s, range: {expected_range}±({range_tolerance[0]},{range_tolerance[1]})')

    # Get native interface for message counting (SWIG methods)
    try:
        native_interface = interface.getInterface()
        logging.info(f'[tgt_gen] Using native interface: {type(native_interface)}')
    except Exception as e:
        logging.warning(f'[tgt_gen] Could not get native interface: {e}, using wrapper')
        native_interface = interface

    # Configuration
    period_ms = 20            # Loop period in milliseconds (aligned with real hardware)
    timetag_increment = 150   # Timetag increment per iteration
    timetag_wrap = 0x70ff     # Timetag wrap point
    timetag_reset = 10        # Timetag reset value after wrap

    # Initialize result structure
    result = {
        'detected': False,
        'hits': 0,
        'range': 0,
        'iterations': 0,
        'time_to_detect': timeout_sec,
        'message_count': 0,
        'timetag_final': 0
    }

    start_time = time.perf_counter()
    end_time = start_time + timeout_sec  # Calculate absolute timeout

    # Phase 1: Optional STBY mode setting
    if only_bc:
        try:
            # NOTE(review): the value written to the stby field is 0 while the log
            # message says STBY was set - confirm the intended value.
            setValue(theGrifo1553, 0, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True)
            logging.info('[tgt_gen] Set radar to STBY mode')
            time.sleep(0.5)  # Allow radar to process
        except Exception as e:
            logging.warning(f'[tgt_gen] Could not set STBY mode: {e}')

    # Phase 2: Initial A5 stimulation (if enabled)
    tt = 0  # Timetag counter
    if enable_stim:
        logging.info('[tgt_gen] Sending initial A5 messages...')
        a5_errors = 0
        for i in range(10):
            time.sleep(0.020)
            try:
                ret, err = setValue(theGrifo1553, tt, "A5_MsgInuHighSpeed", "timetag_RelativeTimetag_raw", commitChanges=True)
                if not ret:
                    a5_errors += 1
                    if i == 0:  # Log only first failure
                        logging.warning(f'[tgt_gen] A5 stim failed: {err}')
            except Exception as e:
                a5_errors += 1
                if i == 0:  # Log only first exception
                    logging.warning(f'[tgt_gen] A5 stim exception: {e}')
            tt += timetag_increment

        if a5_errors >= 5:
            logging.warning(f'[tgt_gen] A5 stimulation not supported ({a5_errors}/10 errors) - may affect target detection')

    # Phase 3: Main loop - stimulate A4 and monitor B9
    # Note: In simulation mode, target is already configured in _initialize_field_values()
    p_tt = 0   # Previous timetag from B9
    hit = 0    # Successful range validations
    pcnt = 0   # Previous B9 message count
    max_iterations = int(timeout_sec / (period_ms / 1000.0))
    first_detection = None

    logging.info(f'[tgt_gen] Starting target detection loop (max {max_iterations} iterations, timeout {timeout_sec}s)...')

    for i in range(max_iterations):
        # Check timeout FIRST before any processing
        current_time = time.perf_counter()
        if current_time >= end_time:
            elapsed = current_time - start_time
            logging.warning(f'[tgt_gen] ✗ TIMEOUT: No target detected after {elapsed:.1f}s (max: {timeout_sec}s)')
            result['time_to_detect'] = elapsed
            break

        # Log every 100 iterations to show progress without spam
        if i % 100 == 0:
            elapsed = current_time - start_time
            remaining = timeout_sec - elapsed
            logging.info(f'[tgt_gen] Loop iteration {i}/{max_iterations}... (elapsed: {elapsed:.1f}s, timeout in: {remaining:.1f}s)')

        time.sleep(period_ms / 1000.0)
        result['iterations'] = i + 1

        # Stimulate A4 with incremental timetag
        if enable_stim:
            try:
                # Log periodically instead of every setValue to reduce log spam
                if i % 50 == 0:
                    logging.debug(f'[tgt_gen] Stimulating A4 timetag (iter {i}, tt={tt})')
                ret, err = setValue(theGrifo1553, tt, "A4_MsgNavDataAndCursor", "timetag_RelativeTimetag_raw", commitChanges=True, verbose=False)
                if not ret and i == 0:
                    logging.warning(f'[tgt_gen] A4 stim failed: {err} - disabling stimulation')
                    enable_stim = False  # Disable if not supported
            except Exception as e:
                # NOTE(review): nesting of the disable statement under "i == 0" was
                # reconstructed from whitespace-collapsed source - confirm.
                if i == 0:  # Log only first error to avoid spam
                    logging.warning(f'[tgt_gen] A4 stim exception: {e} - disabling stimulation')
                    enable_stim = False  # Disable if exception occurs

        # Update timetag with wrapping
        tt += timetag_increment
        if tt > timetag_wrap:
            tt = timetag_reset
        result['timetag_final'] = tt

        # Read B9 message
        try:
            if i % 100 == 0:
                logging.info(f'[tgt_gen] Reading B9 message at iter {i}...')

            cnt = native_interface.getSingleMessageReceivedSz("B9")
            t_num = native_interface.getMessageFieldValue("B9", "b9_t_num")
            t_rng = native_interface.getMessageFieldValue("B9", "b9_t1_rng")
            t_tt = native_interface.getMessageFieldValue("B9", "b9_w12")
            result['message_count'] = cnt if cnt is not None else 0

            # Log B9 values every 100 iterations
            if i % 100 == 0:
                logging.info(f'[tgt_gen] B9 values: cnt={cnt}, t_num={t_num}, t_rng={t_rng}, t_tt={t_tt}, p_tt={p_tt}')
        except Exception as e:
            # A failed read skips this iteration, but a pending Ctrl-C still ends the loop.
            logging.warning(f'[tgt_gen] B9 read error (iter {i}): {e}')
            if interruptRequest:
                break
            continue

        # Initialize p_tt on first valid read
        if p_tt == 0 and t_tt is not None:
            try:
                t_tt_int = int(t_tt) if isinstance(t_tt, (int, float)) else 0
                if t_tt_int != 0:  # Only initialize if timetag is non-zero
                    p_tt = t_tt_int
                    logging.info(f'[tgt_gen] Initialized p_tt={p_tt} at iteration {i}')
                    continue  # Skip detection on init iteration
            except Exception as e:
                logging.warning(f'[tgt_gen] Failed to initialize p_tt: {e}')

        # Log message delta every 10 iterations
        if (i % 10) == 0:
            if cnt is not None and cnt >= 0:
                dcnt = cnt - pcnt
                pcnt = cnt
            else:
                dcnt = -1  # Marker for "count not available"
            logging.debug(f'[tgt_gen] Iter {i:3d}: B9 count={cnt}, delta={dcnt}, tt={tt:04x}')

        # Check for target detection
        target_detected = False
        try:
            t_num_int = int(t_num) if t_num is not None else 0
            # A missing timetag falls back to the previous one, i.e. counts as "unchanged"
            t_tt_int = int(t_tt) if t_tt is not None else p_tt

            # Log detection check every 10 iterations
            if i % 10 == 0:
                logging.info(f'[tgt_gen] Detection check: t_num_int={t_num_int}, t_tt_int={t_tt_int}, p_tt={p_tt}, condition={(t_num_int > 0) or (p_tt != 0 and t_tt_int != p_tt)}')

            # Detection condition: t_num > 0 OR timetag changed
            if (t_num_int > 0) or (p_tt != 0 and t_tt_int != p_tt):
                target_detected = True
                result['range'] = float(t_rng) if t_rng is not None else 0

                logging.info(f'[tgt_gen] ✓ TARGET DETECTED at iter {i}: num={t_num_int}, rng={t_rng}, tt={t_tt_int:04x}, p_tt={p_tt:04x}')

                # Validate range
                range_min = expected_range - range_tolerance[0]
                range_max = expected_range + range_tolerance[1]
                logging.info(f'[tgt_gen] Validating range: {t_rng} in [{range_min}, {range_max}]')
                ret_proc_sts, err = check(theGrifo1553, (range_min, range_max), "B9", "b9_t1_rng")

                if ret_proc_sts:
                    hit += 1
                    result['hits'] = hit
                    result['detected'] = True
                    if first_detection is None:
                        first_detection = time.perf_counter()
                        result['time_to_detect'] = first_detection - start_time
                    logging.info(f'[tgt_gen] ✓ Range validation passed (hit {hit}/{hit_threshold})')

                    # Exit after threshold hits
                    if hit >= hit_threshold:
                        logging.info(f'[tgt_gen] Target acquisition complete ({hit} successful hits)')
                        break
                else:
                    logging.warning(f'[tgt_gen] ✗ Range validation failed: {t_rng} not in [{range_min}, {range_max}]')

            # NOTE(review): nesting level of this update was reconstructed from
            # whitespace-collapsed source - confirm against the original file.
            p_tt = t_tt_int

        except Exception as e:
            logging.debug(f'[tgt_gen] Detection check error: {e}')

        # Check for interrupt
        if interruptRequest:
            logging.info('[tgt_gen] Interrupted by user (Ctrl-C)')
            break

    # Final summary
    elapsed = time.perf_counter() - start_time
    if result['detected']:
        logging.info(f"[tgt_gen] ✓ SUCCESS: Target detected after {result['time_to_detect']:.2f}s "
                     f"({result['hits']} hits, range={result['range']:.0f})")
    else:
        logging.warning(f"[tgt_gen] ✗ TIMEOUT: No target detected after {elapsed:.2f}s "
                        f"({result['iterations']} iterations, {result['message_count']} B9 messages)")

    # Store result on interface for statistics (best effort: failures are ignored)
    try:
        if not hasattr(interface, '_last_target'):
            interface._last_target = {}
        interface._last_target = {
            'detected': result['detected'],
            'distance': result['range'],
            'hits': result['hits'],
            'iterations': result['iterations'],
            'time_sec': result['time_to_detect']
        }
    except Exception:
        pass

    return result
def prepare_radar_for_target_test(interface, altitude_ft=8202, z_accel_g=1.08,
                                  azimuth_scan_width=1, range_scale=2, num_bars=1,
                                  inu_mode_word=0xAA54, intensity=127):
    """
    Configure radar for target generation test.

    Steps performed, in order:
    1. Set SILENCE mode (A2 command bit)
    2. Set STBY mode (A2 command bit)
    3. Configure scan parameters (azimuth, range scale, bars)
    4. Configure navigation data (Z acceleration, baro and radio altitude)
    5. Configure INU mode word
    6. Set symbology intensity
    7. Verify tellbacks (STBY and SILENCE active) by polling B7

    Only a failure of step 1 or 2 marks the overall result as unsuccessful;
    steps 3-7 are best-effort and only logged as warnings.

    Args:
        interface: Grifo1553 interface object (used to read B7 tellbacks)
        altitude_ft: Barometric altitude in feet (default: 8202 ft = 2500m)
        z_accel_g: Z-axis acceleration in g (default: 1.08g = 1060 raw units)
        azimuth_scan_width: Azimuth scan width selection (default: 1)
        range_scale: Range scale selection (default: 2)
        num_bars: Number of bars selection (default: 1)
        inu_mode_word: INU mode word (default: 0xAA54)
        intensity: Symbology intensity (default: 127)

    Returns:
        dict: 'success' (bool), 'steps' (per-step outcome), 'errors' (list of
        messages) and, when step 7 completes, 'verify_details'.
    """
    logging.info('[prepare_target_test] Configuring radar for target generation test...')

    result = {
        'success': True,
        'steps': {},
        'errors': []
    }

    # Steps 1-2: mandatory mode commands (SILENCE first, then STBY).
    mode_commands = [
        ('silence', 'SILENCE', "rdr_mode_command_RdrModeCommandWord_silence"),
        ('stby', 'STBY', "rdr_mode_command_RdrModeCommandWord_stby"),
    ]
    for step, label, field in mode_commands:
        try:
            ret, err = setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand",
                                field, commitChanges=True)
            result['steps'][step] = ret
            if not ret:
                result['errors'].append(f'{label} command failed: {err}')
                result['success'] = False
            else:
                logging.info(f'[prepare_target_test] ✓ {label} mode set')
                # Give the radar time to process the command before the next one
                time.sleep(TELLBACK_POST_SET_DELAY_SEC)
        except Exception as e:
            result['steps'][step] = False
            result['errors'].append(f'{label} exception: {e}')
            result['success'] = False
            logging.error(f'[prepare_target_test] ✗ {label} failed: {e}')

    # Step 3: Configure scan parameters (A2 param1 fields)
    scan_params = [
        (azimuth_scan_width, "param1_RdrFunAndParam1_azimuth_scan_width_selection", "Azimuth scan width"),
        (range_scale, "param1_RdrFunAndParam1_range_scale_selection", "Range scale"),
        (num_bars, "param1_RdrFunAndParam1_number_of_bars_selection", "Number of bars"),
    ]
    for value, field, desc in scan_params:
        try:
            ret, err = setValue(theGrifo1553, value, "A2_MsgRdrOperationCommand",
                                field, commitChanges=True)
            result['steps'][field] = ret
            if ret:
                logging.info(f'[prepare_target_test] ✓ {desc} = {value}')
            else:
                logging.warning(f'[prepare_target_test] ⚠ {desc} failed: {err}')
        except Exception as e:
            result['steps'][field] = False
            logging.warning(f'[prepare_target_test] ⚠ {desc} exception: {e}')

    # Step 4: Configure navigation data (A4)
    try:
        # Honour the altitude_ft argument instead of overwriting it with a
        # constant. round() (not int()) keeps the default 8202 ft at the
        # reference value of 2500 raw units rather than truncating to 2499.
        # NOTE(review): assumes 1 raw unit = 1 metre - confirm against the ICD.
        altitude_raw = round(altitude_ft * 0.3048)

        # Reference point: 1060 raw = 1.08g, so raw = g * 981.5
        z_accel_raw = int(z_accel_g * 981.5)

        nav_params = [
            (z_accel_raw, "z_acceleration_Acceleration_raw", "Z acceleration", False),
            (altitude_raw, "corrected_baro_altitude_BaroAltitude_raw", "Barometric altitude", False),
            (altitude_raw, "radio_altitude_RadioAltitude_raw", "Radio altitude", True),  # Only last one commits
        ]
        for value, field, desc, do_commit in nav_params:
            try:
                ret, err = setValue(theGrifo1553, value, "A4_MsgNavDataAndCursor",
                                    field, commitChanges=do_commit)
                result['steps'][field] = ret
                if ret:
                    logging.info(f'[prepare_target_test] ✓ {desc} = {value}')
                else:
                    logging.warning(f'[prepare_target_test] ⚠ {desc} failed: {err}')
            except Exception as e:
                result['steps'][field] = False
                logging.warning(f'[prepare_target_test] ⚠ {desc} exception: {e}')
    except Exception as e:
        logging.warning(f'[prepare_target_test] ⚠ Nav data config exception: {e}')

    # Step 5: Configure INU mode word (A5)
    try:
        ret, err = setValue(theGrifo1553, inu_mode_word, "A5_MsgInuHighSpeed",
                            "mode_word", commitChanges=True)
        result['steps']['inu_mode'] = ret
        if ret:
            logging.info(f'[prepare_target_test] ✓ INU mode word = 0x{inu_mode_word:04X}')
        else:
            logging.warning(f'[prepare_target_test] ⚠ INU mode word failed: {err}')
    except Exception as e:
        result['steps']['inu_mode'] = False
        logging.warning(f'[prepare_target_test] ⚠ INU mode word exception: {e}')

    # Step 6: Set symbology intensity (A1)
    try:
        ret, err = setValue(theGrifo1553, intensity, "A1_MsgRdrSettingsAndParameters",
                            "settings_RDROperationalSettings_rdr_symbology_intensity",
                            commitChanges=True)
        result['steps']['intensity'] = ret
        if ret:
            logging.info(f'[prepare_target_test] ✓ Symbology intensity = {intensity}')
        else:
            logging.warning(f'[prepare_target_test] ⚠ Intensity failed: {err}')
    except Exception as e:
        result['steps']['intensity'] = False
        logging.warning(f'[prepare_target_test] ⚠ Intensity exception: {e}')

    # Step 7: Verify tellbacks (STBY and SILENCE active) - inline verification
    try:
        stby_ok = False
        silence_ok = False
        verify_details = {}

        # Monotonic clock: the timeout must not be affected by wall-clock jumps
        start_verify = time.monotonic()
        while (time.monotonic() - start_verify) < TELLBACK_VERIFY_TIMEOUT_SEC:
            try:
                # Read tellback values directly to avoid type conversion issues
                stby_val = interface.getMessageFieldValue(
                    "B7_MsgRdrStatusTellback",
                    "rdr_mode_tellback_RdrStatusTellback_stby_tellback")
                silence_val = interface.getMessageFieldValue(
                    "B7_MsgRdrStatusTellback",
                    "rdr_mode_tellback_RdrStatusTellback_silence_tellback")

                # Accept multiple formats: "STBY_ON", "1", 1, True, etc.
                # A missing value (None) maps to "" and therefore never matches.
                stby_str = str(stby_val).upper() if stby_val is not None else ""
                silence_str = str(silence_val).upper() if silence_val is not None else ""

                stby_ok = stby_str in ("STBY_ON", "1", "TRUE")
                silence_ok = silence_str in ("SILENCE_ON", "1", "TRUE")

                verify_details['stby'] = (stby_ok, stby_val)
                verify_details['silence'] = (silence_ok, silence_val)

                if stby_ok and silence_ok:
                    break

                time.sleep(TELLBACK_VERIFY_STEP_SEC)
            except Exception as ve:
                verify_details['exception'] = str(ve)
                logging.debug(f'[prepare_target_test] Tellback read exception: {ve}')
                time.sleep(TELLBACK_VERIFY_STEP_SEC)

        result['steps']['stby_verified'] = stby_ok
        result['steps']['silence_verified'] = silence_ok
        result['verify_details'] = verify_details

        if stby_ok and silence_ok:
            logging.info('[prepare_target_test] ✓ Tellbacks verified: STBY and SILENCE active')
        else:
            # Don't fail overall config if only verification fails
            logging.warning(f'[prepare_target_test] ⚠ Tellback verification incomplete: '
                            f'STBY={stby_ok}, SILENCE={silence_ok}')
    except Exception as e:
        logging.warning(f'[prepare_target_test] ⚠ Tellback verification exception: {e}')

    # Final summary
    if result['success']:
        logging.info('[prepare_target_test] ✓ Radar configuration complete')
    else:
        logging.error(f"[prepare_target_test] ✗ Configuration incomplete ({len(result['errors'])} errors)")
        for err in result['errors']:
            logging.error(f" - {err}")

    return result
def cleanup_radar_after_target_test(interface):
    """
    Clean up radar configuration after target test.

    This function:
    1. Unsets STBY mode (writes 0 to the A2 STBY command bit)
    2. On success, waits STBY_POST_UNSET_DELAY_SEC for the radar to process it

    SILENCE is intentionally left untouched so the operator keeps control of
    RF radiation. No tellback verification is performed here.

    Args:
        interface: Grifo1553 interface object (currently unused; the command
            is sent through the module-level theGrifo1553 handle)

    Returns:
        dict: {'success': bool, 'errors': list of str}. 'errors' holds one
        message per failed step and is empty on success.
    """
    logging.info('[cleanup_target_test] Cleaning up radar after target test...')

    errors = []

    # Unset STBY mode (set to 0)
    try:
        ret, err = setValue(theGrifo1553, 0, "A2_MsgRdrOperationCommand",
                            "rdr_mode_command_RdrModeCommandWord_stby",
                            commitChanges=True)
        if ret:
            logging.info('[cleanup_target_test] ✓ STBY mode unset')
            time.sleep(STBY_POST_UNSET_DELAY_SEC)
        else:
            logging.warning(f'[cleanup_target_test] ⚠ STBY unset failed: {err}')
            errors.append(f'STBY unset failed: {err}')
    except Exception as e:
        logging.error(f'[cleanup_target_test] ✗ STBY unset exception: {e}')
        errors.append(f'STBY unset exception: {e}')

    # NOTE: SILENCE is left as-is; in production it may be preferable to keep
    # it ON for safety and let the operator control it.

    success = not errors
    if success:
        logging.info('[cleanup_target_test] ✓ Cleanup complete')
    else:
        logging.warning('[cleanup_target_test] ⚠ Cleanup incomplete')

    return {'success': success, 'errors': errors}
def execute_pbit_test(interface, report, run_stats, bit_fields, bit_fields_categories, gui_monitor=None):
    """
    Execute complete PBIT test: wait for BIT completion and verify all diagnostic fields.

    Flow:
    1. Poll B6 until the BIT report is flagged available (budget: PBIT_SEC_TIME seconds)
    2. Monitor the B6 message counter while waiting to detect 1553 bus loss
    3. Verify all B6 LRU status fields
    4. Perform B8 drill-down if real failures are detected (or if
       FORCE_B8_DRILL_DOWN is set and only known failures were found)

    Args:
        interface: Grifo1553 interface object
        report: testReport object for logging
        run_stats: Dictionary updated in place. Keys written or incremented:
            'bit_available', 'pbit_time', 'b6_total', 'b6_pass', 'b6_fail',
            'b6_known_fail', 'b8_checked', 'b8_pass', 'b8_fail', 'success',
            'failures', 'known_failures'
        bit_fields: Tuple of all diagnostic field names (B6 first, then B8)
        bit_fields_categories: Dictionary mapping categories to field lists;
            must contain the key 'B6_LRU_Status'
        gui_monitor: Optional GUI monitor for status updates

    Returns:
        bool: True if the BIT report was received and no real failure was
        found. False on any real B6/B8 failure, on 1553 communication loss,
        or when the BIT report never became available within the time budget.
    """
    remaining_time = PBIT_SEC_TIME
    pbit_start_time = time.perf_counter()

    # Initialize 1553 communication loss detection variables
    msg_cnt = 0
    mil1553_error_flag = COMM_LOSS_THRESHOLD

    # Set symbology intensity
    setValue(theGrifo1553, 100, "A1_MsgRdrSettingsAndParameters",
             "settings_RDROperationalSettings_rdr_symbology_intensity", commitChanges=True)

    test_passed = True
    bit_report_seen = False

    while remaining_time > 0:
        start = time.perf_counter()
        ret_rep_is_avail = False

        for _ in range(100):
            cnt = interface.getSingleMessageReceivedSz("B6_MsgRdrSettingsAndParametersTellback")
            value = interface.getMessageFieldValue(
                "B6_MsgRdrSettingsAndParametersTellback",
                "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_bit_report_available"
            )

            # Robust availability check: the field may come back as bool, None or text
            ret_rep_is_avail = False
            try:
                if isinstance(value, bool):
                    ret_rep_is_avail = value
                elif value is None:
                    ret_rep_is_avail = False
                else:
                    ret_rep_is_avail = str(value).strip().lower() in ("true", "1", "yes")
            except Exception:
                ret_rep_is_avail = False

            if ret_rep_is_avail is True:
                break

            # Monitor 1553 bus health: the counter must keep increasing
            if isinstance(cnt, (int, float)) and cnt >= 0:
                if cnt > msg_cnt:
                    mil1553_error_flag = COMM_LOSS_THRESHOLD
                else:
                    mil1553_error_flag -= 1
                msg_cnt = int(cnt)
            else:
                mil1553_error_flag -= 1

            # Check if communication is lost. "<= 0" (not "== 0") so that a
            # non-positive threshold cannot step past the abort condition.
            if mil1553_error_flag <= 0:
                logging.critical(f"1553 bus communication lost - message counter stalled at {msg_cnt}")
                report.add_comment(f"CRITICAL: 1553 bus communication lost (counter stalled at {msg_cnt} messages)", False)
                if gui_monitor:
                    gui_monitor.log_event('error', '1553 communication LOST - aborting test')
                return False

            time.sleep(0.05)

        if ret_rep_is_avail is True:
            bit_report_seen = True
            time.sleep(0.02)
            run_stats['bit_available'] = True
            run_stats['pbit_time'] = time.perf_counter() - pbit_start_time

            report.add_comment(f"BIT report available after {run_stats['pbit_time']:.1f}s")

            if gui_monitor:
                gui_monitor.update_status(pbit_available=True, pbit_time=run_stats['pbit_time'])
                gui_monitor.log_event('success', f"BIT available after {run_stats['pbit_time']:.1f}s")

            # ===== PHASE 1: Verify ALL B6 LRU Status Fields =====
            b6_lru_fields = bit_fields_categories['B6_LRU_Status']
            b6_failures = []
            b6_known_failures = []

            radar_fail_status_field = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"

            for f in b6_lru_fields:
                # radar_fail_status is an aggregate; it is classified after the others
                if f == radar_fail_status_field:
                    continue

                run_stats['b6_total'] += 1
                ret, err = check(theGrifo1553, "false", "B6_MsgRdrSettingsAndParametersTellback", f)

                if gui_monitor and run_stats['b6_total'] % 3 == 0:
                    gui_monitor.update_statistics(
                        b6_total=run_stats['b6_total'],
                        b6_pass=run_stats['b6_pass'],
                        b6_fail=run_stats['b6_fail'],
                        b6_known=run_stats['b6_known_fail']
                    )

                if ret:
                    run_stats['b6_pass'] += 1
                else:
                    if f in KNOWN_FAILURES:
                        run_stats['b6_known_fail'] += 1
                        b6_known_failures.append((f, err))
                        logging.warning(f"Known failure (ignored): {f}")
                    else:
                        run_stats['b6_fail'] += 1
                        b6_failures.append((f, err))
                        test_passed = False
                        run_stats['success'] = False

            # Check radar_fail_status
            run_stats['b6_total'] += 1
            ret_radar_fail, err_radar_fail = check(theGrifo1553, "false", "B6_MsgRdrSettingsAndParametersTellback", radar_fail_status_field)

            if ret_radar_fail:
                run_stats['b6_pass'] += 1
            else:
                if len(b6_failures) > 0:
                    # Capture the causes BEFORE appending the aggregate field,
                    # otherwise it could list itself as its own cause.
                    causes = ', '.join([f.split('_')[-1] for f, _ in b6_failures[:3]])
                    run_stats['b6_fail'] += 1
                    b6_failures.append((radar_fail_status_field, err_radar_fail))
                    test_passed = False
                    run_stats['success'] = False
                    logging.warning(f"Radar fail status: REAL failure (caused by: {causes})")
                else:
                    # NOTE(review): this branch is also taken when there are no
                    # known failures at all - confirm that an unexplained radar
                    # fail status should really be counted as "known".
                    run_stats['b6_known_fail'] += 1
                    b6_known_failures.append((radar_fail_status_field, err_radar_fail))
                    logging.warning("Radar fail status: Known failure (caused only by pedestal)")

            logging.info(f"B6 LRU Status: {run_stats['b6_total']} total, "
                         f"{run_stats['b6_pass']} pass, {run_stats['b6_fail']} fail, "
                         f"{run_stats['b6_known_fail']} known")

            if gui_monitor:
                gui_monitor.update_statistics(
                    b6_total=run_stats['b6_total'],
                    b6_pass=run_stats['b6_pass'],
                    b6_fail=run_stats['b6_fail'],
                    b6_known=run_stats['b6_known_fail']
                )
                if run_stats['b6_fail'] > 0:
                    gui_monitor.log_event('warning', f'B6: {run_stats["b6_fail"]} real failures detected')
                else:
                    gui_monitor.log_event('success', 'B6: All checks passed')

            if b6_known_failures:
                run_stats['known_failures'].extend(b6_known_failures)
                logging.info(f" Known failures (HW setup): {len(b6_known_failures)}")

            if b6_failures:
                run_stats['failures'].extend(b6_failures)
                fail_summary = ', '.join([f.split('_')[-1] for f, _ in b6_failures[:3]])
                logging.warning(f" Real failures: {fail_summary}{'...' if len(b6_failures) > 3 else ''}")

            # ===== PHASE 2: B8 Drill-down =====
            should_drill_down = b6_failures or (FORCE_B8_DRILL_DOWN and b6_known_failures)

            if should_drill_down:
                # Everything in bit_fields that is not a B6 LRU field is a B8 field
                b8_field_count = len(bit_fields) - len(b6_lru_fields)
                if FORCE_B8_DRILL_DOWN and not b6_failures:
                    report.add_comment(f"\nForced B8 drill-down (FORCE_B8_DRILL_DOWN=True): Verifying all {b8_field_count} B8 diagnostic fields...")
                    logging.info("[FORCE_B8_DRILL_DOWN] Performing B8 drill-down despite only known failures")
                else:
                    report.add_comment(f"\nDrill-down: Verifying all {b8_field_count} B8 diagnostic fields...")

                b8_failures = []

                for category, fields in bit_fields_categories.items():
                    # Skip the B6 category by name rather than by position
                    if category == 'B6_LRU_Status':
                        continue

                    category_fail = 0
                    category_pass = 0

                    for f in fields:
                        run_stats['b8_checked'] += 1
                        ret, err = check(theGrifo1553, "false", "B8_MsgBitReport", f)

                        if gui_monitor and run_stats['b8_checked'] % 10 == 0:
                            gui_monitor.update_statistics(
                                b8_checked=run_stats['b8_checked'],
                                b8_pass=run_stats['b8_pass'],
                                b8_fail=run_stats['b8_fail']
                            )

                        if ret:
                            category_pass += 1
                            run_stats['b8_pass'] += 1
                        else:
                            category_fail += 1
                            run_stats['b8_fail'] += 1
                            b8_failures.append((category, f, err))
                            test_passed = False

                    if category_fail > 0:
                        logging.warning(f"{category}: {category_fail}/{len(fields)} failures")

                logging.info(f"B8 Diagnostics: {run_stats['b8_checked']} checked, "
                             f"{run_stats['b8_pass']} pass, {run_stats['b8_fail']} fail")

                if b8_failures:
                    fail_by_cat = {}
                    for cat, field, err in b8_failures:
                        run_stats['failures'].append((field, err))
                        fail_by_cat.setdefault(cat, []).append(field.split('_')[-1])

                    for cat, fails in fail_by_cat.items():
                        logging.warning(f" {cat}: {len(fails)} failures - {', '.join(fails[:3])}{'...' if len(fails) > 3 else ''}")
            else:
                logging.info("All B6 LRU Status PASS (no B8 drill-down needed)")

            break

        # Report not yet available: charge this polling burst against the budget
        remaining_time -= time.perf_counter() - start
        logging.info(f'{remaining_time:.1f}s remaining ...')

    if not bit_report_seen:
        # Without a BIT report nothing was verified, so this must not be a pass
        logging.error(f"BIT report not available within {PBIT_SEC_TIME}s - PBIT timed out")
        return False

    return test_passed
def tgt_gen_alone(interface, max_cycles=10000):
    """
    Standalone target generation test with 1553 logging enabled.

    Calls tgt_gen() repeatedly until it reports a detection, the user
    interrupts, or max_cycles attempts have been made. 1553 logging is started
    before the loop and always stopped afterwards, even if tgt_gen() raises.

    Args:
        interface: Grifo1553 interface object
        max_cycles: Maximum number of tgt_gen() attempts (default: 10000)

    Returns:
        dict: Result of the last tgt_gen() call. If no call produced a result
        (e.g. max_cycles is 0), a default "not detected" dict with the keys
        'detected', 'hits', 'range', 'iterations', 'time_to_detect',
        'message_count' and 'timetag_final'.
    """
    logging.info(f'[tgt_gen_alone] Starting standalone target generation (max {max_cycles} cycles)...')

    # Start 1553 logging (level 3, to script directory); best-effort
    try:
        interface.logStart(3, os.path.dirname(sys.argv[0]))
        logging.info('[tgt_gen_alone] 1553 logging started')
    except Exception as e:
        logging.warning(f'[tgt_gen_alone] Could not start 1553 logging: {e}')

    result = None
    try:
        for n in range(max_cycles):
            logging.info(f'[tgt_gen_alone] Cycle {n+1}/{max_cycles}')

            # Run target generation
            result = tgt_gen(interface)

            # Check for success or interrupt
            if result and result.get('detected', False):
                logging.info(f"[tgt_gen_alone] ✓ Target detected on cycle {n+1}")
                break

            if interruptRequest:
                logging.info('[tgt_gen_alone] Interrupted by user')
                break

            # Short delay between cycles
            time.sleep(0.5)
    finally:
        # Stop 1553 logging even when tgt_gen() raised, so the log is not left open
        try:
            interface.logStop()
            logging.info('[tgt_gen_alone] 1553 logging stopped')
        except Exception as e:
            logging.warning(f'[tgt_gen_alone] Error stopping 1553 logging: {e}')

    if result is None:
        logging.warning('[tgt_gen_alone] No result obtained (interrupted or error)')
        return {'detected': False, 'hits': 0, 'range': 0, 'iterations': 0,
                'time_to_detect': 0, 'message_count': 0, 'timetag_final': 0}

    return result
def _ask_yes_no(prompt, default):
    """Ask a y/n question until a valid answer is given.

    Returns True or False for a valid answer (empty input selects *default*),
    or None when the prompt is aborted with Ctrl-C or end-of-input.
    """
    while True:
        try:
            answer = input(prompt).strip().lower()
        except (KeyboardInterrupt, EOFError):
            return None
        if answer == '':
            return default
        if answer in ('y', 'yes'):
            return True
        if answer in ('n', 'no'):
            return False
        print("Please enter 'y' or 'n'")


def ask_production_config():
    """
    Ask user for test configuration in production mode (real target).

    Prompts, in order, for: the GUI monitor (only if its module imports),
    the number of test runs, and whether to perform target acquisition.
    Aborting a prompt with Ctrl-C / EOF selects False for the yes/no
    questions and NUMBER_OF_REPETITIONS for the run count.

    Returns:
        tuple: (num_runs, gui_enabled, run_on_target)
    """
    print("")
    print("=" * 80)
    print("GRIFO PBIT - PRODUCTION MODE (Target Reale)")
    print("=" * 80)
    print("")

    # Check if GUI is available; the import is done only to probe availability
    try:
        from GRIFO_M_PBIT_gui import TestMonitorGUI  # noqa: F401
        gui_available = True
    except ImportError:
        gui_available = False

    # Ask about GUI first (if available)
    gui_enabled = False
    if gui_available:
        answer = _ask_yes_no("Enable real-time GUI monitor? (y/n) [y]: ", default=True)
        if answer is True:
            gui_enabled = True
            print("✓ GUI monitor will be enabled")
        elif answer is False:
            print("✓ GUI monitor disabled (console only)")
        # answer is None -> prompt aborted: GUI stays disabled, no confirmation
        print("")
    else:
        print("[INFO] GUI monitor not available (tkinter import failed)")
        print("")

    # Ask for number of runs
    while True:
        try:
            user_input = input(f"How many test runs do you want to execute? (minimum 1) [{NUMBER_OF_REPETITIONS}]: ").strip()

            if user_input == '':
                num_runs = NUMBER_OF_REPETITIONS
                break

            num_runs = int(user_input)
            if num_runs < 1:
                print(f"Error: Number of runs must be at least 1. You entered: {num_runs}")
                continue
            break
        except ValueError:
            print("Error: Invalid input. Please enter a number.")
            continue
        except (KeyboardInterrupt, EOFError):
            print("\n\n[INFO] Using default value")
            num_runs = NUMBER_OF_REPETITIONS
            break

    # Ask whether to perform target validation (acquire target on 1553) before runs.
    # Default is 'n' to avoid unexpected hardware operations unless operator requests it.
    run_on_target = _ask_yes_no(
        "Perform target acquisition on real hardware before runs? (y/n) [n]: ",
        default=False) is True

    print("")
    print(f"✓ Configured for {num_runs} test run(s)")
    print("")
    print("=" * 80)
    print("")

    return num_runs, gui_enabled, run_on_target
(y/n) [n]: ").strip().lower() if target_input in ['', 'n', 'no']: run_on_target = False break if target_input in ['y', 'yes']: run_on_target = True break print("Please enter 'y' or 'n'") except (KeyboardInterrupt, EOFError): run_on_target = False break print("") print(f"✓ Configured for {num_runs} test run(s)") print("") print("=" * 80) print("") return num_runs, gui_enabled, run_on_target def test_proc(): # ========== SIMULATION MODE SUPPORT ========== # Enable test execution without hardware using --simulate flag # Mock implementation in GRIFO_M_PBIT_mock.py provides simulated interfaces global gui_monitor gui_monitor = None # Reference to GUI monitor (if available) # Parse command-line arguments if '--simulate' in sys.argv: from GRIFO_M_PBIT_mock import initialize_simulation, setup_simulation, create_mock_terminal import GRIFO_M_PBIT_mock # Check if already initialized by launcher to avoid double initialization if GRIFO_M_PBIT_mock._requested_runs is None: # Not initialized yet, do it now (direct execution scenario) initialize_simulation() setup_simulation() # else: already initialized by launcher, skip # Get GUI reference if enabled gui_monitor = GRIFO_M_PBIT_mock._gui_monitor use_mock_terminal = True else: # ========== PRODUCTION MODE (Target Reale) ========== # Ask user for configuration (number of runs, GUI, and whether to run target acquisition) runs_total, gui_enabled, run_on_target = ask_production_config() # Initialize GUI if enabled if gui_enabled: try: from GRIFO_M_PBIT_gui import TestMonitorGUI gui_monitor = TestMonitorGUI() gui_monitor.start() gui_monitor.update_status(run_total=runs_total) gui_monitor.log_event('info', f'Production mode - {runs_total} runs configured') try: gui_monitor.update_scenario( name='Production Run', description='Executing test on real hardware (target).', expected_failures=[], expected_passes=[], notes='Running in production mode' ) except Exception: pass logging.info("GUI monitor started successfully") except Exception as 
e: logging.warning(f"Failed to start GUI monitor: {e}") gui_monitor = None use_mock_terminal = False # Determine total runs to execute. In simulate mode, prefer user-requested runs from mock. if use_mock_terminal: try: runs_total = GRIFO_M_PBIT_mock._requested_runs if GRIFO_M_PBIT_mock._requested_runs is not None else NUMBER_OF_REPETITIONS except Exception: runs_total = NUMBER_OF_REPETITIONS # In production mode, runs_total is already set by ask_production_config() # ========== END SIMULATION SUPPORT ========== global report, test_statistics # Complete bit_fields: All B6 LRU status + All B8 degradation/SRU/test fields # Total: 185 fields (12 B6 status + 12 B8 degradation + 43 B8 SRU + 118 B8 tests) bit_fields = ( # ===== B6: LRU Status Fields ===== "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_array_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pedestal_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pressurization_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_over_temperature_alarm", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_receiver_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_rx_front_end_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_servoloop_over_temperature_alarm", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_servoloop_status", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_trasmitter_over_temperature_alarm", "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_trasmitter_status", # ===== B8: Degradation Conditions ===== "degradation_conditions_w1_DegradationConditionsW1_bcn_fail", 
"degradation_conditions_w1_DegradationConditionsW1_gm_rbm_sea1_ta_wa_fail", "degradation_conditions_w1_DegradationConditionsW1_group1_fail", "degradation_conditions_w1_DegradationConditionsW1_group2_fail", "degradation_conditions_w1_DegradationConditionsW1_group3_fail", "degradation_conditions_w1_DegradationConditionsW1_group4_fail", "degradation_conditions_w1_DegradationConditionsW1_group5_fail", "degradation_conditions_w1_DegradationConditionsW1_hr_modes_and_gm_dbs_fail", "degradation_conditions_w1_DegradationConditionsW1_no_rdr_symbology", "degradation_conditions_w1_DegradationConditionsW1_not_identified_rdr_fail", "degradation_conditions_w1_DegradationConditionsW1_selected_channel_fail", "degradation_conditions_w1_DegradationConditionsW1_total_rdr_fail", # ===== B8: SRU Failure Locations ===== "failure_location_pedestal_FailureLocationPedestal_sru1_gimbal", "failure_location_pedestal_FailureLocationPedestal_sru2_waveguide", "failure_location_pedestal_FailureLocationPedestal_sru3_waveguide", "failure_location_pedestal_FailureLocationPedestal_sru4_delta_guard_lna_switch", "failure_location_pedestal_FailureLocationPedestal_sru5_waveguide_switch", "failure_location_processor_FailureLocationProcessor_sru10_main_computer", "failure_location_processor_FailureLocationProcessor_sru11_graphic_computer", "failure_location_processor_FailureLocationProcessor_sru12_power_supply", "failure_location_processor_FailureLocationProcessor_sru13_det_exp", "failure_location_processor_FailureLocationProcessor_sru14_rx_module", "failure_location_processor_FailureLocationProcessor_sru1_motherboard_chassis", "failure_location_processor_FailureLocationProcessor_sru2_mti_fft", "failure_location_processor_FailureLocationProcessor_sru3_dsp0", "failure_location_processor_FailureLocationProcessor_sru4_dsp1", "failure_location_processor_FailureLocationProcessor_sru5_cfar_px_ctrl", "failure_location_processor_FailureLocationProcessor_sru6_timer", 
"failure_location_processor_FailureLocationProcessor_sru7_post_processor", "failure_location_processor_FailureLocationProcessor_sru8_agc", "failure_location_processor_FailureLocationProcessor_sru9_esa_if", "failure_location_receiver_FailureLocationReceiver_sru1_chassis", "failure_location_receiver_FailureLocationReceiver_sru2_uhf_assy", "failure_location_receiver_FailureLocationReceiver_sru3_synthesizer", "failure_location_receiver_FailureLocationReceiver_sru4_delta_guard_down_converter", "failure_location_receiver_FailureLocationReceiver_sru5_sum_down_converter", "failure_location_receiver_FailureLocationReceiver_sru6_lo_distributor", "failure_location_receiver_FailureLocationReceiver_sru7_up_converter", "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru1_chassis", "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru2_delta_guard_lna", "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru3_sum_act_prot_lna", "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru4_4port_circulator", "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru5_stc_delta_guard", "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru5_stc_sum", "failure_location_servoloop_FailureLocationServoloop_sru1_chassis", "failure_location_servoloop_FailureLocationServoloop_sru2_power_supply", "failure_location_servoloop_FailureLocationServoloop_sru3_digital_controller", "failure_location_transmitter_FailureLocationTransmitter_sru1_chassis", "failure_location_transmitter_FailureLocationTransmitter_sru2_rex_f_tx", "failure_location_transmitter_FailureLocationTransmitter_sru3_power_supply", "failure_location_transmitter_FailureLocationTransmitter_sru4_valve_el_twt_tx", "failure_location_transmitter_FailureLocationTransmitter_sru5_rf_driver", "failure_location_transmitter_FailureLocationTransmitter_sru6_controller_tx", "failure_location_transmitter_FailureLocationTransmitter_sru7_hv_power_supply", 
"failure_location_transmitter_FailureLocationTransmitter_sru8_eht_power_supply", # ===== B8: All Test Results ===== "agc_test_results_AGCTestResults_test_agc10_pulse_compressor_interface", "agc_test_results_AGCTestResults_test_agc11_dp_interface", "agc_test_results_AGCTestResults_test_agc13_taxi_running", "agc_test_results_AGCTestResults_test_agc14_external_xyp_ram", "agc_test_results_AGCTestResults_test_agc15_servoloop_interface", "agc_test_results_AGCTestResults_test_agc1_internal_xyp_ram", "agc_test_results_AGCTestResults_test_agc2_external_xyp_ram", "agc_test_results_AGCTestResults_test_agc5_dual_port_ram", "agc_test_results_AGCTestResults_test_agc6_agc_machine", "agc_test_results_AGCTestResults_test_agc7_sat_machine", "agc_test_results_AGCTestResults_test_agc9_c_ram_xy_checksum", "data_processor_test_results_DataProcessorTestResults_test_dp10_video_memory", "data_processor_test_results_DataProcessorTestResults_test_dp11_video_unit", "data_processor_test_results_DataProcessorTestResults_test_dp12_transputer_unit", "data_processor_test_results_DataProcessorTestResults_test_dp13_scan_converter_polar_memory", "data_processor_test_results_DataProcessorTestResults_test_dp14_scan_converter_format_converter", "data_processor_test_results_DataProcessorTestResults_test_dp1_486_cpu_tests", "data_processor_test_results_DataProcessorTestResults_test_dp2_486_interfaces_with_r3000_gc", "data_processor_test_results_DataProcessorTestResults_test_dp3_486_interface_with_slc", "data_processor_test_results_DataProcessorTestResults_test_dp4_slc_communications", "data_processor_test_results_DataProcessorTestResults_test_dp5_r3000_cpu_tests", "data_processor_test_results_DataProcessorTestResults_test_dp6_r3000_interfaces", "data_processor_test_results_DataProcessorTestResults_test_dp7_1553_and_discretes", "data_processor_test_results_DataProcessorTestResults_test_dp8_graphic_cpu", "data_processor_test_results_DataProcessorTestResults_test_dp9_graphic_processors", 
"integrated_system_test_results_IntegratedSystemTestResults_array_status", "integrated_system_test_results_IntegratedSystemTestResults_cal_delta_channel_fail", "integrated_system_test_results_IntegratedSystemTestResults_cal_injection_fail", "integrated_system_test_results_IntegratedSystemTestResults_cal_noise_fail", "integrated_system_test_results_IntegratedSystemTestResults_pedestal_status", "integrated_system_test_results_IntegratedSystemTestResults_processor_status", "integrated_system_test_results_IntegratedSystemTestResults_receiver_status", "integrated_system_test_results_IntegratedSystemTestResults_rx_frontend_status", "integrated_system_test_results_IntegratedSystemTestResults_servoloop_status", "integrated_system_test_results_IntegratedSystemTestResults_test_is1_upconverter_chain_levels", "integrated_system_test_results_IntegratedSystemTestResults_test_is2_downconverter_chain_levels", "integrated_system_test_results_IntegratedSystemTestResults_test_is3_antenna_status_inconsistent", "integrated_system_test_results_IntegratedSystemTestResults_test_is4_tx_status_inconsistent", "integrated_system_test_results_IntegratedSystemTestResults_test_is5_tx_power_level", "integrated_system_test_results_IntegratedSystemTestResults_transmitter_status", "post_processor_test_results_PostProcessorTestResults_test_pp1_master_dsp", "post_processor_test_results_PostProcessorTestResults_test_pp2_interface_card", "post_processor_test_results_PostProcessorTestResults_test_pp3_cpu_cards", "post_processor_test_results_PostProcessorTestResults_test_pp4_dma_bus", "post_processor_test_results_PostProcessorTestResults_test_pp5_sp_interface", "post_processor_test_results_PostProcessorTestResults_test_pp6_dp_interface", "post_processor_test_results_PostProcessorTestResults_test_pp7_scan_converter_interface", "post_processor_test_results_PostProcessorTestResults_test_pp8_agc_interface", "power_supply_test_results_PowerSupplyTestResults_test_ps1_power_supply", 
"power_supply_test_results_PowerSupplyTestResults_test_ps2_over_temperature", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_fe1_lna", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_fe2_agc_attenuators", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx1_synthesizer_commands", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx2_synthesizer_internal_tests", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx3_uhf_oscillator_level", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx4_downconverter_lo_level", "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx5_upconverter_lo_level", "rx_module_test_results_RxModuleTestResults_test_rm16_calibration_sum_channel_fail", "rx_module_test_results_RxModuleTestResults_test_rm1_master_clock_level", "rx_module_test_results_RxModuleTestResults_test_rm2_expander_level", "rx_module_test_results_RxModuleTestResults_test_rm3_sum_channel_down_converter", "rx_module_test_results_RxModuleTestResults_test_rm4_dg_channel_down_converter", "rx_module_test_results_RxModuleTestResults_test_rm5_noise_attenuators", "servoloop_test_results_ServoloopTestResults_test_sl10_agc_control", "servoloop_test_results_ServoloopTestResults_test_sl11_ad", "servoloop_test_results_ServoloopTestResults_test_sl12_das", "servoloop_test_results_ServoloopTestResults_test_sl13_serial_communications", "servoloop_test_results_ServoloopTestResults_test_sl14_taxi_interface", "servoloop_test_results_ServoloopTestResults_test_sl15_pedestal_centre_scan_location", "servoloop_test_results_ServoloopTestResults_test_sl1_low_voltage_power_supply", "servoloop_test_results_ServoloopTestResults_test_sl2_high_voltage_power_supply", "servoloop_test_results_ServoloopTestResults_test_sl3_motor_drivers", "servoloop_test_results_ServoloopTestResults_test_sl4_resolvers_power_supply", 
"servoloop_test_results_ServoloopTestResults_test_sl5_waveguide_switch", "servoloop_test_results_ServoloopTestResults_test_sl6_over_temperature", "servoloop_test_results_ServoloopTestResults_test_sl7_resolver_to_digital_conv", "servoloop_test_results_ServoloopTestResults_test_sl8_position_loop_error", "servoloop_test_results_ServoloopTestResults_test_sl9_microprocessor", "signal_processor_test_results_SignalProcessorTestResults_test_sp10_board_overall", "signal_processor_test_results_SignalProcessorTestResults_test_sp11_attenuatori_antenna", "signal_processor_test_results_SignalProcessorTestResults_test_sp14_external_sp_if", "signal_processor_test_results_SignalProcessorTestResults_test_sp16_bcn", "signal_processor_test_results_SignalProcessorTestResults_test_sp1_timer1_up", "signal_processor_test_results_SignalProcessorTestResults_test_sp2_timer_dma_pxc_if", "signal_processor_test_results_SignalProcessorTestResults_test_sp3_timer_internal", "signal_processor_test_results_SignalProcessorTestResults_test_sp4_px_ctrl_comm", "signal_processor_test_results_SignalProcessorTestResults_test_sp5_video1_without_ad", "signal_processor_test_results_SignalProcessorTestResults_test_sp6_video1_with_ad", "signal_processor_test_results_SignalProcessorTestResults_test_sp7_video2_ad_sync", "signal_processor_test_results_SignalProcessorTestResults_test_sp8_video2_timer_sync", "signal_processor_test_results_SignalProcessorTestResults_test_sp9_ad_da", "signal_processor_test_results_SignalProcessorTestResults_test_sp9b_wideband_expander", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx10_hv_ps_over_temperature_warning", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx11_twt_helix_over_current", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx12_cathode_to_helix_arc", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx13_twt_over_temperature_hazard", 
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx14_twt_over_temperature_warning", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx15_cathode_under_voltage", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx16_cathode_over_voltage", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx1_microprocessors", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx2_tx_rf_input", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx3_twt_rf_input", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx4_twt_rf_output", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx5_tx_rf_output_level", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx6_vswr", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx7_three_phase_input_power", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx8_low_voltage_power_supplies", "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx9_hv_ps_over_temperature_hazard", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx17_collector_under_voltage", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx18_collector_over_voltage", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx19_rectified_voltage", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx20_cathode_inv_current_fail", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx21_collector_inv_current_fail", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx22_waveguide_pressurization", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx23_grid_window_over_duty_alt", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx24_floating_deck_fail", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx25_floating_deck_ps_fail", "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx26_grid_window_over_duty", ) # ==================== # BIT FIELDS CATEGORIZATION # ==================== # Dictionary 
mapping category names to field indices in bit_fields tuple. # Used for organized drill-down reporting when B6 failures trigger B8 verification. # # Categories: # B6_LRU_Status: 12 Line Replaceable Unit status fields (always checked) # B8_Degradation: 12 system degradation condition flags # B8_SRU_*: 43 Shop Replaceable Unit failure location flags (6 subsystems) # B8_Test_*: 118 detailed test result fields (10 test types) # # Total: 185 diagnostic fields providing complete radar health visibility bit_fields_categories = { 'B6_LRU_Status': bit_fields[0:12], 'B8_Degradation': bit_fields[12:24], 'B8_SRU_Pedestal': bit_fields[24:29], 'B8_SRU_Processor': bit_fields[29:43], 'B8_SRU_Receiver': bit_fields[43:50], 'B8_SRU_RxFrontend': bit_fields[50:56], 'B8_SRU_Servoloop': bit_fields[56:59], 'B8_SRU_Transmitter': bit_fields[59:67], 'B8_Test_AGC': bit_fields[67:78], 'B8_Test_DataProcessor': bit_fields[78:92], 'B8_Test_IntegratedSystem': bit_fields[92:107], 'B8_Test_PostProcessor': bit_fields[107:115], 'B8_Test_PowerSupply': bit_fields[115:117], 'B8_Test_Receiver': bit_fields[117:124], 'B8_Test_RxModule': bit_fields[124:130], 'B8_Test_Servoloop': bit_fields[130:145], 'B8_Test_SignalProcessor': bit_fields[145:159], 'B8_Test_Transmitter': bit_fields[159:185], } logger_setup('GRIFO_M_PBIT.log') report = testReport(sys.argv[0]) interface = theGrifo1553.getInterface() # Create serial terminal (real or mock based on simulation mode) if use_mock_terminal: terminal = create_mock_terminal() else: terminal = leo_grifo_terminal.GrifoSerialTerminal() terminal.connect() # If running in simulation, the mock module may have configured run_on_target if use_mock_terminal: try: run_on_target = GRIFO_M_PBIT_mock._run_on_target except Exception: run_on_target = False test_return = True try: #report.open_session('Pre Conditions') #power_grifo_off() #report.close_session() ############ Test Execution ############ #report.open_session('Test Execution') report.add_comment("The Test Operator check if 
the failure BIT in B6_MsgRdrSettingsAndParametersTellback changes ...") # Target detection test will be performed AFTER each PBIT run (if run_on_target=True) # See target test integration at lines 1973-2010 in main loop for repetition in range(runs_total): info = f'Repetition {1 + repetition} of {runs_total}' logging.info(info) report.open_session(info) # Update GUI for new run if gui_monitor: gui_monitor.update_status(run_current=repetition + 1, run_total=runs_total, power_on=True) gui_monitor.log_event('info', f'Starting repetition {repetition + 1}/{runs_total}') # Statistics for this run # Record run start timestamp for reporting run_start_time = datetime.now() run_start_perf = time.perf_counter() run_stats = { 'repetition': repetition + 1, 'start_time': run_start_time.isoformat(), 'pbit_time': 0, 'bit_available': False, 'target_simulated': None, 'b6_total': 0, 'b6_pass': 0, 'b6_fail': 0, 'b6_known_fail': 0, 'b8_checked': 0, 'b8_pass': 0, 'b8_fail': 0, 'failures': [], 'known_failures': [], 'success': True, # Serial statistics 'serial_total': 0, 'serial_errors': 0, 'serial_fatal': 0, 'serial_recycles': 0, 'serial_details': [], # List of notable serial events # Target detection test results (initialized to None, set after test execution) 'target_detected': None, 'target_test_time': None, 'target_hits': 0, 'target_range': 0, 'target_iterations': 0, } # Attach scenario name: simulation scenario or production indicator if use_mock_terminal: # Simulation mode: use configured scenario from mock try: if hasattr(GRIFO_M_PBIT_mock, '_scenario_list') and GRIFO_M_PBIT_mock._scenario_list: idx = repetition if repetition < len(GRIFO_M_PBIT_mock._scenario_list) else 0 run_stats['scenario'] = GRIFO_M_PBIT_mock._scenario_list[idx] else: run_stats['scenario'] = None except Exception: run_stats['scenario'] = None else: # Production mode: indicate real hardware execution run_stats['scenario'] = 'Production Run' test_statistics['total_runs'] += 1 # Reset serial statistics for this 
run terminal.reset_serial_statistics() # Attach pre-run target info (if mock simulated target was injected) # FIXED: Retrieve for ALL runs, not just first one try: t_info = None if use_mock_terminal: t_info = getattr(interface, '_last_simulated_target', None) if t_info is None: t_info = getattr(interface, '_last_target', None) run_stats['target_simulated'] = t_info except Exception: run_stats['target_simulated'] = None report.add_comment("The test operator is required to switch off the target and wait 3 seconds.") power_grifo_off(wait_after=4, wait_before=1) # Update GUI - power off if gui_monitor: gui_monitor.update_status(power_on=False, pbit_time=0.0) gui_monitor.log_event('info', 'Power OFF - waiting before=1s, after=4s') report.add_comment("The test operator is required to switch on the target.") power_grifo_on(wait_after=1.5) # timing: 1.5s is sufficient # Update GUI - power on if gui_monitor: gui_monitor.update_status(power_on=True) gui_monitor.log_event('info', 'Power ON - setup starting immediately') # Step 3: Setup radar parameters (immediate, no extra delays) # Order matches : INTENSITY first, then SILENCE/STANDBY/scan params together logging.info('[Setup] Configuring radar parameters') report.add_comment("Configuring radar operational parameters") try: # Step 3a: Set INTENSITY first logging.info('[Setup] Setting symbology intensity') setValue(theGrifo1553, 127, "A1_MsgRdrSettingsAndParameters", "settings_RDROperationalSettings_rdr_symbology_intensity", commitChanges=True) # Step 3b: Set SILENCE + scan params + STANDBY together (grouped commit) # This matches strategy: batch A2 commands in single commit logging.info('[Setup] Setting SILENCE, scan parameters, and STANDBY (grouped commit)') setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_silence") # NO commit setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand", "param1_RdrFunAndParam1_azimuth_scan_width_selection") # NO commit setValue(theGrifo1553, 2, 
"A2_MsgRdrOperationCommand", "param1_RdrFunAndParam1_range_scale_selection") # NO commit setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand", "param1_RdrFunAndParam1_number_of_bars_selection") # NO commit setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True) # Commit ALL A2 together if gui_monitor: gui_monitor.log_event('info', 'SILENCE, STANDBY, and scan params set') except Exception as e: logging.error(f'[Setup] Exception during A1/A2 setup: {e}') if gui_monitor: gui_monitor.log_event('error', f'A1/A2 setup failed: {e}') # Step 4: Verify tellbacks (OPTIONAL - controlled by ENABLE_TELLBACK_VERIFICATION flag) if ENABLE_TELLBACK_VERIFICATION: logging.info('[Setup] TELLBACK VERIFICATION ENABLED - Verifying STANDBY and SILENCE tellbacks') # Wait for radar to process commands before checking tellbacks time.sleep(TELLBACK_POST_SET_DELAY_SEC) try: stby_verified = False silence_verified = False start_verify = time.time() while (time.time() - start_verify) < TELLBACK_VERIFY_TIMEOUT_SEC: # Check STANDBY tellback (B7 message) try: stby_val = interface.getMessageFieldValue( "B7_MsgRdrStatusTellback", "rdr_mode_tellback_RdrStatusTellback_stby_tellback" ) if stby_val == 1 or str(stby_val).strip().lower() in ("true", "1"): stby_verified = True except Exception: pass # Check SILENCE tellback via rf_radiation_status (B7 message) # Note: SILENCE=1 (command) → rf_radiation_status=0 (RF OFF) try: rf_rad_val = interface.getMessageFieldValue( "B7_MsgRdrStatusTellback", "rdr_mode_tellback_RdrStatusTellback_rf_radiation_status" ) # SILENCE attivo quando RF radiation è OFF (0) if rf_rad_val == 0 or str(rf_rad_val).strip().lower() in ("false", "0"): silence_verified = True except Exception: pass if stby_verified and silence_verified: break time.sleep(TELLBACK_VERIFY_STEP_SEC) if stby_verified and silence_verified: logging.info('[Setup] ✓ STANDBY and SILENCE tellbacks verified') report.add_comment("STANDBY and SILENCE modes 
verified via tellbacks") if gui_monitor: gui_monitor.log_event('success', 'STANDBY and SILENCE verified') else: logging.warning(f'[Setup] ⚠ Tellback verification incomplete (STANDBY: {stby_verified}, SILENCE: {silence_verified})') if gui_monitor: gui_monitor.log_event('warning', 'Tellback verification incomplete') except Exception as e: logging.error(f'[Setup] Exception during tellback verification: {e}') else: logging.info('[Setup] TELLBACK VERIFICATION DISABLED - Using "fire and forget" mode') if gui_monitor: gui_monitor.log_event('info', 'Tellback verification skipped (fast mode)') # Step 5: Configure navigation data and INU (continues sequence) logging.info('[Setup] Configuring navigation data (A4) and INU mode (A5)') try: # Step 5a: Configure navigation data (A4) - grouped commit setValue(theGrifo1553, 1060, "A4_MsgNavDataAndCursor", "z_acceleration_Acceleration_raw") # NO commit setValue(theGrifo1553, 2500, "A4_MsgNavDataAndCursor", "corrected_baro_altitude_BaroAltitude_raw") # NO commit setValue(theGrifo1553, 2500, "A4_MsgNavDataAndCursor", "radio_altitude_RadioAltitude_raw", commitChanges=True) # Commit A4 together # Step 5b: Configure INU mode word (A5) setValue(theGrifo1553, 0xAA54, "A5_MsgInuHighSpeed", "mode_word", commitChanges=True) logging.info('[Setup] ✓ Nav data and INU configuration complete') if gui_monitor: gui_monitor.log_event('success', 'Radar setup complete - starting PBIT') except Exception as e: logging.error(f'[Setup] Exception during A4/A5 configuration: {e}') if gui_monitor: gui_monitor.log_event('error', f'A4/A5 configuration failed: {e}') # Step 6: Execute PBIT test (immediate start, no extra delays) logging.info(f'[Run {repetition+1}] Starting PBIT test (timeout: {PBIT_SEC_TIME}s)') report.add_comment(f"Starting PBIT test - waiting up to {PBIT_SEC_TIME}s for completion") if gui_monitor: gui_monitor.log_event('info', 'Starting PBIT test...') test_return = execute_pbit_test(interface, report, run_stats, bit_fields, 
bit_fields_categories, gui_monitor) # Finalize run timing try: run_stats['end_time'] = datetime.now().isoformat() run_stats['run_duration'] = time.perf_counter() - run_start_perf except Exception: run_stats['end_time'] = None run_stats['run_duration'] = None # Run statistics test_statistics['repetitions'].append(run_stats) if run_stats['success']: test_statistics['successful_runs'] += 1 else: test_statistics['failed_runs'] += 1 # Step 7: Remove STANDBY for target test (simple and direct) logging.info('[Target Prep] Removing STANDBY mode') report.add_comment("Removing STANDBY mode for target test") try: setValue(theGrifo1553, 0, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True) logging.info('[Target Prep] ✓ STANDBY removed') if gui_monitor: gui_monitor.log_event('info', 'STANDBY removed') except Exception as e: logging.error(f'[Target Prep] Exception during STANDBY removal: {e}') # Step 8: Verify STANDBY removal (OPTIONAL - only if tellback verification enabled) if ENABLE_TELLBACK_VERIFICATION: logging.info('[Target Prep] Verifying STANDBY removal via tellback') # Wait for command to be processed time.sleep(STBY_POST_UNSET_DELAY_SEC) try: stby_removed = False start_verify = time.time() while (time.time() - start_verify) < TELLBACK_VERIFY_TIMEOUT_SEC: try: stby_val = interface.getMessageFieldValue( "B7_MsgRdrStatusTellback", "rdr_mode_tellback_RdrStatusTellback_stby_tellback" ) if stby_val == 0 or str(stby_val).strip().lower() in ("false", "0", "no"): stby_removed = True break except Exception: pass time.sleep(TELLBACK_VERIFY_STEP_SEC) if stby_removed: logging.info('[Target Prep] ✓ STANDBY removal verified') if gui_monitor: gui_monitor.log_event('success', 'STANDBY removed (verified)') else: logging.warning('[Target Prep] ⚠ STANDBY removal not verified') if gui_monitor: gui_monitor.log_event('warning', 'STANDBY removal not verified') except Exception as e: logging.error(f'[Target Prep] Exception during tellback verification: 
{e}') # Step 9: Execute target test (if requested) target_found = False target_test_time = 0.0 target_test_done = False if run_on_target: target_test_done = True # Wait 1 second before starting target test logging.info('[Target Test] Waiting 1 second before target detection test') time.sleep(1.0) try: report.add_comment("Target Detection Test") logging.info(f"[Run {repetition+1}] Starting target detection test (timeout: {TARGET_DETECTION_TIMEOUT_SEC}s)") if gui_monitor: gui_monitor.log_event('info', f"Run {repetition+1}: Target detection test starting...") # Execute auxiliary serial sequence to prepare radar (if configured and enabled) if send_serial_sequence and ENABLE_AUX_SERIAL: try: logging.info('[Target Test] Executing auxiliary serial sequence') send_serial_sequence() except Exception as e: logging.warning(f"[Target Test] Aux serial sequence failed: {e}") # Execute target detection start_tgt = time.time() tgt_result = tgt_gen( theGrifo1553, timeout_sec=TARGET_DETECTION_TIMEOUT_SEC, expected_range=TARGET_EXPECTED_RANGE, range_tolerance=(TARGET_RANGE_TOLERANCE_LOW, TARGET_RANGE_TOLERANCE_HIGH), hit_threshold=TARGET_HIT_THRESHOLD, only_bc=False, # STANDBY already removed, don't set it again enable_stim=True ) target_test_time = time.time() - start_tgt target_found = tgt_result['detected'] # Store detailed results run_stats['target_hits'] = tgt_result['hits'] run_stats['target_range'] = tgt_result['range'] run_stats['target_iterations'] = tgt_result['iterations'] run_stats['target_detected'] = target_found run_stats['target_test_time'] = target_test_time # Restore STANDBY after target test ( simple command) logging.info('[Target Test] Restoring STANDBY mode') try: setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True) logging.info('[Target Test] ✓ STANDBY restored') except Exception as e: logging.error(f'[Target Test] Exception restoring STANDBY: {e}') if target_found: report.add_comment(f"Target 
detected successfully in {target_test_time:.2f}s") logging.info(f"[Run {repetition+1}] Target DETECTED in {target_test_time:.2f}s") if gui_monitor: gui_monitor.log_event('success', f"Run {repetition+1}: Target detected in {target_test_time:.1f}s") else: report.add_comment(f"Target NOT detected (timeout after {target_test_time:.2f}s)") logging.warning(f"[Run {repetition+1}] Target NOT DETECTED (timeout: {target_test_time:.2f}s)") if gui_monitor: gui_monitor.log_event('error', f"Run {repetition+1}: Target NOT detected (timeout)") except Exception as e: logging.error(f"[Run {repetition+1}] Target test error: {e}") logging.error(traceback.format_exc()) run_stats['target_detected'] = False run_stats['target_test_time'] = 0.0 else: # Target test disabled - skip it logging.info(f"[Run {repetition+1}] Target test SKIPPED (run_on_target=False)") run_stats['target_detected'] = None run_stats['target_test_time'] = None # Collect serial statistics for this run before closing session serial_stats = terminal.get_serial_statistics() run_stats['serial_total'] = serial_stats['total_messages'] run_stats['serial_errors'] = serial_stats['error_messages'] run_stats['serial_fatal'] = serial_stats['fatal_messages'] run_stats['serial_recycles'] = serial_stats['recycle_count'] # Store serial details for final aggregate report # Details will be shown in dedicated PDF section, not as step logs if serial_stats['recycle_count'] > 0: for timestamp, message in serial_stats['recycle_details']: run_stats['serial_details'].append({'type': 'RECYCLE', 'timestamp': timestamp, 'message': message}) if serial_stats['error_messages'] > 0: for timestamp, message in serial_stats['error_details'][:5]: # Limit to first 5 run_stats['serial_details'].append({'type': 'ERROR', 'timestamp': timestamp, 'message': message}) if serial_stats['fatal_messages'] > 0: for timestamp, message in serial_stats['fatal_details'][:5]: # Limit to first 5 run_stats['serial_details'].append({'type': 'FATAL', 'timestamp': timestamp, 
'message': message}) # Log summary to console for immediate feedback during test execution logging.info(f"[Run {repetition+1}] Serial: {serial_stats['total_messages']} total, " f"{serial_stats['error_messages']} errors, {serial_stats['fatal_messages']} fatal, " f"{serial_stats['recycle_count']} recycles") # Update GUI with serial statistics if gui_monitor: gui_monitor.update_statistics( serial_total=serial_stats['total_messages'], serial_errors=serial_stats['error_messages'], serial_fatal=serial_stats['fatal_messages'], serial_recycles=serial_stats['recycle_count'] ) if serial_stats['recycle_count'] > 0: gui_monitor.log_event('warning', f"Serial: {serial_stats['recycle_count']} RECYCLE events") if serial_stats['fatal_messages'] > 0: gui_monitor.log_event('error', f"Serial: {serial_stats['fatal_messages']} fatal messages") # Push per-run summary to GUI runs table if gui_monitor: # Determine target test status for GUI display if target_test_done: target_done_text = 'PASS' if target_found else 'FAIL' target_result_text = 'PASS' if target_found else 'FAIL' else: target_done_text = 'NO' target_result_text = 'N/A' fail_summary = '' if run_stats.get('failures'): try: fail_summary = ', '.join([f.split('_')[-1] for f, _ in run_stats['failures'][:3]]) except Exception: fail_summary = str(run_stats.get('failures')) # Get target test results from run_stats (not from interface._last_target) target_distance_m = None target_distance_nm = None target_found_iter = None if target_test_done and target_found: # Target was detected - get actual results from run_stats try: target_range = run_stats.get('target_range', 0) target_iterations = run_stats.get('target_iterations', 0) if target_range > 0: target_distance_m = float(target_range) # Nautical mile conversion (1 NM = 1852 meters) target_distance_nm = target_distance_m / 1852.0 if target_iterations > 0: target_found_iter = int(target_iterations) except Exception: target_distance_m = None target_distance_nm = None target_found_iter = 
None gui_monitor.update_run( run=repetition+1, result='PASS' if run_stats.get('success') else 'FAIL', # Overall PBIT result pbit=run_stats.get('pbit_time', 0.0), b6_fail=run_stats.get('b6_fail', 0), b8_fail=run_stats.get('b8_fail', 0), known=run_stats.get('b6_known_fail', 0), fail_summary=fail_summary, serial_events=len(run_stats.get('serial_details', [])), target_done=target_done_text, target_result=target_result_text, # Extra detailed fields for the run detail dialog start_time=run_stats.get('start_time'), end_time=run_stats.get('end_time'), run_duration=run_stats.get('run_duration'), failures=run_stats.get('failures', []), serial_details=run_stats.get('serial_details', []), target_detected=run_stats.get('target_detected'), target_test_time=run_stats.get('target_test_time'), stby_verified=run_stats.get('stby_verified'), silence_verified=run_stats.get('silence_verified'), stby_silence_verify_details=run_stats.get('stby_silence_verify_details'), target_distance_m=target_distance_m, target_distance_nm=target_distance_nm, target_found_iter=target_found_iter ) report.close_session() if interruptRequest is True: report.add_comment("Test interrupted by user (Ctrl-C)") break report.add_comment("Repetitions terminated.") # ===== FINAL STATISTICS REPORT ===== custom_statistics = generate_final_statistics_report(report, test_statistics) # ===== EXPORT TO CSV (if enabled) ===== if EXPORT_STATISTICS_CSV and custom_statistics: # Generate CSV filename with timestamp (matching log file naming) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_base_name = f"{report.title()}_{timestamp}" # Use same folder as PDF report for all test outputs pdf_folder = report.get_pdf_folder() csv_path = export_statistics_to_csv(custom_statistics, csv_base_name, pdf_folder) # Also write JSON statistics file alongside CSV for structured consumption try: if csv_path and os.path.isdir(pdf_folder): json_filename = f"{csv_base_name}_statistics.json" json_path = os.path.join(pdf_folder, 
json_filename) with open(json_path, 'w', encoding='utf-8') as jf: json.dump(custom_statistics, jf, ensure_ascii=False, indent=2) logging.info(f"Statistics exported successfully to JSON: {json_path}") except Exception as e: logging.error(f"Failed to export statistics to JSON: {e}") logging.error(traceback.format_exc()) ############ END STEPS ############ #report.open_session('Post Conditions') power_grifo_off() #report.close_session() if terminal is not None: terminal.disconnect() return test_return except Exception as e: report.add_comment(f"Test terminated unexpectedly :{e}") return False finally: report.generate_pdf() # Notify GUI (if present) that test finished and where outputs are stored if gui_monitor: try: out_folder = report.get_pdf_folder() except Exception: out_folder = None if out_folder: try: gui_monitor.show_results(out_folder) logging.info("Test completed - notifying GUI of results") # If running non-interactively (stdin is not a TTY), automatically # request the GUI to shutdown to allow the process to exit cleanly. try: interactive = sys.stdin.isatty() except Exception: interactive = False if not interactive: logging.info("Non-interactive session detected - auto-closing GUI") try: # First request a GC to run on the GUI thread so that # any ImageTk/PhotoImage destructors that need to call # into Tcl are executed on the correct thread. 
try: if hasattr(gui_monitor, 'update_queue'): gui_monitor.update_queue.put(('gc_collect', {})) # give GUI thread a moment to process GC time.sleep(0.2) except Exception: pass # Use the public stop() method which enqueues shutdown and joins if hasattr(gui_monitor, 'stop'): gui_monitor.stop() else: # Fallback: request shutdown via queue if hasattr(gui_monitor, 'update_queue'): gui_monitor.update_queue.put(('shutdown', {})) except Exception: pass else: # Interactive session: wait for user to close GUI try: if hasattr(gui_monitor, 'thread') and gui_monitor.thread: gui_monitor.thread.join() except Exception: pass except Exception as e: logging.error(f"Error waiting for GUI: {e}") #-- --------------------------------------------------------------- if __name__ == '__main__': global gui_monitor signal.signal(signal.SIGINT, signal_handler) test_proc() # CRITICAL: For non-interactive automated runs, use os._exit(0) to avoid # Python's atexit cleanup phase which can trigger Tcl_AsyncDelete warnings # when Python destroys objects containing Tk references after GUI thread stopped. # This is safe because at this point: # - Test is complete # - PDF/CSV/JSON reports are written # - GUI has been closed via stop() # - All essential cleanup has been done try: import sys interactive = sys.stdin.isatty() except Exception: interactive = False if not interactive: # Non-interactive: Use os._exit to skip atexit cleanup import os import time time.sleep(0.1) # Brief pause for any final file flushes os._exit(0) # Exit immediately without Python cleanup # else: Interactive session, allow normal Python cleanup