PlatSim_Genova/TestEnvironment/scripts/GRIFO_M_PBIT.py

1919 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
GRIFO_M_PBIT.py - Automated Power-On BIT Test for GRIFO-F/TH Radar
This script performs comprehensive Built-In Test (BIT) verification with power cycling:
- Executes configurable number of test repetitions (default: 10)
- Power cycles radar between runs to simulate cold-start conditions
- Monitors B6 LRU (Line Replaceable Unit) status fields
- Performs detailed B8 diagnostic drill-down on real failures
- Detects 1553 bus communication loss for fast-fail behavior
- Generates comprehensive statistics report with timing analysis
Test Flow:
1. Power off radar (wait_before=1s, wait_after=4s for stabilization)
2. Power on radar (wait_after=100ms) and wait for initialization
3. Execute BIT and wait for completion (timeout: 182s)
4. Monitor 1553 bus health continuously during BIT execution
5. Verify all 12 B6 LRU status fields
6. If real failures detected, drill-down into 185 B8 diagnostic fields
7. Track statistics (timing, pass/fail counts, failure details)
8. Repeat for configured number of cycles
9. Generate final comprehensive report with aggregate statistics
Configuration Options:
NUMBER_OF_REPETITIONS: Number of test cycles to execute (default: 10)
PBIT_SEC_TIME: BIT completion timeout in seconds (default: 182s, matches target2)
COMM_LOSS_THRESHOLD: 1553 comm loss detection threshold in iterations (default: 20)
EXPORT_STATISTICS_CSV: Export statistics to CSV file (default: True)
FORCE_B8_DRILL_DOWN: If True, always perform B8 drill-down even when only
known failures are detected (matches target2 behavior
for complete SW requirement verification). Default: False
KNOWN_FAILURES: List of expected failures due to HW setup limitations
Power Cycling Timing (aligned with target2):
- Power OFF: wait_before=1s (stabilization before action), wait_after=4s (settle time)
- Power ON: wait_after=100ms (initialization delay)
Author: Test Automation Team
Date: 2026-01-29
Last Updated: 2026-02-02 (aligned with target2 timing and behavior)
"""
import __init__
import signal
import time,sys,os
import logging
import csv
import json
from datetime import datetime
from leo_grifo_common import *
from test_common_function import *
from leo_grifo_test_report import testReport
from leo_grifo_1553 import theGrifo1553
#import leo_grifo_serial_maintnance
from logger import logger_setup
import leo_grifo_terminal
import pdb
import traceback
# Optional auxiliary serial helper (external script)
try:
from serial_aux import send_serial_sequence
except Exception:
send_serial_sequence = None
NUMBER_OF_REPETITIONS = 10 # Total test cycles to execute (4 perfect runs = 40%)
PBIT_SEC_TIME = 182 # BIT completion timeout in seconds (target2 uses 182s)
COMM_LOSS_THRESHOLD = 20 # 1553 bus comm loss detection: iterations without msg count increase
EXPORT_STATISTICS_CSV = True # Export statistics to CSV file for Excel/external analysis
TARGET_DETECTION_TIMEOUT_SEC = 10.0 # Target detection timeout: max seconds to wait for target visibility
TELLBACK_VERIFY_TIMEOUT_SEC = 5.0 # Default timeout for verifying tellbacks (STBY/SILENCE)
TELLBACK_VERIFY_STEP_SEC = 0.5 # Poll step for tellback verification
TELLBACK_POST_SET_DELAY_SEC = 2.0 # Delay after setting STBY/SILENCE to allow radar to update tellbacks
STBY_POST_UNSET_DELAY_SEC = 2.0 # Delay after removing STBY to allow radar to exit standby
# Global flag: enable auxiliary serial helper by default (can be overridden by operator)
# Set to False to disable auxiliary serial communication from tests.
ENABLE_AUX_SERIAL = True
# ====================
# B8 DRILL-DOWN CONFIGURATION
# ====================
# FORCE_B8_DRILL_DOWN: If True, always perform B8 drill-down even when only known failures
# are detected. This matches target2 behavior where B8 is checked unconditionally to verify
# SW requirements and obtain complete step fail statistics.
#
# Default: False (optimized behavior - B8 only on real failures)
# Set to True: Replicate target2 behavior for complete SW requirement verification
FORCE_B8_DRILL_DOWN = True
# ====================
# KNOWN FAILURES CONFIGURATION
# ====================
# List of field names that are expected to fail due to HW test setup limitations.
# These failures are tracked but do not trigger B8 drill-down or test failure.
#
# Use case: When test HW setup lacks physical components (e.g., pedestal unit),
# certain status checks will always fail. Adding them here prevents false negatives.
#
# Format: Full field name from bit_fields tuple
# Note: Known failures are reported separately in statistics but don't affect test verdict
# Note: radar_fail_status is NOT in this list - it's an aggregate flag checked contextually
KNOWN_FAILURES = [
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pedestal_status",
# Add more known HW setup limitations here as needed
]
interruptRequest = False # Global flag for graceful Ctrl-C handling
# ====================
# TEST STATISTICS TRACKING
# ====================
# Global dictionary to track statistics across all test repetitions.
# Populated during test execution and used to generate final comprehensive report.
#
# Structure:
# repetitions: List of dicts, one per run, containing:
# - repetition: Run number (1-based)
# - pbit_time: BIT completion time in seconds
# - bit_available: Boolean, True if BIT completed
# - success: Boolean, overall run result (pass/fail)
# - b6_total/pass/fail/known_fail: B6 LRU status check counts
# - b8_checked/pass/fail: B8 diagnostic check counts
# - failures: List of (field_name, value) tuples for real failures
# - known_failures: List of (field_name, value) tuples for expected failures
# total_runs: Counter for completed test runs
# successful_runs: Counter for runs with no real failures
# failed_runs: Counter for runs with real failures detected
test_statistics = {
'repetitions': [], # List of per-run statistics dictionaries
'total_runs': 0, # Total number of completed runs
'successful_runs': 0, # Number of runs that passed (only known failures allowed)
'failed_runs': 0, # Number of runs with real failures detected
}
def signal_handler(sig, frame):
"""Handle Ctrl-C signal for graceful test termination."""
global interruptRequest
logging.info("Ctrl-C detected, exiting gracefully...")
interruptRequest = True
def analyze_test_failures(stats):
"""
Analyze all test failures across runs and generate detailed failure frequency table.
Extracts test identifiers (e.g., SP1, TX10) from field names and aggregates failure
counts with descriptions, sorted by failure percentage (descending).
Args:
stats: test_statistics dictionary containing all run data
Returns:
list: List of dicts with keys:
- test_id: Test identifier (e.g., "SP1", "TX10", "AGC5")
- description: Human-readable test description
- full_field: Complete field name for reference
- occurrences: Number of times this test failed
- percentage: Failure rate as percentage of total runs
- runs_failed: List of run numbers where this test failed
"""
import re
test_failure_counts = {} # Key: (test_id, description, full_field), Value: list of run numbers
total_runs = stats['total_runs']
for run in stats['repetitions']:
run_num = run['repetition']
# Analyze both real failures and known failures (for complete picture)
all_failures = run.get('failures', []) + run.get('known_failures', [])
for failure_item in all_failures:
# Handle both tuple formats: (field, value) or (category, field, value)
if len(failure_item) >= 2:
field_name = failure_item[0] if len(failure_item) == 2 else failure_item[1]
else:
continue
# Extract test identifier from field name
# Pattern: "test_XX##_description" where XX is 1-3 letters, ## is 1-2 digits
# Examples: "test_sp1_", "test_tx10_", "test_agc5_", "test_is1_"
test_match = re.search(r'test_([a-z]{1,3})(\d{1,2})_(.+)', field_name, re.IGNORECASE)
if test_match:
prefix = test_match.group(1).upper() # SP, TX, AGC, etc.
number = test_match.group(2) # 1, 10, 5, etc.
description = test_match.group(3) # timer1_up, hv_ps_over_temperature_warning, etc.
test_id = f"{prefix}{number}" # "SP1", "TX10", etc.
# Clean up description: replace underscores with spaces, capitalize
clean_desc = description.replace('_', ' ').title()
key = (test_id, clean_desc, field_name)
if key not in test_failure_counts:
test_failure_counts[key] = []
test_failure_counts[key].append(run_num)
# Build result list
failure_analysis = []
for (test_id, description, full_field), run_list in test_failure_counts.items():
occurrences = len(run_list)
percentage = (occurrences / total_runs * 100) if total_runs > 0 else 0
failure_analysis.append({
'test_id': test_id,
'description': description,
'full_field': full_field,
'occurrences': occurrences,
'percentage': percentage,
'runs_failed': sorted(set(run_list)) # Unique, sorted run numbers
})
# Sort by percentage (descending), then by test_id (ascending) for ties
failure_analysis.sort(key=lambda x: (-x['percentage'], x['test_id']))
return failure_analysis
def generate_final_statistics_report(report, stats):
"""
Generate comprehensive final statistics report with professional PDF formatting.
Instead of using add_comment() which registers text as steps, this function
prepares structured data and passes it to the PDF generator for rendering
as dedicated sections with professional tables.
Produces professional test summary suitable for formal documentation and presentations.
Includes aggregate statistics, timing analysis, failure categorization, and test verdict.
Args:
report: testReport object for PDF generation
stats: test_statistics dictionary containing all run data
Report Sections (rendered as dedicated PDF chapters):
1. Per-Run Summary: Table with 1553 + Serial stats for each run
2. Global Aggregate: Combined statistics from all runs
3. Timing Analysis: PBIT performance metrics
4. Known Failures: HW setup limitations tracking
5. Test Failure Analysis: Detailed frequency table of failed tests (NEW)
Returns:
None (data is passed to report via set_custom_statistics)
"""
# Prepare structured data for PDF generation instead of ASCII art
# Calculate aggregate statistics
total_b6_checks = sum(r['b6_total'] for r in stats['repetitions'])
total_b6_pass = sum(r['b6_pass'] for r in stats['repetitions'])
total_b6_fail = sum(r['b6_fail'] for r in stats['repetitions'])
total_b6_known = sum(r['b6_known_fail'] for r in stats['repetitions'])
total_b8_checks = sum(r['b8_checked'] for r in stats['repetitions'])
total_b8_pass = sum(r['b8_pass'] for r in stats['repetitions'])
total_b8_fail = sum(r['b8_fail'] for r in stats['repetitions'])
total_serial_msgs = sum(r.get('serial_total', 0) for r in stats['repetitions'])
total_serial_errors = sum(r.get('serial_errors', 0) for r in stats['repetitions'])
total_serial_fatal = sum(r.get('serial_fatal', 0) for r in stats['repetitions'])
total_serial_recycles = sum(r.get('serial_recycles', 0) for r in stats['repetitions'])
# Calculate target detection statistics
target_tests = [r.get('target_detected', None) for r in stats['repetitions']]
target_detected_count = sum(1 for t in target_tests if t is True)
target_not_detected_count = sum(1 for t in target_tests if t is False)
target_pass_rate = (target_detected_count / len(target_tests) * 100) if target_tests else 0.0
target_times = [r.get('target_test_time', 0.0) for r in stats['repetitions'] if r.get('target_detected') is not None]
if target_times:
avg_target_time = sum(target_times) / len(target_times)
min_target_time = min(target_times)
max_target_time = max(target_times)
else:
avg_target_time = min_target_time = max_target_time = 0.0
# Calculate timing statistics
pbit_times = [r['pbit_time'] for r in stats['repetitions'] if r['bit_available']]
if pbit_times:
avg_pbit = sum(pbit_times) / len(pbit_times)
min_pbit = min(pbit_times)
max_pbit = max(pbit_times)
variance = sum((t - avg_pbit) ** 2 for t in pbit_times) / len(pbit_times)
std_dev = variance ** 0.5
else:
avg_pbit = min_pbit = max_pbit = std_dev = None
# Generate detailed test failure analysis
test_failure_analysis = analyze_test_failures(stats)
# Prepare structured data dictionary for PDF rendering
custom_statistics = {
'repetitions': stats['repetitions'], # Per-run data with all metrics
'aggregate': {
# Overall test summary
'total_runs': stats['total_runs'],
'successful_runs': stats['successful_runs'],
'failed_runs': stats['failed_runs'],
# 1553 Bus statistics
'total_b6_checks': total_b6_checks,
'total_b6_pass': total_b6_pass,
'total_b6_fail': total_b6_fail,
'total_b6_known': total_b6_known,
'total_b8_checks': total_b8_checks,
'total_b8_pass': total_b8_pass,
'total_b8_fail': total_b8_fail,
# Serial communication statistics
'total_serial_msgs': total_serial_msgs,
'total_serial_errors': total_serial_errors,
'total_serial_fatal': total_serial_fatal,
'total_serial_recycles': total_serial_recycles,
# Target detection statistics
'target_detected_count': target_detected_count,
'target_not_detected_count': target_not_detected_count,
'target_pass_rate': target_pass_rate,
'avg_target_time': avg_target_time,
'min_target_time': min_target_time,
'max_target_time': max_target_time,
# Timing analysis
'avg_pbit_time': avg_pbit,
'min_pbit_time': min_pbit,
'max_pbit_time': max_pbit,
'std_dev_pbit': std_dev,
},
'test_failure_analysis': test_failure_analysis # NEW: Detailed test failure frequency table
}
# Pass structured data to report for professional PDF rendering
# This will generate dedicated chapters with native PDF tables
# instead of mixing ASCII art with step execution logs
report.set_custom_statistics(custom_statistics)
# Log summary to console for immediate feedback
logging.info("="*90)
logging.info(" FINAL TEST STATISTICS SUMMARY")
logging.info("="*90)
logging.info(f"Total Runs: {stats['total_runs']}")
logging.info(f"Successful: {stats['successful_runs']} ({stats['successful_runs']/stats['total_runs']*100:.1f}%)")
logging.info(f"Failed: {stats['failed_runs']} ({stats['failed_runs']/stats['total_runs']*100:.1f}%)")
logging.info(f"B6 Checks: {total_b6_checks} (Pass: {total_b6_pass}, Fail: {total_b6_fail}, Known: {total_b6_known})")
logging.info(f"B8 Checks: {total_b8_checks} (Pass: {total_b8_pass}, Fail: {total_b8_fail})")
logging.info(f"Serial: {total_serial_msgs} messages ({total_serial_errors} errors, {total_serial_fatal} fatal, {total_serial_recycles} recycles)")
logging.info(f"Target: {target_detected_count} detected, {target_not_detected_count} not detected ({target_pass_rate:.1f}% pass rate)")
if avg_pbit is not None:
logging.info(f"PBIT Timing: avg={avg_pbit:.2f}s, min={min_pbit:.2f}s, max={max_pbit:.2f}s, σ={std_dev:.2f}s")
logging.info("="*90)
logging.info("Detailed statistics will be available in the PDF report")
# Return custom_statistics for optional CSV export
return custom_statistics
def export_statistics_to_csv(custom_statistics, test_name, output_folder):
"""
Export test statistics to CSV file for external analysis (Excel, etc.).
Creates a CSV file with three sections:
1. Per-Run Statistics: Detailed results for each run
2. Aggregate Statistics: Overall summary metrics
3. Problem Distribution: Analysis of failure types
Args:
custom_statistics: Dictionary with 'repetitions' and 'aggregate' data
test_name: Base name for the CSV file (e.g., "GRIFO_M_PBIT_20260129_153432")
output_folder: Absolute path to folder where CSV will be saved (same as PDF)
Returns:
Path to generated CSV file, or None if export failed
"""
try:
# Create output folder if it doesn't exist
if not os.path.exists(output_folder):
os.makedirs(output_folder)
logging.info(f"Created output folder: {output_folder}")
# Create CSV filename with absolute path
csv_filename = f"{test_name}_statistics.csv"
csv_path = os.path.join(output_folder, csv_filename)
logging.info(f"Exporting statistics to CSV: {csv_path}")
with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# Section 1: Per-Run Statistics
writer.writerow(['PER-RUN STATISTICS'])
writer.writerow([]) # Blank line
# Headers for per-run data (append detailed columns as JSON strings)
headers = [
'Run', 'Result', 'PBIT Time (s)', 'Start Time', 'End Time', 'Run Duration (s)',
'Scenario',
'B6 Total', 'B6 Pass', 'B6 Fail', 'B6 Known',
'B8 Checked', 'B8 Pass', 'B8 Fail',
'Serial Msgs', 'Serial Errors', 'Serial Fatal', 'Serial Recycles',
'Real Failures', 'Known Failures', 'Target Simulated',
'Failures Detail (JSON)', 'Known Failures Detail (JSON)', 'Serial Details (JSON)'
]
writer.writerow(headers)
# Per-run data rows
for run in custom_statistics['repetitions']:
# Prepare detailed JSON fields for precise per-run analysis
# failures: list of tuples (field, error) or (category, field, error)
failures = []
for item in run.get('failures', []):
try:
# normalize tuple/list entries
if isinstance(item, (list, tuple)):
failures.append(list(item))
else:
failures.append([str(item)])
except Exception:
failures.append([str(item)])
known_failures = []
for item in run.get('known_failures', []):
try:
if isinstance(item, (list, tuple)):
known_failures.append(list(item))
else:
known_failures.append([str(item)])
except Exception:
known_failures.append([str(item)])
serial_details = run.get('serial_details', [])
row = [
run['repetition'],
'PASS' if run['success'] else 'FAIL',
f"{run.get('pbit_time', 0):.2f}",
run.get('start_time', ''),
run.get('end_time', ''),
f"{run.get('run_duration', 0) if run.get('run_duration') is not None else 0:.2f}",
run.get('scenario', ''),
run.get('b6_total', 0),
run.get('b6_pass', 0),
run.get('b6_fail', 0),
run.get('b6_known_fail', 0),
run.get('b8_checked', 0),
run.get('b8_pass', 0),
run.get('b8_fail', 0),
run.get('serial_total', 0),
run.get('serial_errors', 0),
run.get('serial_fatal', 0),
run.get('serial_recycles', 0),
len(run.get('failures', [])),
len(run.get('known_failures', [])),
json.dumps(run.get('target_simulated', None), ensure_ascii=False),
json.dumps(failures, ensure_ascii=False),
json.dumps(known_failures, ensure_ascii=False),
json.dumps(serial_details, ensure_ascii=False),
]
writer.writerow(row)
writer.writerow([]) # Blank line
writer.writerow([]) # Extra blank line
# Section 2: Aggregate Statistics
writer.writerow(['AGGREGATE STATISTICS'])
writer.writerow([]) # Blank line
writer.writerow(['Metric', 'Value'])
agg = custom_statistics['aggregate']
# Overall metrics
writer.writerow(['Total Runs', agg['total_runs']])
writer.writerow(['Successful Runs', agg['successful_runs']])
writer.writerow(['Failed Runs', agg['failed_runs']])
writer.writerow(['Success Rate (%)', f"{agg['successful_runs']/agg['total_runs']*100:.1f}" if agg['total_runs'] > 0 else "0.0"])
writer.writerow([]) # Blank line
# B6 LRU Status
writer.writerow(['B6 Total Checks', agg['total_b6_checks']])
writer.writerow(['B6 Pass', agg['total_b6_pass']])
writer.writerow(['B6 Fail', agg['total_b6_fail']])
writer.writerow(['B6 Known Fail', agg['total_b6_known']])
writer.writerow([]) # Blank line
# B8 Diagnostics
writer.writerow(['B8 Total Checks', agg['total_b8_checks']])
writer.writerow(['B8 Pass', agg['total_b8_pass']])
writer.writerow(['B8 Fail', agg['total_b8_fail']])
writer.writerow([]) # Blank line
# Serial Communication
writer.writerow(['Serial Total Messages', agg['total_serial_msgs']])
writer.writerow(['Serial Errors', agg['total_serial_errors']])
writer.writerow(['Serial Fatal', agg['total_serial_fatal']])
writer.writerow(['Serial Recycles', agg['total_serial_recycles']])
writer.writerow([]) # Blank line
# Timing Statistics
writer.writerow(['Average PBIT Time (s)', f"{agg['avg_pbit_time']:.2f}" if agg['avg_pbit_time'] is not None else "N/A"])
writer.writerow(['Min PBIT Time (s)', f"{agg['min_pbit_time']:.2f}" if agg['min_pbit_time'] is not None else "N/A"])
writer.writerow(['Max PBIT Time (s)', f"{agg['max_pbit_time']:.2f}" if agg['max_pbit_time'] is not None else "N/A"])
writer.writerow(['Std Dev PBIT Time (s)', f"{agg['std_dev_pbit']:.2f}" if agg['std_dev_pbit'] is not None else "N/A"])
writer.writerow([]) # Blank line
writer.writerow([]) # Extra blank line
# Section 3: Known Failures (Ignored)
writer.writerow(['KNOWN FAILURES (IGNORED IN STATISTICS)'])
writer.writerow([])
writer.writerow(['These failures are expected due to HW test setup limitations and do not affect test verdict:'])
writer.writerow([])
# List known failures from KNOWN_FAILURES constant
from GRIFO_M_PBIT import KNOWN_FAILURES
for known_field in KNOWN_FAILURES:
# Extract clean field name
if 'RdrHealthStatusAndBitReport_' in known_field:
clean_name = known_field.split('RdrHealthStatusAndBitReport_')[-1]
else:
clean_name = known_field.split('_')[-1] if '_' in known_field else known_field
clean_name = clean_name.replace('_', ' ').title()
writer.writerow([f" - {clean_name}"])
writer.writerow([]) # Blank line
writer.writerow([]) # Extra blank line
# Section 4: Test Failure Analysis (NEW - Detailed test-by-test breakdown)
writer.writerow(['TEST FAILURE ANALYSIS'])
writer.writerow([]) # Blank line
writer.writerow(['Detailed breakdown of individual test failures, sorted by failure percentage (highest first)'])
writer.writerow([]) # Blank line
test_failure_analysis = custom_statistics.get('test_failure_analysis', [])
if test_failure_analysis:
# Table headers
writer.writerow(['Test ID', 'Description', 'Occurrences', '% of Total Runs', 'Runs Where Failed', 'Full Field Name'])
for test in test_failure_analysis:
runs_str = ', '.join(map(str, test['runs_failed']))
writer.writerow([
test['test_id'],
test['description'],
test['occurrences'],
f"{test['percentage']:.1f}%",
runs_str,
test['full_field']
])
else:
writer.writerow(['No test failures detected - all tests passed!'])
writer.writerow([]) # Blank line
writer.writerow([]) # Extra blank line
# Section 5: Problem Distribution Analysis (category-level summary)
writer.writerow(['PROBLEM DISTRIBUTION ANALYSIS (Category Level)'])
writer.writerow([]) # Blank line
# Analyze problem types from repetitions (same logic as PDF)
problem_counts = {}
total_runs = agg['total_runs']
perfect_runs = agg['successful_runs']
for run in custom_statistics['repetitions']:
if not run['success']:
# Extract FULL field names from failures (not just last parts)
for field, value in run['failures']:
# Remove common prefix but keep full field identifier
# Example: "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status"
# -> "processor_status"
if 'RdrHealthStatusAndBitReport_' in field:
# Extract everything after message name
test_name_clean = field.split('RdrHealthStatusAndBitReport_')[-1]
elif '_' in field and len(field.split('_')) > 3:
# For other messages, keep last 4 parts for context
parts = field.split('_')
test_name_clean = '_'.join(parts[-4:])
else:
test_name_clean = field
# Clean up for display (capitalize, keep underscores for clarity)
test_name_clean = test_name_clean.replace('_', ' ').title()
problem_counts[test_name_clean] = problem_counts.get(test_name_clean, 0) + 1
# Serial problems
if run.get('serial_fatal', 0) > 0:
problem_counts['Serial Communication (Fatal)'] = problem_counts.get('Serial Communication (Fatal)', 0) + 1
if run.get('serial_recycles', 0) > 1:
problem_counts['System Instability (Recycles)'] = problem_counts.get('System Instability (Recycles)', 0) + 1
if problem_counts:
# Sort by frequency (descending)
sorted_problems = sorted(problem_counts.items(), key=lambda x: x[1], reverse=True)
writer.writerow(['Problem Type', 'Occurrences', '% of Total Runs', '% of Failed Runs'])
for problem, count in sorted_problems:
pct_total = (count / total_runs * 100) if total_runs > 0 else 0
pct_failed = (count / (total_runs - perfect_runs) * 100) if (total_runs - perfect_runs) > 0 else 0
writer.writerow([
problem,
count,
f"{pct_total:.1f}",
f"{pct_failed:.1f}"
])
else:
writer.writerow(['No problems detected - all runs were successful!'])
logging.info(f"Statistics exported successfully to: {csv_path}")
return csv_path
except Exception as e:
logging.error(f"Failed to export statistics to CSV: {e}")
logging.error(traceback.format_exc())
return None
def tgt_gen(interface, timeout_sec=None):
"""
Target generation test: verifies radar can detect simulated target within timeout.
Args:
interface: Radar interface object
timeout_sec: Maximum seconds to wait for target (default: TARGET_DETECTION_TIMEOUT_SEC)
Returns:
bool: True if target detected within timeout, False otherwise
"""
if timeout_sec is None:
timeout_sec = TARGET_DETECTION_TIMEOUT_SEC
logging.info(f'tgt_gen() - timeout: {timeout_sec}s')
period=10 #ms
expeced_range=1000
pcnt=0
max_iterations = int(timeout_sec / (period / 1000.0)) # Calculate iterations from timeout
for i in range(max_iterations):
time.sleep(0.010)
cnt = interface.getSingleMessageReceivedSz("B9")
t_num=interface.getMessageFieldValue("B9", "b9_t_num")
t_rng=interface.getMessageFieldValue("B9", "b9_t1_rng")
if (i % 10)==0:
# Protect against None/invalid values from communication failures
if cnt is not None and cnt >= 0:
dcnt=cnt-pcnt
pcnt=cnt
else:
dcnt=-1
logging.info(f'TgtMsg: {cnt} {dcnt}')
if t_num is not None and t_num > 0:
logging.info(f'Tgt: {t_num} @ {t_rng}')
try:
# record last detected target info on interface for reporting
if hasattr(interface, '_last_target') is False:
interface._last_target = {}
interface._last_target = {'distance': t_rng, 'found_at_iter': i}
except Exception:
pass
ret_proc_sts, err= check(theGrifo1553,(1,2), "B9", "b9_t_num")
check(theGrifo1553,(1179, 1186), "B9", "b9_t1_rng")
return True # Target found
if interruptRequest is True:
break
return False # Target not found
def set_radar_operational(interface, range_nm: float = 40.0, intensity: int = 127,
master_mode: object = 'RWS', gm_submode: int = 0):
"""
Try to set radar in an operational configuration before target acquisition.
This function attempts a series of non-invasive set operations using
`setValue()` on commonly used message/field names. It is defensive: each
attempt is wrapped in try/except and failures are logged but do not raise.
Args:
interface: the equipment interface object (theGrifo1553-like)
range_nm: desired range scale in nautical miles (default: 40)
intensity: symbology intensity value (0-255) default 127
master_mode: radar master mode value (string or numeric) - best-effort
gm_submode: GM submode numeric value (default 0)
Returns:
dict: mapping of attempted (msg,field) -> (success(bool), error_or_desc)
"""
results = {}
# helper to try setValue and record result
def _try_set(val, msg, field, **kwargs):
try:
ret, err = setValue(interface, val, msg, field, **kwargs)
results[(msg, field)] = (bool(ret), err)
logging.info(f"Set {msg} {field} -> {val} : {ret}")
except Exception as e:
results[(msg, field)] = (False, f"EXC: {e}")
logging.warning(f"Failed to set {msg}.{field}: {e}")
# First: ensure radar is in STANDBY and SILENCE to avoid unintended radiation
_try_set(1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_silence", commitChanges=True)
_try_set(1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True)
# Allow radar time to process STBY/SILENCE and update tellbacks
try:
time.sleep(TELLBACK_POST_SET_DELAY_SEC)
except Exception:
pass
# 1) Symbology intensity (A1)
_try_set(intensity, "A1_MsgRdrSettingsAndParameters", "settings_RDROperationalSettings_rdr_symbology_intensity", commitChanges=True)
# set target history level to TARGET_HISTORY_LEVEL_04 if present
_try_set(3, "A1_MsgRdrSettingsAndParameters", "settings_RDROperationalSettings_target_history", commitChanges=True)
# 2) Set range scale using explicit ICD field name (A2 param1 range selection)
try:
range_m = float(range_nm) * 1852.0
except Exception:
range_m = None
# Explicitly set the param1 range scale selection in A2 (NM40 -> value 1 in ICD)
try:
_try_set(1, "A2_MsgRdrOperationCommand", "param1_RdrFunAndParam1_range_scale_selection", commitChanges=True)
except Exception:
# fallback: try writing raw meter value into A1 param_value if needed
if range_m is not None:
_try_set(int(round(range_m)), "A1_MsgRdrSettingsAndParameters", "param_value_ParamValue_raw_raw0", commitChanges=True)
# 3) Master mode and submode (A2)
_try_set(master_mode, "A2_MsgRdrModeCommand", "rdr_mode_command_master_mode", commitChanges=True)
_try_set(gm_submode, "A2_MsgRdrModeCommand", "param1_gm_submode", commitChanges=True)
# 4) Validity and slew (A4)
_try_set(0, "A4_MsgValidityAndSlew", "validity_and_slew_raw", commitChanges=True)
return results
def verify_stby_and_silence(interface, timeout_sec: float = 5.0, step: float = 0.5):
"""
Verify via tellback that STBY and SILENCE (radiation OFF) are active.
Returns a tuple (stby_ok, silence_ok, details) where details is a dict
containing the raw check error descriptions (or None).
"""
details = {}
stby_field = "rdr_mode_tellback_RdrStatusTellback_stby_tellback"
rf_field = "rdr_mode_tellback_RdrStatusTellback_rf_radiation_status"
stby_ok = False
silence_ok = False
try:
stby_ok, stby_err = check(interface, "STBY_ON", "B7_MsgRdrStatusTellback", stby_field, timeout=timeout_sec, step=step)
details['stby_err'] = stby_err
except Exception as e:
stby_ok = False
details['stby_err'] = f"EXC: {e}"
try:
# Expect radiation to be OFF when silence is requested -> RADIATION_OFF
silence_ok, sil_err = check(interface, "RADIATION_OFF", "B7_MsgRdrStatusTellback", rf_field, timeout=timeout_sec, step=step)
details['silence_err'] = sil_err
except Exception as e:
silence_ok = False
details['silence_err'] = f"EXC: {e}"
return stby_ok, silence_ok, details
def tgt_gen_alone(interface):
interface.logStart(3,os.path.dirname(sys.argv[0]))
target_found = False
for n in range(10*1000):
logging.info(f'tgt_gen_alone(): {n}')
target_found = tgt_gen(interface)
if target_found or interruptRequest is True:
break
interface.logStop()
return target_found
def ask_production_config():
"""
Ask user for test configuration in production mode (target reale).
Returns:
tuple: (num_runs, gui_enabled)
"""
print("")
print("=" * 80)
print("GRIFO PBIT - PRODUCTION MODE (Target Reale)")
print("=" * 80)
print("")
# Check if GUI is available
try:
from GRIFO_M_PBIT_gui import TestMonitorGUI
gui_available = True
except ImportError:
gui_available = False
# Ask about GUI first (if available)
gui_enabled = False
if gui_available:
while True:
try:
gui_input = input("Enable real-time GUI monitor? (y/n) [y]: ").strip().lower()
if gui_input in ['', 'y', 'yes']:
gui_enabled = True
print("✓ GUI monitor will be enabled")
break
elif gui_input in ['n', 'no']:
gui_enabled = False
print("✓ GUI monitor disabled (console only)")
break
else:
print("Please enter 'y' or 'n'")
except (KeyboardInterrupt, EOFError):
gui_enabled = False
break
print("")
else:
print("[INFO] GUI monitor not available (tkinter import failed)")
print("")
# Ask for number of runs
while True:
try:
user_input = input(f"How many test runs do you want to execute? (minimum 1) [{NUMBER_OF_REPETITIONS}]: ").strip()
if user_input == '':
num_runs = NUMBER_OF_REPETITIONS
break
num_runs = int(user_input)
if num_runs < 1:
print(f"Error: Number of runs must be at least 1. You entered: {num_runs}")
continue
break
except ValueError:
print(f"Error: Invalid input. Please enter a number.")
continue
except (KeyboardInterrupt, EOFError):
print("\n\n[INFO] Using default value")
num_runs = NUMBER_OF_REPETITIONS
break
# Ask whether to perform target validation (acquire target on 1553) before runs
# Default is 'n' to avoid unexpected hardware operations unless operator requests it
run_on_target = False
while True:
try:
target_input = input("Perform target acquisition on real hardware before runs? (y/n) [n]: ").strip().lower()
if target_input in ['', 'n', 'no']:
run_on_target = False
break
if target_input in ['y', 'yes']:
run_on_target = True
break
print("Please enter 'y' or 'n'")
except (KeyboardInterrupt, EOFError):
run_on_target = False
break
print("")
print(f"✓ Configured for {num_runs} test run(s)")
print("")
print("=" * 80)
print("")
return num_runs, gui_enabled, run_on_target
def test_proc():
# ========== SIMULATION MODE SUPPORT ==========
# Enable test execution without hardware using --simulate flag
# Mock implementation in GRIFO_M_PBIT_mock.py provides simulated interfaces
global gui_monitor
gui_monitor = None # Reference to GUI monitor (if available)
if '--simulate' in sys.argv:
from GRIFO_M_PBIT_mock import initialize_simulation, setup_simulation, create_mock_terminal
import GRIFO_M_PBIT_mock
# Check if already initialized by launcher to avoid double initialization
if GRIFO_M_PBIT_mock._requested_runs is None:
# Not initialized yet, do it now (direct execution scenario)
initialize_simulation()
setup_simulation()
# else: already initialized by launcher, skip
# Get GUI reference if enabled
gui_monitor = GRIFO_M_PBIT_mock._gui_monitor
use_mock_terminal = True
else:
# ========== PRODUCTION MODE (Target Reale) ==========
# Ask user for configuration (number of runs, GUI, and whether to run target acquisition)
runs_total, gui_enabled, run_on_target = ask_production_config()
# Initialize GUI if enabled
if gui_enabled:
try:
from GRIFO_M_PBIT_gui import TestMonitorGUI
gui_monitor = TestMonitorGUI()
gui_monitor.start()
gui_monitor.update_status(run_total=runs_total)
gui_monitor.log_event('info', f'Production mode - {runs_total} runs configured')
try:
gui_monitor.update_scenario(
name='Production Run',
description='Executing test on real hardware (target).',
expected_failures=[],
expected_passes=[],
notes='Running in production mode'
)
except Exception:
pass
logging.info("GUI monitor started successfully")
except Exception as e:
logging.warning(f"Failed to start GUI monitor: {e}")
gui_monitor = None
use_mock_terminal = False
# Determine total runs to execute. In simulate mode, prefer user-requested runs from mock.
if use_mock_terminal:
try:
runs_total = GRIFO_M_PBIT_mock._requested_runs if GRIFO_M_PBIT_mock._requested_runs is not None else NUMBER_OF_REPETITIONS
except Exception:
runs_total = NUMBER_OF_REPETITIONS
# In production mode, runs_total is already set by ask_production_config()
# ========== END SIMULATION SUPPORT ==========
global report, test_statistics
# Complete bit_fields: All B6 LRU status + All B8 degradation/SRU/test fields
# Total: 185 fields (12 B6 status + 12 B8 degradation + 43 B8 SRU + 118 B8 tests)
bit_fields = (
# ===== B6: LRU Status Fields =====
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_array_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pedestal_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pressurization_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_over_temperature_alarm",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_receiver_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_rx_front_end_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_servoloop_over_temperature_alarm",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_servoloop_status",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_trasmitter_over_temperature_alarm",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_trasmitter_status",
# ===== B8: Degradation Conditions =====
"degradation_conditions_w1_DegradationConditionsW1_bcn_fail",
"degradation_conditions_w1_DegradationConditionsW1_gm_rbm_sea1_ta_wa_fail",
"degradation_conditions_w1_DegradationConditionsW1_group1_fail",
"degradation_conditions_w1_DegradationConditionsW1_group2_fail",
"degradation_conditions_w1_DegradationConditionsW1_group3_fail",
"degradation_conditions_w1_DegradationConditionsW1_group4_fail",
"degradation_conditions_w1_DegradationConditionsW1_group5_fail",
"degradation_conditions_w1_DegradationConditionsW1_hr_modes_and_gm_dbs_fail",
"degradation_conditions_w1_DegradationConditionsW1_no_rdr_symbology",
"degradation_conditions_w1_DegradationConditionsW1_not_identified_rdr_fail",
"degradation_conditions_w1_DegradationConditionsW1_selected_channel_fail",
"degradation_conditions_w1_DegradationConditionsW1_total_rdr_fail",
# ===== B8: SRU Failure Locations =====
"failure_location_pedestal_FailureLocationPedestal_sru1_gimbal",
"failure_location_pedestal_FailureLocationPedestal_sru2_waveguide",
"failure_location_pedestal_FailureLocationPedestal_sru3_waveguide",
"failure_location_pedestal_FailureLocationPedestal_sru4_delta_guard_lna_switch",
"failure_location_pedestal_FailureLocationPedestal_sru5_waveguide_switch",
"failure_location_processor_FailureLocationProcessor_sru10_main_computer",
"failure_location_processor_FailureLocationProcessor_sru11_graphic_computer",
"failure_location_processor_FailureLocationProcessor_sru12_power_supply",
"failure_location_processor_FailureLocationProcessor_sru13_det_exp",
"failure_location_processor_FailureLocationProcessor_sru14_rx_module",
"failure_location_processor_FailureLocationProcessor_sru1_motherboard_chassis",
"failure_location_processor_FailureLocationProcessor_sru2_mti_fft",
"failure_location_processor_FailureLocationProcessor_sru3_dsp0",
"failure_location_processor_FailureLocationProcessor_sru4_dsp1",
"failure_location_processor_FailureLocationProcessor_sru5_cfar_px_ctrl",
"failure_location_processor_FailureLocationProcessor_sru6_timer",
"failure_location_processor_FailureLocationProcessor_sru7_post_processor",
"failure_location_processor_FailureLocationProcessor_sru8_agc",
"failure_location_processor_FailureLocationProcessor_sru9_esa_if",
"failure_location_receiver_FailureLocationReceiver_sru1_chassis",
"failure_location_receiver_FailureLocationReceiver_sru2_uhf_assy",
"failure_location_receiver_FailureLocationReceiver_sru3_synthesizer",
"failure_location_receiver_FailureLocationReceiver_sru4_delta_guard_down_converter",
"failure_location_receiver_FailureLocationReceiver_sru5_sum_down_converter",
"failure_location_receiver_FailureLocationReceiver_sru6_lo_distributor",
"failure_location_receiver_FailureLocationReceiver_sru7_up_converter",
"failure_location_rx_frontend_FailureLocationRxFrontEnd_sru1_chassis",
"failure_location_rx_frontend_FailureLocationRxFrontEnd_sru2_delta_guard_lna",
"failure_location_rx_frontend_FailureLocationRxFrontEnd_sru3_sum_act_prot_lna",
"failure_location_rx_frontend_FailureLocationRxFrontEnd_sru4_4port_circulator",
"failure_location_rx_frontend_FailureLocationRxFrontEnd_sru5_stc_delta_guard",
"failure_location_rx_frontend_FailureLocationRxFrontEnd_sru5_stc_sum",
"failure_location_servoloop_FailureLocationServoloop_sru1_chassis",
"failure_location_servoloop_FailureLocationServoloop_sru2_power_supply",
"failure_location_servoloop_FailureLocationServoloop_sru3_digital_controller",
"failure_location_transmitter_FailureLocationTransmitter_sru1_chassis",
"failure_location_transmitter_FailureLocationTransmitter_sru2_rex_f_tx",
"failure_location_transmitter_FailureLocationTransmitter_sru3_power_supply",
"failure_location_transmitter_FailureLocationTransmitter_sru4_valve_el_twt_tx",
"failure_location_transmitter_FailureLocationTransmitter_sru5_rf_driver",
"failure_location_transmitter_FailureLocationTransmitter_sru6_controller_tx",
"failure_location_transmitter_FailureLocationTransmitter_sru7_hv_power_supply",
"failure_location_transmitter_FailureLocationTransmitter_sru8_eht_power_supply",
# ===== B8: All Test Results =====
"agc_test_results_AGCTestResults_test_agc10_pulse_compressor_interface",
"agc_test_results_AGCTestResults_test_agc11_dp_interface",
"agc_test_results_AGCTestResults_test_agc13_taxi_running",
"agc_test_results_AGCTestResults_test_agc14_external_xyp_ram",
"agc_test_results_AGCTestResults_test_agc15_servoloop_interface",
"agc_test_results_AGCTestResults_test_agc1_internal_xyp_ram",
"agc_test_results_AGCTestResults_test_agc2_external_xyp_ram",
"agc_test_results_AGCTestResults_test_agc5_dual_port_ram",
"agc_test_results_AGCTestResults_test_agc6_agc_machine",
"agc_test_results_AGCTestResults_test_agc7_sat_machine",
"agc_test_results_AGCTestResults_test_agc9_c_ram_xy_checksum",
"data_processor_test_results_DataProcessorTestResults_test_dp10_video_memory",
"data_processor_test_results_DataProcessorTestResults_test_dp11_video_unit",
"data_processor_test_results_DataProcessorTestResults_test_dp12_transputer_unit",
"data_processor_test_results_DataProcessorTestResults_test_dp13_scan_converter_polar_memory",
"data_processor_test_results_DataProcessorTestResults_test_dp14_scan_converter_format_converter",
"data_processor_test_results_DataProcessorTestResults_test_dp1_486_cpu_tests",
"data_processor_test_results_DataProcessorTestResults_test_dp2_486_interfaces_with_r3000_gc",
"data_processor_test_results_DataProcessorTestResults_test_dp3_486_interface_with_slc",
"data_processor_test_results_DataProcessorTestResults_test_dp4_slc_communications",
"data_processor_test_results_DataProcessorTestResults_test_dp5_r3000_cpu_tests",
"data_processor_test_results_DataProcessorTestResults_test_dp6_r3000_interfaces",
"data_processor_test_results_DataProcessorTestResults_test_dp7_1553_and_discretes",
"data_processor_test_results_DataProcessorTestResults_test_dp8_graphic_cpu",
"data_processor_test_results_DataProcessorTestResults_test_dp9_graphic_processors",
"integrated_system_test_results_IntegratedSystemTestResults_array_status",
"integrated_system_test_results_IntegratedSystemTestResults_cal_delta_channel_fail",
"integrated_system_test_results_IntegratedSystemTestResults_cal_injection_fail",
"integrated_system_test_results_IntegratedSystemTestResults_cal_noise_fail",
"integrated_system_test_results_IntegratedSystemTestResults_pedestal_status",
"integrated_system_test_results_IntegratedSystemTestResults_processor_status",
"integrated_system_test_results_IntegratedSystemTestResults_receiver_status",
"integrated_system_test_results_IntegratedSystemTestResults_rx_frontend_status",
"integrated_system_test_results_IntegratedSystemTestResults_servoloop_status",
"integrated_system_test_results_IntegratedSystemTestResults_test_is1_upconverter_chain_levels",
"integrated_system_test_results_IntegratedSystemTestResults_test_is2_downconverter_chain_levels",
"integrated_system_test_results_IntegratedSystemTestResults_test_is3_antenna_status_inconsistent",
"integrated_system_test_results_IntegratedSystemTestResults_test_is4_tx_status_inconsistent",
"integrated_system_test_results_IntegratedSystemTestResults_test_is5_tx_power_level",
"integrated_system_test_results_IntegratedSystemTestResults_transmitter_status",
"post_processor_test_results_PostProcessorTestResults_test_pp1_master_dsp",
"post_processor_test_results_PostProcessorTestResults_test_pp2_interface_card",
"post_processor_test_results_PostProcessorTestResults_test_pp3_cpu_cards",
"post_processor_test_results_PostProcessorTestResults_test_pp4_dma_bus",
"post_processor_test_results_PostProcessorTestResults_test_pp5_sp_interface",
"post_processor_test_results_PostProcessorTestResults_test_pp6_dp_interface",
"post_processor_test_results_PostProcessorTestResults_test_pp7_scan_converter_interface",
"post_processor_test_results_PostProcessorTestResults_test_pp8_agc_interface",
"power_supply_test_results_PowerSupplyTestResults_test_ps1_power_supply",
"power_supply_test_results_PowerSupplyTestResults_test_ps2_over_temperature",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_fe1_lna",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_fe2_agc_attenuators",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx1_synthesizer_commands",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx2_synthesizer_internal_tests",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx3_uhf_oscillator_level",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx4_downconverter_lo_level",
"receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx5_upconverter_lo_level",
"rx_module_test_results_RxModuleTestResults_test_rm16_calibration_sum_channel_fail",
"rx_module_test_results_RxModuleTestResults_test_rm1_master_clock_level",
"rx_module_test_results_RxModuleTestResults_test_rm2_expander_level",
"rx_module_test_results_RxModuleTestResults_test_rm3_sum_channel_down_converter",
"rx_module_test_results_RxModuleTestResults_test_rm4_dg_channel_down_converter",
"rx_module_test_results_RxModuleTestResults_test_rm5_noise_attenuators",
"servoloop_test_results_ServoloopTestResults_test_sl10_agc_control",
"servoloop_test_results_ServoloopTestResults_test_sl11_ad",
"servoloop_test_results_ServoloopTestResults_test_sl12_das",
"servoloop_test_results_ServoloopTestResults_test_sl13_serial_communications",
"servoloop_test_results_ServoloopTestResults_test_sl14_taxi_interface",
"servoloop_test_results_ServoloopTestResults_test_sl15_pedestal_centre_scan_location",
"servoloop_test_results_ServoloopTestResults_test_sl1_low_voltage_power_supply",
"servoloop_test_results_ServoloopTestResults_test_sl2_high_voltage_power_supply",
"servoloop_test_results_ServoloopTestResults_test_sl3_motor_drivers",
"servoloop_test_results_ServoloopTestResults_test_sl4_resolvers_power_supply",
"servoloop_test_results_ServoloopTestResults_test_sl5_waveguide_switch",
"servoloop_test_results_ServoloopTestResults_test_sl6_over_temperature",
"servoloop_test_results_ServoloopTestResults_test_sl7_resolver_to_digital_conv",
"servoloop_test_results_ServoloopTestResults_test_sl8_position_loop_error",
"servoloop_test_results_ServoloopTestResults_test_sl9_microprocessor",
"signal_processor_test_results_SignalProcessorTestResults_test_sp10_board_overall",
"signal_processor_test_results_SignalProcessorTestResults_test_sp11_attenuatori_antenna",
"signal_processor_test_results_SignalProcessorTestResults_test_sp14_external_sp_if",
"signal_processor_test_results_SignalProcessorTestResults_test_sp16_bcn",
"signal_processor_test_results_SignalProcessorTestResults_test_sp1_timer1_up",
"signal_processor_test_results_SignalProcessorTestResults_test_sp2_timer_dma_pxc_if",
"signal_processor_test_results_SignalProcessorTestResults_test_sp3_timer_internal",
"signal_processor_test_results_SignalProcessorTestResults_test_sp4_px_ctrl_comm",
"signal_processor_test_results_SignalProcessorTestResults_test_sp5_video1_without_ad",
"signal_processor_test_results_SignalProcessorTestResults_test_sp6_video1_with_ad",
"signal_processor_test_results_SignalProcessorTestResults_test_sp7_video2_ad_sync",
"signal_processor_test_results_SignalProcessorTestResults_test_sp8_video2_timer_sync",
"signal_processor_test_results_SignalProcessorTestResults_test_sp9_ad_da",
"signal_processor_test_results_SignalProcessorTestResults_test_sp9b_wideband_expander",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx10_hv_ps_over_temperature_warning",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx11_twt_helix_over_current",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx12_cathode_to_helix_arc",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx13_twt_over_temperature_hazard",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx14_twt_over_temperature_warning",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx15_cathode_under_voltage",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx16_cathode_over_voltage",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx1_microprocessors",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx2_tx_rf_input",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx3_twt_rf_input",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx4_twt_rf_output",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx5_tx_rf_output_level",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx6_vswr",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx7_three_phase_input_power",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx8_low_voltage_power_supplies",
"transmitter_test_results_w1_TransmitterTestResultsW1_test_tx9_hv_ps_over_temperature_hazard",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx17_collector_under_voltage",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx18_collector_over_voltage",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx19_rectified_voltage",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx20_cathode_inv_current_fail",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx21_collector_inv_current_fail",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx22_waveguide_pressurization",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx23_grid_window_over_duty_alt",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx24_floating_deck_fail",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx25_floating_deck_ps_fail",
"transmitter_test_results_w2_TransmitterTestResultsW2_test_tx26_grid_window_over_duty",
)
# ====================
# BIT FIELDS CATEGORIZATION
# ====================
# Dictionary mapping category names to field indices in bit_fields tuple.
# Used for organized drill-down reporting when B6 failures trigger B8 verification.
#
# Categories:
# B6_LRU_Status: 12 Line Replaceable Unit status fields (always checked)
# B8_Degradation: 12 system degradation condition flags
# B8_SRU_*: 43 Shop Replaceable Unit failure location flags (6 subsystems)
# B8_Test_*: 118 detailed test result fields (10 test types)
#
# Total: 185 diagnostic fields providing complete radar health visibility
bit_fields_categories = {
'B6_LRU_Status': bit_fields[0:12],
'B8_Degradation': bit_fields[12:24],
'B8_SRU_Pedestal': bit_fields[24:29],
'B8_SRU_Processor': bit_fields[29:43],
'B8_SRU_Receiver': bit_fields[43:50],
'B8_SRU_RxFrontend': bit_fields[50:56],
'B8_SRU_Servoloop': bit_fields[56:59],
'B8_SRU_Transmitter': bit_fields[59:67],
'B8_Test_AGC': bit_fields[67:78],
'B8_Test_DataProcessor': bit_fields[78:92],
'B8_Test_IntegratedSystem': bit_fields[92:107],
'B8_Test_PostProcessor': bit_fields[107:115],
'B8_Test_PowerSupply': bit_fields[115:117],
'B8_Test_Receiver': bit_fields[117:124],
'B8_Test_RxModule': bit_fields[124:130],
'B8_Test_Servoloop': bit_fields[130:145],
'B8_Test_SignalProcessor': bit_fields[145:159],
'B8_Test_Transmitter': bit_fields[159:185],
}
logger_setup('GRIFO_M_PBIT.log')
report = testReport(sys.argv[0])
interface = theGrifo1553.getInterface()
# Create serial terminal (real or mock based on simulation mode)
if use_mock_terminal:
terminal = create_mock_terminal()
else:
terminal = leo_grifo_terminal.GrifoSerialTerminal()
terminal.connect()
# If running in simulation, the mock module may have configured run_on_target
if use_mock_terminal:
try:
run_on_target = GRIFO_M_PBIT_mock._run_on_target
except Exception:
run_on_target = False
test_return = True
try:
#report.open_session('Pre Conditions')
#power_grifo_off()
#report.close_session()
############ Test Execution ############
#report.open_session('Test Execution')
report.add_comment("The Test Operator check if the failure BIT in B6_MsgRdrSettingsAndParametersTellback changes ...")
# Optionally perform target acquisition on real hardware (or simulated) before running repetitions
try:
if 'run_on_target' in locals() and run_on_target:
if use_mock_terminal:
# Use mock-provided target simulation (blocks until simulated target injected)
try:
GRIFO_M_PBIT_mock.mock_tgt_gen_alone(interface)
except Exception:
logging.debug('Mock target simulation failed to start', exc_info=True)
# Then run the standard tgt detection routine to perform checks
if tgt_gen_alone(interface) is False:
return
else:
# Production: perform real target acquisition
if tgt_gen_alone(interface) is False:
return
except Exception:
# If run_on_target is not defined for some reason, skip target acquisition
pass
for repetition in range(runs_total):
info = f'Repetition {1 + repetition} of {runs_total}'
logging.info(info)
report.open_session(info)
# Update GUI for new run
if gui_monitor:
gui_monitor.update_status(run_current=repetition + 1, run_total=runs_total, power_on=True)
gui_monitor.log_event('info', f'Starting repetition {repetition + 1}/{runs_total}')
# Statistics for this run
# Record run start timestamp for reporting
run_start_time = datetime.now()
run_start_perf = time.perf_counter()
run_stats = {
'repetition': repetition + 1,
'start_time': run_start_time.isoformat(),
'pbit_time': 0,
'bit_available': False,
'target_simulated': None,
'b6_total': 0,
'b6_pass': 0,
'b6_fail': 0,
'b6_known_fail': 0,
'b8_checked': 0,
'b8_pass': 0,
'b8_fail': 0,
'failures': [],
'known_failures': [],
'success': True,
# Serial statistics
'serial_total': 0,
'serial_errors': 0,
'serial_fatal': 0,
'serial_recycles': 0,
'serial_details': [], # List of notable serial events
}
# Attach scenario name: simulation scenario or production indicator
if use_mock_terminal:
# Simulation mode: use configured scenario from mock
try:
if hasattr(GRIFO_M_PBIT_mock, '_scenario_list') and GRIFO_M_PBIT_mock._scenario_list:
idx = repetition if repetition < len(GRIFO_M_PBIT_mock._scenario_list) else 0
run_stats['scenario'] = GRIFO_M_PBIT_mock._scenario_list[idx]
else:
run_stats['scenario'] = None
except Exception:
run_stats['scenario'] = None
else:
# Production mode: indicate real hardware execution
run_stats['scenario'] = 'Production Run'
test_statistics['total_runs'] += 1
# Reset serial statistics for this run
terminal.reset_serial_statistics()
# Attach pre-run target info (if mock simulated target was injected)
# FIXED: Retrieve for ALL runs, not just first one
try:
t_info = None
if use_mock_terminal:
t_info = getattr(interface, '_last_simulated_target', None)
if t_info is None:
t_info = getattr(interface, '_last_target', None)
run_stats['target_simulated'] = t_info
except Exception:
run_stats['target_simulated'] = None
report.add_comment("The test operator is required to switch off the target and wait 3 seconds.")
power_grifo_off(wait_after=4, wait_before=1)
# Update GUI - power off
if gui_monitor:
gui_monitor.update_status(power_on=False, pbit_time=0.0)
gui_monitor.log_event('info', 'Power OFF - waiting before=1s, after=4s')
report.add_comment("The test operator is required to switch on the target.")
power_grifo_on(wait_after=0.100)
# Update GUI - power on
if gui_monitor:
gui_monitor.update_status(power_on=True)
gui_monitor.log_event('info', 'Power ON - waiting for BIT...')
remaining_time = PBIT_SEC_TIME
pbit_start_time = time.perf_counter()
# Initialize 1553 communication loss detection variables
# These must persist across while loop iterations to track bus health
msg_cnt = 0
mil1553_error_flag = COMM_LOSS_THRESHOLD
setValue(theGrifo1553, 100, "A1_MsgRdrSettingsAndParameters", "settings_RDROperationalSettings_rdr_symbology_intensity", commitChanges=True)
while remaining_time > 0:
start = time.perf_counter()
ret_rep_is_avail = False
for i in range(100):
cnt = interface.getSingleMessageReceivedSz("B6_MsgRdrSettingsAndParametersTellback")
value = interface.getMessageFieldValue(
"B6_MsgRdrSettingsAndParametersTellback",
"radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_bit_report_available"
)
# Robust availability check: accept booleans, string true/1/yes, or numeric 1
ret_rep_is_avail = False
try:
if isinstance(value, bool):
ret_rep_is_avail = value
elif value is None:
ret_rep_is_avail = False
else:
ret_rep_is_avail = str(value).strip().lower() in ("true", "1", "yes")
except Exception:
ret_rep_is_avail = False
if ret_rep_is_avail is True:
break
# Monitor 1553 bus health: detect if message counter is stalled
if isinstance(cnt, (int, float)) and cnt >= 0:
if cnt > msg_cnt:
# Message counter increased -> bus alive, reset watchdog
mil1553_error_flag = COMM_LOSS_THRESHOLD
else:
# Message counter stalled -> decrement watchdog
mil1553_error_flag -= 1
msg_cnt = int(cnt)
else:
# No valid counter value -> decrement watchdog conservatively
mil1553_error_flag -= 1
# Check if communication is lost (counter stalled for too long)
if mil1553_error_flag == 0:
logging.critical(f"1553 bus communication lost - message counter stalled at {msg_cnt}")
report.add_comment(f"CRITICAL: 1553 bus communication lost (counter stalled at {msg_cnt} messages)", False)
if gui_monitor:
gui_monitor.log_event('error', '1553 communication LOST - aborting test')
return False
time.sleep(0.05)
if ret_rep_is_avail is True:
time.sleep(0.02)
run_stats['bit_available'] = True
run_stats['pbit_time'] = time.perf_counter() - pbit_start_time
report.add_comment(f"BIT report available after {run_stats['pbit_time']:.1f}s")
# Update GUI - BIT available
if gui_monitor:
gui_monitor.update_status(pbit_available=True, pbit_time=run_stats['pbit_time'])
gui_monitor.log_event('success', f"BIT available after {run_stats['pbit_time']:.1f}s")
# ===== PHASE 1: Verify ALL B6 LRU Status Fields =====
b6_lru_fields = bit_fields_categories['B6_LRU_Status']
b6_failures = []
b6_known_failures = []
radar_fail_status_field = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"
# Check all B6 fields EXCEPT radar_fail_status (check it last)
for f in b6_lru_fields:
if f == radar_fail_status_field:
continue # Skip radar_fail_status, check it after all others
run_stats['b6_total'] += 1
ret, err = check(theGrifo1553, "false", "B6_MsgRdrSettingsAndParametersTellback", f)
# Update GUI with B6 progress
if gui_monitor and run_stats['b6_total'] % 3 == 0: # Update every 3 checks
gui_monitor.update_statistics(
b6_total=run_stats['b6_total'],
b6_pass=run_stats['b6_pass'],
b6_fail=run_stats['b6_fail'],
b6_known=run_stats['b6_known_fail']
)
if ret:
run_stats['b6_pass'] += 1
else:
if f in KNOWN_FAILURES:
# Known failure: annotate but don't trigger drill-down
run_stats['b6_known_fail'] += 1
b6_known_failures.append((f, err))
logging.warning(f"Known failure (ignored): {f}")
else:
# Real failure: needs investigation
run_stats['b6_fail'] += 1
b6_failures.append((f, err))
test_return = False
run_stats['success'] = False
# ===== SPECIAL CHECK: radar_fail_status (aggregate flag) =====
# This flag aggregates all component statuses. Logic:
# - If ONLY known failures exist (e.g., pedestal), ignore it
# - If ANY real failures exist, it's a valid indicator
run_stats['b6_total'] += 1
ret_radar_fail, err_radar_fail = check(theGrifo1553, "false", "B6_MsgRdrSettingsAndParametersTellback", radar_fail_status_field)
if ret_radar_fail:
run_stats['b6_pass'] += 1
else:
# radar_fail_status is TRUE (indicating failure)
if len(b6_failures) > 0:
# Real failures exist -> radar_fail_status is a valid failure indicator
run_stats['b6_fail'] += 1
b6_failures.append((radar_fail_status_field, err_radar_fail))
test_return = False
run_stats['success'] = False
logging.warning(f"Radar fail status: REAL failure (caused by: {', '.join([f.split('_')[-1] for f, _ in b6_failures[:3]])})")
else:
# Only known failures exist -> radar_fail_status is caused by known issues
run_stats['b6_known_fail'] += 1
b6_known_failures.append((radar_fail_status_field, err_radar_fail))
logging.warning(f"Radar fail status: Known failure (caused only by pedestal)")
# Log B6 summary to console (not as PDF step - will be in final tables)
logging.info(f"[Run {repetition+1}] B6 LRU Status: {run_stats['b6_total']} total, "
f"{run_stats['b6_pass']} pass, {run_stats['b6_fail']} fail, "
f"{run_stats['b6_known_fail']} known")
# Update GUI with final B6 stats
if gui_monitor:
gui_monitor.update_statistics(
b6_total=run_stats['b6_total'],
b6_pass=run_stats['b6_pass'],
b6_fail=run_stats['b6_fail'],
b6_known=run_stats['b6_known_fail']
)
if run_stats['b6_fail'] > 0:
gui_monitor.log_event('warning', f'B6: {run_stats["b6_fail"]} real failures detected')
else:
gui_monitor.log_event('success', 'B6: All checks passed')
# Store failures for final aggregate report (not as steps)
if b6_known_failures:
run_stats['known_failures'].extend(b6_known_failures)
logging.info(f" Known failures (HW setup): {len(b6_known_failures)}")
if b6_failures:
run_stats['failures'].extend(b6_failures)
fail_summary = ', '.join([f.split('_')[-1] for f, _ in b6_failures[:3]])
logging.warning(f" Real failures: {fail_summary}{'...' if len(b6_failures) > 3 else ''}")
# ===== PHASE 2: Drill-down B8 only if REAL failures in B6 =====
# Check if B8 drill-down is needed:
# - Always if there are real B6 failures
# - OR if FORCE_B8_DRILL_DOWN=True and there are known failures (target2 behavior)
should_drill_down = b6_failures or (FORCE_B8_DRILL_DOWN and b6_known_failures)
if should_drill_down:
if FORCE_B8_DRILL_DOWN and not b6_failures:
report.add_comment(f"\nForced B8 drill-down (FORCE_B8_DRILL_DOWN=True): Verifying all {len(bit_fields) - 12} B8 diagnostic fields...")
logging.info("[FORCE_B8_DRILL_DOWN] Performing B8 drill-down despite only known failures")
else:
report.add_comment(f"\nDrill-down: Verifying all {len(bit_fields) - 12} B8 diagnostic fields...")
b8_fields = bit_fields[12:] # All B8 fields
b8_failures = []
for category, fields in list(bit_fields_categories.items())[1:]: # Skip B6
category_fail = 0
category_pass = 0
for f in fields:
run_stats['b8_checked'] += 1
ret, err = check(theGrifo1553, "false", "B8_MsgBitReport", f)
# Update GUI with B8 progress
if gui_monitor and run_stats['b8_checked'] % 10 == 0: # Update every 10 checks
gui_monitor.update_statistics(
b8_checked=run_stats['b8_checked'],
b8_pass=run_stats['b8_pass'],
b8_fail=run_stats['b8_fail']
)
if ret:
category_pass += 1
run_stats['b8_pass'] += 1
else:
category_fail += 1
run_stats['b8_fail'] += 1
b8_failures.append((category, f, err))
test_return = False
if category_fail > 0:
logging.warning(f"{category}: {category_fail}/{len(fields)} failures")
# Log B8 summary to console (not as PDF step - will be in final tables)
logging.info(f"[Run {repetition+1}] B8 Diagnostics: {run_stats['b8_checked']} checked, "
f"{run_stats['b8_pass']} pass, {run_stats['b8_fail']} fail")
if b8_failures:
# Store failures for final aggregate report
# Details will be shown in dedicated PDF section, not as step logs
for cat, field, err in b8_failures:
run_stats['failures'].append((field, err))
# Log to console for immediate feedback
fail_by_cat = {}
for cat, field, err in b8_failures:
if cat not in fail_by_cat:
fail_by_cat[cat] = []
fail_by_cat[cat].append(field.split('_')[-1])
for cat, fails in fail_by_cat.items():
logging.warning(f" {cat}: {len(fails)} failures - {', '.join(fails[:3])}{'...' if len(fails) > 3 else ''}")
else:
logging.info(f"[Run {repetition+1}] All B6 LRU Status PASS (no B8 drill-down needed)")
# Finalize run timing
try:
run_stats['end_time'] = datetime.now().isoformat()
run_stats['run_duration'] = time.perf_counter() - run_start_perf
except Exception:
run_stats['end_time'] = None
run_stats['run_duration'] = None
# Run statistics
test_statistics['repetitions'].append(run_stats)
if run_stats['success']:
test_statistics['successful_runs'] += 1
else:
test_statistics['failed_runs'] += 1
time_passed = time.perf_counter() - start
remaining_time -= time_passed
if ret_rep_is_avail is True:
remaining_time = 0
logging.info(f'{remaining_time:.1f}s remaining ...')
# Collect serial statistics for this run before closing session
serial_stats = terminal.get_serial_statistics()
run_stats['serial_total'] = serial_stats['total_messages']
run_stats['serial_errors'] = serial_stats['error_messages']
run_stats['serial_fatal'] = serial_stats['fatal_messages']
run_stats['serial_recycles'] = serial_stats['recycle_count']
# Store serial details for final aggregate report
# Details will be shown in dedicated PDF section, not as step logs
if serial_stats['recycle_count'] > 0:
for timestamp, message in serial_stats['recycle_details']:
run_stats['serial_details'].append({'type': 'RECYCLE', 'timestamp': timestamp, 'message': message})
if serial_stats['error_messages'] > 0:
for timestamp, message in serial_stats['error_details'][:5]: # Limit to first 5
run_stats['serial_details'].append({'type': 'ERROR', 'timestamp': timestamp, 'message': message})
if serial_stats['fatal_messages'] > 0:
for timestamp, message in serial_stats['fatal_details'][:5]: # Limit to first 5
run_stats['serial_details'].append({'type': 'FATAL', 'timestamp': timestamp, 'message': message})
# Log summary to console for immediate feedback during test execution
logging.info(f"[Run {repetition+1}] Serial: {serial_stats['total_messages']} total, "
f"{serial_stats['error_messages']} errors, {serial_stats['fatal_messages']} fatal, "
f"{serial_stats['recycle_count']} recycles")
# Update GUI with serial statistics
if gui_monitor:
gui_monitor.update_statistics(
serial_total=serial_stats['total_messages'],
serial_errors=serial_stats['error_messages'],
serial_fatal=serial_stats['fatal_messages'],
serial_recycles=serial_stats['recycle_count']
)
if serial_stats['recycle_count'] > 0:
gui_monitor.log_event('warning', f"Serial: {serial_stats['recycle_count']} RECYCLE events")
if serial_stats['fatal_messages'] > 0:
gui_monitor.log_event('error', f"Serial: {serial_stats['fatal_messages']} fatal messages")
# ===== TARGET DETECTION TEST (per ogni ripetizione) =====
# Only run target test if run_on_target is True
target_found = False
target_test_time = 0.0
target_test_done = False
if run_on_target:
target_test_done = True
try:
report.add_comment("Target Detection Test")
logging.info(f"[Run {repetition+1}] Starting target detection test (timeout: {TARGET_DETECTION_TIMEOUT_SEC}s)")
if gui_monitor:
gui_monitor.log_event('info', f"Run {repetition+1}: Target detection test starting...")
# Prepare radar for operational target acquisition (best-effort)
try:
logging.info("Setting radar to operational mode before target test (best-effort)")
# Execute auxiliary serial sequence to prepare radar (if configured and enabled)
if send_serial_sequence and ENABLE_AUX_SERIAL:
try:
logging.info('Executing auxiliary serial sequence to prepare radar')
send_serial_sequence()
except Exception as e:
logging.warning(f"Aux serial sequence failed: {e}")
# Use theGrifo1553 wrapper so setValue/check functions work with mocks
set_radar_operational(theGrifo1553, range_nm=40.0, intensity=127, master_mode='RWS', gm_submode=0)
except Exception as e:
logging.warning(f"set_radar_operational() failed: {e}")
# Safety check: verify STBY and SILENCE via tellback before acquiring target
try:
# verify_stby_and_silence expects the TestCommonInterface wrapper
stby_ok, silence_ok, verify_details = verify_stby_and_silence(
theGrifo1553,
timeout_sec=TELLBACK_VERIFY_TIMEOUT_SEC,
step=TELLBACK_VERIFY_STEP_SEC,
)
run_stats['stby_verified'] = bool(stby_ok)
run_stats['silence_verified'] = bool(silence_ok)
run_stats['stby_silence_verify_details'] = verify_details
if not (stby_ok and silence_ok):
logging.warning(f"STBY/SILENCE verification failed: STBY={stby_ok}, SILENCE={silence_ok} - notifying operator and continuing")
report.add_comment(f"WARNING: STBY/SILENCE verification failed (STBY={stby_ok}, SILENCE={silence_ok}) - operator notified, continuing with acquisition")
if gui_monitor:
gui_monitor.log_event('warning', f"Run {repetition+1}: STBY/SILENCE verification failed - operator notified, continuing")
# Before target acquisition, remove STBY so radar can actively detect target
try:
logging.info("Removing STBY before target acquisition to enable radar TX")
try:
setValue(theGrifo1553, 0, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True)
except Exception:
# Fallback: call raw interface if wrapper not available
try:
theGrifo1553.set(0, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby")
except Exception as e:
logging.warning(f"Failed to remove STBY: {e}")
except Exception as e:
logging.warning(f"Failed to remove STBY: {e}")
# Allow radar time to leave standby
try:
time.sleep(STBY_POST_UNSET_DELAY_SEC)
except Exception:
pass
# Verify radar is not in STBY anymore (best-effort)
try:
not_stby_ok, not_stby_err = check(theGrifo1553, "STBY_OFF", "B7_MsgRdrStatusTellback", "rdr_mode_tellback_RdrStatusTellback_stby_tellback", timeout=TELLBACK_VERIFY_TIMEOUT_SEC, step=TELLBACK_VERIFY_STEP_SEC)
if not not_stby_ok:
logging.warning(f"Radar did not exit STBY as expected: {not_stby_err}")
if gui_monitor:
gui_monitor.log_event('warning', f"Run {repetition+1}: Radar did not exit STBY as expected")
except Exception as e:
logging.warning(f"Exception while verifying radar left STBY: {e}")
# Proceed with target acquisition regardless of verification outcome
start_tgt = time.time()
target_found = tgt_gen(interface, TARGET_DETECTION_TIMEOUT_SEC)
target_test_time = time.time() - start_tgt
except Exception as e:
logging.warning(f"Exception during STBY/SILENCE verification: {e} - notifying operator and continuing")
report.add_comment(f"WARNING: Exception during STBY/SILENCE verification: {e} - operator notified, continuing")
run_stats['stby_verified'] = False
run_stats['silence_verified'] = False
run_stats['stby_silence_verify_details'] = {'exception': str(e)}
if gui_monitor:
gui_monitor.log_event('warning', f"Run {repetition+1}: Exception during STBY/SILENCE verification - operator notified, continuing")
# Proceed with acquisition despite verification exception
start_tgt = time.time()
target_found = tgt_gen(interface, TARGET_DETECTION_TIMEOUT_SEC)
target_test_time = time.time() - start_tgt
# Record target test result in statistics
run_stats['target_detected'] = target_found
run_stats['target_test_time'] = target_test_time
# After target test, re-enable STBY and verify it returned
if run_on_target:
try:
logging.info("Re-enabling STBY after target test")
try:
setValue(theGrifo1553, 1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby", commitChanges=True)
except Exception:
theGrifo1553.set(1, "A2_MsgRdrOperationCommand", "rdr_mode_command_RdrModeCommandWord_stby")
except Exception as e:
logging.warning(f"Failed to re-enable STBY: {e}")
# Allow radar time to re-enter STBY
try:
time.sleep(TELLBACK_POST_SET_DELAY_SEC)
except Exception:
pass
# Verify STBY is active again
try:
stby_back_ok, stby_back_err = check(theGrifo1553, "STBY_ON", "B7_MsgRdrStatusTellback", "rdr_mode_tellback_RdrStatusTellback_stby_tellback", timeout=TELLBACK_VERIFY_TIMEOUT_SEC, step=TELLBACK_VERIFY_STEP_SEC)
run_stats['stby_restored'] = bool(stby_back_ok)
if not stby_back_ok:
logging.warning(f"STBY not restored after target test: {stby_back_err}")
if gui_monitor:
gui_monitor.log_event('warning', f"Run {repetition+1}: STBY not restored after target test")
except Exception as e:
logging.warning(f"Exception verifying STBY restoration: {e}")
run_stats['stby_restored'] = False
if target_found:
report.add_comment(f"Target detected successfully in {target_test_time:.2f}s")
logging.info(f"[Run {repetition+1}] Target DETECTED in {target_test_time:.2f}s")
if gui_monitor:
gui_monitor.log_event('success', f"Run {repetition+1}: Target detected in {target_test_time:.1f}s")
else:
report.add_comment(f"Target NOT detected (timeout after {target_test_time:.2f}s)")
logging.warning(f"[Run {repetition+1}] Target NOT DETECTED (timeout: {target_test_time:.2f}s)")
if gui_monitor:
gui_monitor.log_event('error', f"Run {repetition+1}: Target NOT detected (timeout)")
except Exception as e:
logging.error(f"[Run {repetition+1}] Target test error: {e}")
run_stats['target_detected'] = False
run_stats['target_test_time'] = 0.0
else:
# Target test disabled - skip it
logging.info(f"[Run {repetition+1}] Target test SKIPPED (run_on_target=False)")
run_stats['target_detected'] = None
run_stats['target_test_time'] = None
# Push per-run summary to GUI runs table
if gui_monitor:
# Determine target test status for GUI display
if target_test_done:
target_done_text = 'YES'
target_result_text = 'PASS' if target_found else 'FAIL'
else:
target_done_text = 'NO'
target_result_text = 'N/A'
fail_summary = ''
if run_stats.get('failures'):
try:
fail_summary = ', '.join([f.split('_')[-1] for f, _ in run_stats['failures'][:3]])
except Exception:
fail_summary = str(run_stats.get('failures'))
# Attach extra metadata for GUI detail dialog
try:
t_info = getattr(interface, '_last_target', None)
except Exception:
t_info = None
target_distance_m = None
target_distance_nm = None
target_found_iter = None
if t_info and isinstance(t_info, dict):
try:
target_distance_m = float(t_info.get('distance')) if t_info.get('distance') is not None else None
# Nautical mile conversion (1 NM = 1852 meters)
if target_distance_m is not None:
target_distance_nm = target_distance_m / 1852.0
target_found_iter = int(t_info.get('found_at_iter')) if t_info.get('found_at_iter') is not None else None
except Exception:
target_distance_m = None
target_distance_nm = None
target_found_iter = None
gui_monitor.update_run(
run=repetition+1,
result='PASS' if run_stats.get('success') else 'FAIL', # Overall PBIT result
pbit=run_stats.get('pbit_time', 0.0),
b6_fail=run_stats.get('b6_fail', 0),
b8_fail=run_stats.get('b8_fail', 0),
known=run_stats.get('b6_known_fail', 0),
fail_summary=fail_summary,
serial_events=len(run_stats.get('serial_details', [])),
target_done=target_done_text,
target_result=target_result_text,
# Extra detailed fields for the run detail dialog
start_time=run_stats.get('start_time'),
end_time=run_stats.get('end_time'),
run_duration=run_stats.get('run_duration'),
failures=run_stats.get('failures', []),
serial_details=run_stats.get('serial_details', []),
target_detected=run_stats.get('target_detected'),
target_test_time=run_stats.get('target_test_time'),
stby_verified=run_stats.get('stby_verified'),
silence_verified=run_stats.get('silence_verified'),
stby_silence_verify_details=run_stats.get('stby_silence_verify_details'),
target_distance_m=target_distance_m,
target_distance_nm=target_distance_nm,
target_found_iter=target_found_iter
)
report.close_session()
if interruptRequest is True:
report.add_comment("Test interrupted by user (Ctrl-C)")
break
report.add_comment("Repetitions terminated.")
# ===== FINAL STATISTICS REPORT =====
custom_statistics = generate_final_statistics_report(report, test_statistics)
# ===== EXPORT TO CSV (if enabled) =====
if EXPORT_STATISTICS_CSV and custom_statistics:
# Generate CSV filename with timestamp (matching log file naming)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_base_name = f"{report.title()}_{timestamp}"
# Use same folder as PDF report for all test outputs
pdf_folder = report.get_pdf_folder()
csv_path = export_statistics_to_csv(custom_statistics, csv_base_name, pdf_folder)
# Also write JSON statistics file alongside CSV for structured consumption
try:
if csv_path and os.path.isdir(pdf_folder):
json_filename = f"{csv_base_name}_statistics.json"
json_path = os.path.join(pdf_folder, json_filename)
with open(json_path, 'w', encoding='utf-8') as jf:
json.dump(custom_statistics, jf, ensure_ascii=False, indent=2)
logging.info(f"Statistics exported successfully to JSON: {json_path}")
except Exception as e:
logging.error(f"Failed to export statistics to JSON: {e}")
logging.error(traceback.format_exc())
############ END STEPS ############
#report.open_session('Post Conditions')
power_grifo_off()
#report.close_session()
if terminal is not None:
terminal.disconnect()
return test_return
except Exception as e:
report.add_comment(f"Test terminated unexpectedly :{e}")
return False
finally:
report.generate_pdf()
# Notify GUI (if present) that test finished and where outputs are stored
if gui_monitor:
try:
out_folder = report.get_pdf_folder()
except Exception:
out_folder = None
if out_folder:
try:
gui_monitor.show_results(out_folder)
logging.info("Test completed - notifying GUI of results")
# If running non-interactively (stdin is not a TTY), automatically
# request the GUI to shutdown to allow the process to exit cleanly.
try:
interactive = sys.stdin.isatty()
except Exception:
interactive = False
if not interactive:
logging.info("Non-interactive session detected - auto-closing GUI")
try:
# First request a GC to run on the GUI thread so that
# any ImageTk/PhotoImage destructors that need to call
# into Tcl are executed on the correct thread.
try:
if hasattr(gui_monitor, 'update_queue'):
gui_monitor.update_queue.put(('gc_collect', {}))
# give GUI thread a moment to process GC
time.sleep(0.2)
except Exception:
pass
# Use the public stop() method which enqueues shutdown and joins
if hasattr(gui_monitor, 'stop'):
gui_monitor.stop()
else:
# Fallback: request shutdown via queue
if hasattr(gui_monitor, 'update_queue'):
gui_monitor.update_queue.put(('shutdown', {}))
except Exception:
pass
else:
# Interactive session: wait for user to close GUI
try:
if hasattr(gui_monitor, 'thread') and gui_monitor.thread:
gui_monitor.thread.join()
except Exception:
pass
except Exception as e:
logging.error(f"Error waiting for GUI: {e}")
#-- ---------------------------------------------------------------
if __name__ == '__main__':
global gui_monitor
signal.signal(signal.SIGINT, signal_handler)
test_proc()
# CRITICAL: For non-interactive automated runs, use os._exit(0) to avoid
# Python's atexit cleanup phase which can trigger Tcl_AsyncDelete warnings
# when Python destroys objects containing Tk references after GUI thread stopped.
# This is safe because at this point:
# - Test is complete
# - PDF/CSV/JSON reports are written
# - GUI has been closed via stop()
# - All essential cleanup has been done
try:
import sys
interactive = sys.stdin.isatty()
except Exception:
interactive = False
if not interactive:
# Non-interactive: Use os._exit to skip atexit cleanup
import os
import time
time.sleep(0.1) # Brief pause for any final file flushes
os._exit(0) # Exit immediately without Python cleanup
# else: Interactive session, allow normal Python cleanup