1285 lines
67 KiB
Python
1285 lines
67 KiB
Python
"""
|
||
GRIFO_M_PBIT.py - Automated Power-On BIT Test for GRIFO-F/TH Radar
|
||
|
||
This script performs comprehensive Built-In Test (BIT) verification with power cycling:
|
||
- Executes configurable number of test repetitions (default: 10)
|
||
- Power cycles radar between runs to simulate cold-start conditions
|
||
- Monitors B6 LRU (Line Replaceable Unit) status fields
|
||
- Performs detailed B8 diagnostic drill-down on real failures
|
||
- Generates comprehensive statistics report with timing analysis
|
||
|
||
Test Flow:
|
||
1. Power off radar (wait_before=1s, wait_after=4s for stabilization)
|
||
2. Power on radar (wait_after=100ms) and wait for initialization
|
||
3. Execute BIT and wait for completion (timeout: 182s)
|
||
4. Verify all 12 B6 LRU status fields
|
||
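5. If real failures are detected (or FORCE_B8_DRILL_DOWN is set), drill down into the B8 diagnostic fields (185 fields are checked in total, counting the 12 B6 status fields)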
|
||
6. Track statistics (timing, pass/fail counts, failure details)
|
||
7. Repeat for configured number of cycles
|
||
8. Generate final comprehensive report with aggregate statistics
|
||
|
||
Configuration Options:
|
||
NUMBER_OF_REPETITIONS: Number of test cycles to execute (default: 10)
|
||
PBIT_SEC_TIME: BIT completion timeout in seconds (default: 182s, matches target2)
|
||
EXPORT_STATISTICS_CSV: Export statistics to CSV file (default: True)
|
||
FORCE_B8_DRILL_DOWN: If True, always perform the B8 drill-down, even when only
    known failures are detected (matches target2 behavior, for complete
    SW requirement verification). Documented default: False; this file
    currently sets it to True.
|
||
KNOWN_FAILURES: List of expected failures due to HW setup limitations
|
||
|
||
Power Cycling Timing (aligned with target2):
|
||
- Power OFF: wait_before=1s (stabilization before action), wait_after=4s (settle time)
|
||
- Power ON: wait_after=100ms (initialization delay)
|
||
|
||
Author: Test Automation Team
|
||
Date: 2026-01-29
|
||
Last Updated: 2026-02-02 (aligned with target2 timing and behavior)
|
||
"""
|
||
|
||
import __init__
|
||
import signal
|
||
import time,sys,os
|
||
import logging
|
||
import csv
|
||
import json
|
||
from leo_grifo_common import *
|
||
from test_common_function import *
|
||
from leo_grifo_test_report import testReport
|
||
from leo_grifo_1553 import theGrifo1553
|
||
#import leo_grifo_serial_maintnance
|
||
from logger import logger_setup
|
||
import leo_grifo_terminal
|
||
import pdb
|
||
import traceback
|
||
|
||
# Default number of test cycles; used as the fallback when the operator (or the
# simulation launcher) does not request a specific run count.
NUMBER_OF_REPETITIONS = 10
PBIT_SEC_TIME = 182  # BIT completion timeout in seconds (target2 uses 182s)
EXPORT_STATISTICS_CSV = True  # Export statistics to CSV file for Excel/external analysis
|
||
|
||
# ====================
# B8 DRILL-DOWN CONFIGURATION
# ====================
# FORCE_B8_DRILL_DOWN: If True, always perform the B8 drill-down, even when only
# known failures are detected. This matches target2 behavior, where B8 is checked
# unconditionally to verify SW requirements and obtain complete step-fail statistics.
#
# False: optimized behavior - B8 drill-down only on real failures (documented default).
# True:  replicate target2 behavior for complete SW requirement verification.
#
# NOTE: the value below is True, i.e. the target2-equivalent behavior is currently
# selected rather than the documented default.
FORCE_B8_DRILL_DOWN = True
|
||
|
||
# ====================
# KNOWN FAILURES CONFIGURATION
# ====================
# Field names that are expected to fail because of HW test setup limitations.
# These failures are tracked, but on their own they do not fail the test and do
# not trigger the B8 drill-down (unless FORCE_B8_DRILL_DOWN is set).
#
# Use case: when the test HW setup lacks physical components (e.g., the pedestal
# unit), certain status checks will always fail. Listing them here prevents
# false negatives.
#
# Format: full field name, exactly as it appears in the bit_fields tuple.
# Note: known failures are reported separately in the statistics and do not
#       affect the test verdict.
# Note: radar_fail_status is NOT in this list - it is an aggregate flag that is
#       checked contextually.
KNOWN_FAILURES = [
    "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pedestal_status",
    # Add more known HW setup limitations here as needed
]
|
||
|
||
# Global flag for graceful Ctrl-C handling: set to True by signal_handler() and
# polled by the long-running loops so they can stop early.
interruptRequest = False
|
||
|
||
# ====================
# TEST STATISTICS TRACKING
# ====================
# Global dictionary that tracks statistics across all test repetitions.
# Populated during test execution and used to generate the final report.
#
# Structure:
#   repetitions: list of dicts, one per run, containing:
#     - repetition: run number (1-based)
#     - pbit_time: BIT completion time in seconds
#     - bit_available: boolean, True if BIT completed
#     - success: boolean, overall run result (pass/fail)
#     - b6_total/pass/fail/known_fail: B6 LRU status check counts
#     - b8_checked/pass/fail: B8 diagnostic check counts
#     - failures: list of (field_name, value) tuples for real failures
#       (the CSV exporter also mentions a (category, field, error) form)
#     - known_failures: list of (field_name, value) tuples for expected failures
#     Optional per-run keys read by the reporting code when present:
#     serial_total, serial_errors, serial_fatal, serial_recycles,
#     serial_details, scenario.
#   total_runs: counter for completed test runs
#   successful_runs: counter for runs with no real failures
#   failed_runs: counter for runs with real failures detected
test_statistics = {
    'repetitions': [],     # List of per-run statistics dictionaries
    'total_runs': 0,       # Total number of completed runs
    'successful_runs': 0,  # Number of runs that passed (only known failures allowed)
    'failed_runs': 0,      # Number of runs with real failures detected
}
|
||
|
||
def signal_handler(sig, frame):
    """
    Ctrl-C handler: request a graceful stop instead of aborting immediately.

    Logs the event and raises the module-level ``interruptRequest`` flag.

    Args:
        sig: Signal number (unused).
        frame: Current stack frame (unused).
    """
    global interruptRequest

    notice = "Ctrl-C detected, exiting gracefully..."
    logging.info(notice)
    interruptRequest = True
|
||
|
||
def generate_final_statistics_report(report, stats):
    """
    Aggregate the per-run results, hand them to the report, and log a summary.

    The aggregated data is passed to ``report.set_custom_statistics()`` as a
    structured dictionary (instead of being registered as text steps through
    add_comment()), so the report generator can render it as dedicated
    sections. A short summary is also written to the log.

    Args:
        report: testReport object for PDF generation. Only
            ``set_custom_statistics(dict)`` is called on it here.
        stats: test_statistics dictionary. Must contain 'repetitions',
            'total_runs', 'successful_runs' and 'failed_runs'. Every entry of
            'repetitions' must provide 'b6_total', 'b6_pass', 'b6_fail',
            'b6_known_fail', 'b8_checked', 'b8_pass', 'b8_fail', 'pbit_time'
            and 'bit_available'. The 'serial_total', 'serial_errors',
            'serial_fatal' and 'serial_recycles' counters are optional and
            count as 0 when missing.

    Returns:
        dict: The structure passed to the report, usable for the CSV export:
            - 'repetitions': the per-run list from ``stats`` (same object)
            - 'aggregate': run counters, summed B6/B8/serial counters and
              PBIT timing (avg/min/max/population std-dev, or None for all
              four when no run has 'bit_available' set).
    """
    runs = stats['repetitions']
    total_runs = stats['total_runs']

    def _sum(key):
        # Mandatory per-run counter: a missing key is a caller error.
        return sum(run[key] for run in runs)

    def _sum_optional(key):
        # Serial counters are only present when serial monitoring was active.
        return sum(run.get(key, 0) for run in runs)

    def _share(count):
        # Percentage of total runs; 0.0 when no run completed (e.g. the test
        # was interrupted before the first repetition finished).
        return count / total_runs * 100 if total_runs else 0.0

    # 1553 bus statistics
    total_b6_checks = _sum('b6_total')
    total_b6_pass = _sum('b6_pass')
    total_b6_fail = _sum('b6_fail')
    total_b6_known = _sum('b6_known_fail')

    total_b8_checks = _sum('b8_checked')
    total_b8_pass = _sum('b8_pass')
    total_b8_fail = _sum('b8_fail')

    # Serial communication statistics
    total_serial_msgs = _sum_optional('serial_total')
    total_serial_errors = _sum_optional('serial_errors')
    total_serial_fatal = _sum_optional('serial_fatal')
    total_serial_recycles = _sum_optional('serial_recycles')

    # Timing statistics: only runs where BIT actually completed are considered
    pbit_times = [run['pbit_time'] for run in runs if run['bit_available']]
    if pbit_times:
        avg_pbit = sum(pbit_times) / len(pbit_times)
        min_pbit = min(pbit_times)
        max_pbit = max(pbit_times)
        # Population variance (divide by N, not N-1)
        variance = sum((t - avg_pbit) ** 2 for t in pbit_times) / len(pbit_times)
        std_dev = variance ** 0.5
    else:
        avg_pbit = min_pbit = max_pbit = std_dev = None

    # Structured data for the report (key order kept stable for rendering)
    custom_statistics = {
        'repetitions': runs,  # Per-run data with all metrics
        'aggregate': {
            # Overall test summary
            'total_runs': total_runs,
            'successful_runs': stats['successful_runs'],
            'failed_runs': stats['failed_runs'],

            # 1553 Bus statistics
            'total_b6_checks': total_b6_checks,
            'total_b6_pass': total_b6_pass,
            'total_b6_fail': total_b6_fail,
            'total_b6_known': total_b6_known,
            'total_b8_checks': total_b8_checks,
            'total_b8_pass': total_b8_pass,
            'total_b8_fail': total_b8_fail,

            # Serial communication statistics
            'total_serial_msgs': total_serial_msgs,
            'total_serial_errors': total_serial_errors,
            'total_serial_fatal': total_serial_fatal,
            'total_serial_recycles': total_serial_recycles,

            # Timing analysis
            'avg_pbit_time': avg_pbit,
            'min_pbit_time': min_pbit,
            'max_pbit_time': max_pbit,
            'std_dev_pbit': std_dev,
        },
    }

    report.set_custom_statistics(custom_statistics)

    # Log summary to console for immediate feedback
    logging.info("="*90)
    logging.info(" FINAL TEST STATISTICS SUMMARY")
    logging.info("="*90)
    logging.info(f"Total Runs: {total_runs}")
    logging.info(f"Successful: {stats['successful_runs']} ({_share(stats['successful_runs']):.1f}%)")
    logging.info(f"Failed: {stats['failed_runs']} ({_share(stats['failed_runs']):.1f}%)")
    logging.info(f"B6 Checks: {total_b6_checks} (Pass: {total_b6_pass}, Fail: {total_b6_fail}, Known: {total_b6_known})")
    logging.info(f"B8 Checks: {total_b8_checks} (Pass: {total_b8_pass}, Fail: {total_b8_fail})")
    logging.info(f"Serial: {total_serial_msgs} messages ({total_serial_errors} errors, {total_serial_fatal} fatal, {total_serial_recycles} recycles)")
    if avg_pbit is not None:
        logging.info(f"PBIT Timing: avg={avg_pbit:.2f}s, min={min_pbit:.2f}s, max={max_pbit:.2f}s, σ={std_dev:.2f}s")
    logging.info("="*90)
    logging.info("Detailed statistics will be available in the PDF report")

    return custom_statistics
|
||
|
||
|
||
# Prefix that separates the 1553 message name from the field identifier in
# B6 health-status field names.
_HEALTH_REPORT_MARKER = 'RdrHealthStatusAndBitReport_'


def _failure_entries_as_lists(entries):
    """Convert failure entries to JSON-friendly lists (sequence -> list, other -> [str])."""
    return [
        list(entry) if isinstance(entry, (list, tuple)) else [str(entry)]
        for entry in entries
    ]


def _failure_field_name(entry):
    """Return the field name of a failure entry, or None for an empty entry.

    Entries are (field, error) or (category, field, error) sequences, so the
    field name is always the second-to-last element.
    """
    if not isinstance(entry, (list, tuple)):
        return str(entry)
    if len(entry) >= 2:
        return str(entry[-2])
    return str(entry[0]) if entry else None


def _display_name(identifier):
    """Turn a snake_case identifier into a 'Title Case' label."""
    return identifier.replace('_', ' ').title()


def _write_per_run_section(writer, repetitions):
    """Write section 1: one CSV row per test run, with details as JSON columns."""
    writer.writerow(['PER-RUN STATISTICS'])
    writer.writerow([])
    writer.writerow([
        'Run', 'Result', 'PBIT Time (s)',
        'Scenario',
        'B6 Total', 'B6 Pass', 'B6 Fail', 'B6 Known',
        'B8 Checked', 'B8 Pass', 'B8 Fail',
        'Serial Msgs', 'Serial Errors', 'Serial Fatal', 'Serial Recycles',
        'Real Failures', 'Known Failures',
        'Failures Detail (JSON)', 'Known Failures Detail (JSON)', 'Serial Details (JSON)'
    ])

    for run in repetitions:
        failures = run.get('failures', [])
        known_failures = run.get('known_failures', [])
        # A run whose BIT never completed may carry no timing; report it as 0.
        pbit_time = run.get('pbit_time') or 0

        # default=str: this is a best-effort diagnostic export, so a value that
        # JSON cannot encode is stringified rather than aborting the whole file.
        writer.writerow([
            run['repetition'],
            'PASS' if run['success'] else 'FAIL',
            f"{pbit_time:.2f}",
            run.get('scenario', ''),
            run.get('b6_total', 0),
            run.get('b6_pass', 0),
            run.get('b6_fail', 0),
            run.get('b6_known_fail', 0),
            run.get('b8_checked', 0),
            run.get('b8_pass', 0),
            run.get('b8_fail', 0),
            run.get('serial_total', 0),
            run.get('serial_errors', 0),
            run.get('serial_fatal', 0),
            run.get('serial_recycles', 0),
            len(failures),
            len(known_failures),
            json.dumps(_failure_entries_as_lists(failures), ensure_ascii=False, default=str),
            json.dumps(_failure_entries_as_lists(known_failures), ensure_ascii=False, default=str),
            json.dumps(run.get('serial_details', []), ensure_ascii=False, default=str),
        ])

    writer.writerow([])
    writer.writerow([])


def _write_aggregate_section(writer, agg):
    """Write section 2: metric/value table built from the aggregate dictionary."""

    def seconds(value):
        return f"{value:.2f}" if value is not None else "N/A"

    if agg['total_runs'] > 0:
        success_rate = f"{agg['successful_runs']/agg['total_runs']*100:.1f}"
    else:
        success_rate = "0.0"

    metric_groups = [
        [   # Overall metrics
            ['Total Runs', agg['total_runs']],
            ['Successful Runs', agg['successful_runs']],
            ['Failed Runs', agg['failed_runs']],
            ['Success Rate (%)', success_rate],
        ],
        [   # B6 LRU status
            ['B6 Total Checks', agg['total_b6_checks']],
            ['B6 Pass', agg['total_b6_pass']],
            ['B6 Fail', agg['total_b6_fail']],
            ['B6 Known Fail', agg['total_b6_known']],
        ],
        [   # B8 diagnostics
            ['B8 Total Checks', agg['total_b8_checks']],
            ['B8 Pass', agg['total_b8_pass']],
            ['B8 Fail', agg['total_b8_fail']],
        ],
        [   # Serial communication
            ['Serial Total Messages', agg['total_serial_msgs']],
            ['Serial Errors', agg['total_serial_errors']],
            ['Serial Fatal', agg['total_serial_fatal']],
            ['Serial Recycles', agg['total_serial_recycles']],
        ],
        [   # Timing statistics
            ['Average PBIT Time (s)', seconds(agg['avg_pbit_time'])],
            ['Min PBIT Time (s)', seconds(agg['min_pbit_time'])],
            ['Max PBIT Time (s)', seconds(agg['max_pbit_time'])],
            ['Std Dev PBIT Time (s)', seconds(agg['std_dev_pbit'])],
        ],
    ]

    writer.writerow(['AGGREGATE STATISTICS'])
    writer.writerow([])
    writer.writerow(['Metric', 'Value'])
    for group in metric_groups:
        writer.writerows(group)
        writer.writerow([])  # Blank line after every group
    writer.writerow([])      # Extra blank line closing the section


def _write_known_failures_section(writer, known_fields):
    """Write section 3: the configured known-failure fields as readable labels."""
    writer.writerow(['KNOWN FAILURES (IGNORED IN STATISTICS)'])
    writer.writerow([])
    writer.writerow(['These failures are expected due to HW test setup limitations and do not affect test verdict:'])
    writer.writerow([])

    for known_field in known_fields:
        if _HEALTH_REPORT_MARKER in known_field:
            clean_name = known_field.split(_HEALTH_REPORT_MARKER)[-1]
        else:
            # Without the message marker, keep only the last name component
            clean_name = known_field.rsplit('_', 1)[-1]
        writer.writerow([f" - {_display_name(clean_name)}"])

    writer.writerow([])
    writer.writerow([])


def _write_problem_distribution_section(writer, repetitions, agg):
    """Write section 4: how often each problem type occurred in failed runs."""
    writer.writerow(['PROBLEM DISTRIBUTION ANALYSIS'])
    writer.writerow([])

    problem_counts = {}
    total_runs = agg['total_runs']
    failed_runs = total_runs - agg['successful_runs']

    def count(problem):
        problem_counts[problem] = problem_counts.get(problem, 0) + 1

    for run in repetitions:
        # NOTE(review): the serial checks below are assumed to apply to failed
        # runs only, like the failure list - confirm against the PDF logic.
        if run['success']:
            continue

        for entry in run.get('failures', []):
            field = _failure_field_name(entry)
            if field is None:
                continue
            # Drop the common message prefix but keep the field identifier, e.g.
            # "..._RdrHealthStatusAndBitReport_processor_status" -> "processor_status"
            if _HEALTH_REPORT_MARKER in field:
                short_name = field.split(_HEALTH_REPORT_MARKER)[-1]
            else:
                # For other messages, keep the last 4 parts for context
                parts = field.split('_')
                short_name = '_'.join(parts[-4:]) if len(parts) > 3 else field
            count(_display_name(short_name))

        # Serial problems
        if run.get('serial_fatal', 0) > 0:
            count('Serial Communication (Fatal)')
        if run.get('serial_recycles', 0) > 1:
            count('System Instability (Recycles)')

    if not problem_counts:
        writer.writerow(['No problems detected - all runs were successful!'])
        return

    writer.writerow(['Problem Type', 'Occurrences', '% of Total Runs', '% of Failed Runs'])
    # Most frequent first; ties keep first-seen order (sorted() is stable)
    for problem, occurrences in sorted(problem_counts.items(), key=lambda kv: kv[1], reverse=True):
        pct_total = (occurrences / total_runs * 100) if total_runs > 0 else 0
        pct_failed = (occurrences / failed_runs * 100) if failed_runs > 0 else 0
        writer.writerow([problem, occurrences, f"{pct_total:.1f}", f"{pct_failed:.1f}"])


def export_statistics_to_csv(custom_statistics, test_name, output_folder):
    """
    Export test statistics to a CSV file for external analysis (Excel, etc.).

    The file ``<test_name>_statistics.csv`` contains four sections:
        1. Per-Run Statistics: detailed results for each run
        2. Aggregate Statistics: overall summary metrics
        3. Known Failures: fields listed in the module-level KNOWN_FAILURES
        4. Problem Distribution: how often each failure type occurred

    The export is best-effort: any error is logged (with traceback) and
    reported through the return value instead of being raised.

    Args:
        custom_statistics: Dictionary with 'repetitions' and 'aggregate' data
            (as returned by generate_final_statistics_report()).
        test_name: Base name for the CSV file (e.g., "GRIFO_M_PBIT_20260129_153432")
        output_folder: Folder where the CSV is saved; created if missing.

    Returns:
        Path to the generated CSV file, or None if the export failed.
    """
    try:
        if not os.path.exists(output_folder):
            # exist_ok guards against the folder appearing between check and creation
            os.makedirs(output_folder, exist_ok=True)
            logging.info(f"Created output folder: {output_folder}")

        csv_path = os.path.join(output_folder, f"{test_name}_statistics.csv")
        logging.info(f"Exporting statistics to CSV: {csv_path}")

        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            _write_per_run_section(writer, custom_statistics['repetitions'])
            _write_aggregate_section(writer, custom_statistics['aggregate'])
            # KNOWN_FAILURES is this module's own constant; importing the module
            # by name here would load a second copy of the running script.
            _write_known_failures_section(writer, KNOWN_FAILURES)
            _write_problem_distribution_section(
                writer, custom_statistics['repetitions'], custom_statistics['aggregate'])

        logging.info(f"Statistics exported successfully to: {csv_path}")
        return csv_path

    except Exception as e:
        logging.error(f"Failed to export statistics to CSV: {e}")
        logging.error(traceback.format_exc())
        return None
|
||
|
||
|
||
def tgt_gen(interface):
    """
    Poll the B9 message until a target is reported or the poll budget is used up.

    Every 10 ms (up to 500 polls) the B9 receive counter, target count and
    first-target range are read from ``interface``. Every 10th poll the receive
    counter and its increase since the previous report are logged. As soon as
    the target count is positive, the target is logged, two ``check()`` calls
    are issued on the B9 fields and the function returns.

    Args:
        interface: 1553 interface object providing
            ``getSingleMessageReceivedSz()`` and ``getMessageFieldValue()``.

    Returns:
        bool: True if a target was found, False if the polls were exhausted or
        an interrupt was requested first.
    """
    logging.info('tgt_gen()')

    poll_period_s = 0.010   # 10 ms between polls
    max_polls = 500
    report_every = 10       # log the message counter every N polls
    previous_count = 0

    # NOTE(review): block nesting was reconstructed; the target check is assumed
    # to run on every poll, not only on the reporting polls - confirm.
    for poll in range(max_polls):
        time.sleep(poll_period_s)
        cnt = interface.getSingleMessageReceivedSz("B9")
        t_num = interface.getMessageFieldValue("B9", "b9_t_num")
        t_rng = interface.getMessageFieldValue("B9", "b9_t1_rng")

        if poll % report_every == 0:
            dcnt = cnt - previous_count
            previous_count = cnt
            logging.info(f'TgtMsg: {cnt} {dcnt}')

        if t_num > 0:
            logging.info(f'Tgt: {t_num} @ {t_rng}')

            # Presumably (min, max) acceptance ranges for the two fields -
            # verify against check() in test_common_function.
            check(theGrifo1553, (1, 2), "B9", "b9_t_num")
            check(theGrifo1553, (1179, 1186), "B9", "b9_t1_rng")
            return True  # Target found

        if interruptRequest is True:
            break

    return False  # Target not found
|
||
|
||
|
||
def tgt_gen_alone(interface):
    """
    Run tgt_gen() repeatedly, with interface logging enabled, until a target is found.

    Logging is started on ``interface`` (directory of the running script),
    tgt_gen() is called up to 10000 times, and logging is always stopped
    again, even if a call raises.

    Args:
        interface: 1553 interface object; must provide ``logStart()`` and
            ``logStop()`` in addition to what tgt_gen() needs.

    Returns:
        bool: True if a target was found, False if the attempts were exhausted
        or an interrupt was requested.
    """
    interface.logStart(3, os.path.dirname(sys.argv[0]))
    target_found = False
    try:
        for n in range(10 * 1000):
            logging.info(f'tgt_gen_alone(): {n}')
            target_found = tgt_gen(interface)
            if target_found or interruptRequest is True:
                break
    finally:
        # Always pair logStart() with logStop() so a failure inside tgt_gen()
        # does not leave the interface log open.
        interface.logStop()
    return target_found
|
||
|
||
def ask_production_config():
    """
    Interactively collect the test configuration for production mode (real target).

    The operator is first asked whether to enable the real-time GUI monitor
    (only if the GUI module can be imported), then how many test runs to
    execute. An empty answer selects the default shown in brackets; Ctrl-C or
    end-of-input selects "no GUI" and NUMBER_OF_REPETITIONS respectively.

    Returns:
        tuple: (num_runs, gui_enabled)
    """
    separator = "=" * 80

    print("")
    print(separator)
    print("GRIFO PBIT - PRODUCTION MODE (Target Reale)")
    print(separator)
    print("")

    # Probe for the GUI module; only its importability matters here
    try:
        from GRIFO_M_PBIT_gui import TestMonitorGUI
    except ImportError:
        gui_available = False
    else:
        gui_available = True

    # GUI question comes first (skipped when the GUI cannot be imported)
    gui_enabled = False
    if not gui_available:
        print("[INFO] GUI monitor not available (tkinter import failed)")
        print("")
    else:
        while True:
            try:
                answer = input("Enable real-time GUI monitor? (y/n) [y]: ").strip().lower()
                if answer in ('', 'y', 'yes'):
                    gui_enabled = True
                    print("✓ GUI monitor will be enabled")
                    break
                if answer in ('n', 'no'):
                    gui_enabled = False
                    print("✓ GUI monitor disabled (console only)")
                    break
                print("Please enter 'y' or 'n'")
            except (KeyboardInterrupt, EOFError):
                gui_enabled = False
                break
        print("")

    # Number of runs: repeat the question until a valid answer is given
    while True:
        try:
            answer = input(f"How many test runs do you want to execute? (minimum 1) [{NUMBER_OF_REPETITIONS}]: ").strip()

            if not answer:
                num_runs = NUMBER_OF_REPETITIONS
                break

            num_runs = int(answer)
            if num_runs >= 1:
                break

            print(f"Error: Number of runs must be at least 1. You entered: {num_runs}")
        except ValueError:
            print("Error: Invalid input. Please enter a number.")
        except (KeyboardInterrupt, EOFError):
            print("\n\n[INFO] Using default value")
            num_runs = NUMBER_OF_REPETITIONS
            break

    print("")
    print(f"✓ Configured for {num_runs} test run(s)")
    print("")
    print(separator)
    print("")

    return num_runs, gui_enabled
|
||
|
||
|
||
def test_proc():
    """Run the power-on BIT (PBIT) campaign and return the overall verdict.

    Flow:
      1. Select simulation mode (``--simulate`` on the command line) or
         production mode. In production the operator is asked for the number
         of runs and whether the GUI monitor must be started.
      2. For every repetition: power-cycle the radar, poll the B6 message
         until ``bit_report_available`` is "true" (or ``PBIT_SEC_TIME``
         expires), verify the B6 LRU status fields, drill down into the B8
         fields when required, then collect the serial-terminal statistics.
      3. Build the final statistics report, optionally export CSV/JSON and
         power the radar off.

    The PDF report is always generated and the serial terminal is always
    disconnected, whatever the exit path.

    Returns:
        bool: True when no real failure was recorded; False when a B6/B8
        check failed, the BIT report timed out, the 1553 traffic stopped,
        ``tgt_gen_alone`` failed or an unexpected exception was raised.
    """
    global gui_monitor, report, test_statistics

    def short_field_name(field):
        """Return the part of a field name that follows its CamelCase struct token.

        Falls back to the last '_'-separated token when no such token exists.
        """
        tokens = field.split('_')
        for pos, token in enumerate(tokens):
            if token != token.lower() and pos + 1 < len(tokens):
                return '_'.join(tokens[pos + 1:])
        return tokens[-1]

    # ========== SIMULATION MODE SUPPORT ==========
    # Enable test execution without hardware using --simulate flag.
    # Mock implementation in GRIFO_M_PBIT_mock.py provides simulated interfaces.
    gui_monitor = None  # Reference to GUI monitor (if available)

    if '--simulate' in sys.argv:
        from GRIFO_M_PBIT_mock import initialize_simulation, setup_simulation, create_mock_terminal
        import GRIFO_M_PBIT_mock

        # Check if already initialized by launcher to avoid double initialization
        if GRIFO_M_PBIT_mock._requested_runs is None:
            # Not initialized yet, do it now (direct execution scenario)
            initialize_simulation()
            setup_simulation()

        # Get GUI reference if enabled
        gui_monitor = GRIFO_M_PBIT_mock._gui_monitor
        use_mock_terminal = True

        # In simulate mode, prefer the user-requested runs stored by the mock.
        requested_runs = getattr(GRIFO_M_PBIT_mock, '_requested_runs', None)
        runs_total = requested_runs if requested_runs is not None else NUMBER_OF_REPETITIONS
    else:
        # ========== PRODUCTION MODE (real target) ==========
        # Ask user for configuration (number of runs and GUI)
        runs_total, gui_enabled = ask_production_config()

        # Initialize GUI if enabled; a GUI failure must not prevent the test.
        if gui_enabled:
            try:
                from GRIFO_M_PBIT_gui import TestMonitorGUI
                gui_monitor = TestMonitorGUI()
                gui_monitor.start()
                gui_monitor.update_status(run_total=runs_total)
                gui_monitor.log_event('info', f'Production mode - {runs_total} runs configured')
                logging.info("GUI monitor started successfully")
            except Exception as e:
                logging.warning(f"Failed to start GUI monitor: {e}")
                gui_monitor = None

        use_mock_terminal = False
    # ========== END SIMULATION SUPPORT ==========

    # Complete bit_fields: All B6 LRU status + All B8 degradation/SRU/test fields
    # Total: 185 fields (12 B6 status + 12 B8 degradation + 43 B8 SRU + 118 B8 tests)
    bit_fields = (
        # ===== B6: LRU Status Fields =====
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_array_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pedestal_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_pressurization_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_over_temperature_alarm",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_processor_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_receiver_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_rx_front_end_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_servoloop_over_temperature_alarm",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_servoloop_status",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_trasmitter_over_temperature_alarm",
        "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_trasmitter_status",
        # ===== B8: Degradation Conditions =====
        "degradation_conditions_w1_DegradationConditionsW1_bcn_fail",
        "degradation_conditions_w1_DegradationConditionsW1_gm_rbm_sea1_ta_wa_fail",
        "degradation_conditions_w1_DegradationConditionsW1_group1_fail",
        "degradation_conditions_w1_DegradationConditionsW1_group2_fail",
        "degradation_conditions_w1_DegradationConditionsW1_group3_fail",
        "degradation_conditions_w1_DegradationConditionsW1_group4_fail",
        "degradation_conditions_w1_DegradationConditionsW1_group5_fail",
        "degradation_conditions_w1_DegradationConditionsW1_hr_modes_and_gm_dbs_fail",
        "degradation_conditions_w1_DegradationConditionsW1_no_rdr_symbology",
        "degradation_conditions_w1_DegradationConditionsW1_not_identified_rdr_fail",
        "degradation_conditions_w1_DegradationConditionsW1_selected_channel_fail",
        "degradation_conditions_w1_DegradationConditionsW1_total_rdr_fail",
        # ===== B8: SRU Failure Locations =====
        "failure_location_pedestal_FailureLocationPedestal_sru1_gimbal",
        "failure_location_pedestal_FailureLocationPedestal_sru2_waveguide",
        "failure_location_pedestal_FailureLocationPedestal_sru3_waveguide",
        "failure_location_pedestal_FailureLocationPedestal_sru4_delta_guard_lna_switch",
        "failure_location_pedestal_FailureLocationPedestal_sru5_waveguide_switch",
        "failure_location_processor_FailureLocationProcessor_sru10_main_computer",
        "failure_location_processor_FailureLocationProcessor_sru11_graphic_computer",
        "failure_location_processor_FailureLocationProcessor_sru12_power_supply",
        "failure_location_processor_FailureLocationProcessor_sru13_det_exp",
        "failure_location_processor_FailureLocationProcessor_sru14_rx_module",
        "failure_location_processor_FailureLocationProcessor_sru1_motherboard_chassis",
        "failure_location_processor_FailureLocationProcessor_sru2_mti_fft",
        "failure_location_processor_FailureLocationProcessor_sru3_dsp0",
        "failure_location_processor_FailureLocationProcessor_sru4_dsp1",
        "failure_location_processor_FailureLocationProcessor_sru5_cfar_px_ctrl",
        "failure_location_processor_FailureLocationProcessor_sru6_timer",
        "failure_location_processor_FailureLocationProcessor_sru7_post_processor",
        "failure_location_processor_FailureLocationProcessor_sru8_agc",
        "failure_location_processor_FailureLocationProcessor_sru9_esa_if",
        "failure_location_receiver_FailureLocationReceiver_sru1_chassis",
        "failure_location_receiver_FailureLocationReceiver_sru2_uhf_assy",
        "failure_location_receiver_FailureLocationReceiver_sru3_synthesizer",
        "failure_location_receiver_FailureLocationReceiver_sru4_delta_guard_down_converter",
        "failure_location_receiver_FailureLocationReceiver_sru5_sum_down_converter",
        "failure_location_receiver_FailureLocationReceiver_sru6_lo_distributor",
        "failure_location_receiver_FailureLocationReceiver_sru7_up_converter",
        "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru1_chassis",
        "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru2_delta_guard_lna",
        "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru3_sum_act_prot_lna",
        "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru4_4port_circulator",
        "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru5_stc_delta_guard",
        "failure_location_rx_frontend_FailureLocationRxFrontEnd_sru5_stc_sum",
        "failure_location_servoloop_FailureLocationServoloop_sru1_chassis",
        "failure_location_servoloop_FailureLocationServoloop_sru2_power_supply",
        "failure_location_servoloop_FailureLocationServoloop_sru3_digital_controller",
        "failure_location_transmitter_FailureLocationTransmitter_sru1_chassis",
        "failure_location_transmitter_FailureLocationTransmitter_sru2_rex_f_tx",
        "failure_location_transmitter_FailureLocationTransmitter_sru3_power_supply",
        "failure_location_transmitter_FailureLocationTransmitter_sru4_valve_el_twt_tx",
        "failure_location_transmitter_FailureLocationTransmitter_sru5_rf_driver",
        "failure_location_transmitter_FailureLocationTransmitter_sru6_controller_tx",
        "failure_location_transmitter_FailureLocationTransmitter_sru7_hv_power_supply",
        "failure_location_transmitter_FailureLocationTransmitter_sru8_eht_power_supply",
        # ===== B8: All Test Results =====
        "agc_test_results_AGCTestResults_test_agc10_pulse_compressor_interface",
        "agc_test_results_AGCTestResults_test_agc11_dp_interface",
        "agc_test_results_AGCTestResults_test_agc13_taxi_running",
        "agc_test_results_AGCTestResults_test_agc14_external_xyp_ram",
        "agc_test_results_AGCTestResults_test_agc15_servoloop_interface",
        "agc_test_results_AGCTestResults_test_agc1_internal_xyp_ram",
        "agc_test_results_AGCTestResults_test_agc2_external_xyp_ram",
        "agc_test_results_AGCTestResults_test_agc5_dual_port_ram",
        "agc_test_results_AGCTestResults_test_agc6_agc_machine",
        "agc_test_results_AGCTestResults_test_agc7_sat_machine",
        "agc_test_results_AGCTestResults_test_agc9_c_ram_xy_checksum",
        "data_processor_test_results_DataProcessorTestResults_test_dp10_video_memory",
        "data_processor_test_results_DataProcessorTestResults_test_dp11_video_unit",
        "data_processor_test_results_DataProcessorTestResults_test_dp12_transputer_unit",
        "data_processor_test_results_DataProcessorTestResults_test_dp13_scan_converter_polar_memory",
        "data_processor_test_results_DataProcessorTestResults_test_dp14_scan_converter_format_converter",
        "data_processor_test_results_DataProcessorTestResults_test_dp1_486_cpu_tests",
        "data_processor_test_results_DataProcessorTestResults_test_dp2_486_interfaces_with_r3000_gc",
        "data_processor_test_results_DataProcessorTestResults_test_dp3_486_interface_with_slc",
        "data_processor_test_results_DataProcessorTestResults_test_dp4_slc_communications",
        "data_processor_test_results_DataProcessorTestResults_test_dp5_r3000_cpu_tests",
        "data_processor_test_results_DataProcessorTestResults_test_dp6_r3000_interfaces",
        "data_processor_test_results_DataProcessorTestResults_test_dp7_1553_and_discretes",
        "data_processor_test_results_DataProcessorTestResults_test_dp8_graphic_cpu",
        "data_processor_test_results_DataProcessorTestResults_test_dp9_graphic_processors",
        "integrated_system_test_results_IntegratedSystemTestResults_array_status",
        "integrated_system_test_results_IntegratedSystemTestResults_cal_delta_channel_fail",
        "integrated_system_test_results_IntegratedSystemTestResults_cal_injection_fail",
        "integrated_system_test_results_IntegratedSystemTestResults_cal_noise_fail",
        "integrated_system_test_results_IntegratedSystemTestResults_pedestal_status",
        "integrated_system_test_results_IntegratedSystemTestResults_processor_status",
        "integrated_system_test_results_IntegratedSystemTestResults_receiver_status",
        "integrated_system_test_results_IntegratedSystemTestResults_rx_frontend_status",
        "integrated_system_test_results_IntegratedSystemTestResults_servoloop_status",
        "integrated_system_test_results_IntegratedSystemTestResults_test_is1_upconverter_chain_levels",
        "integrated_system_test_results_IntegratedSystemTestResults_test_is2_downconverter_chain_levels",
        "integrated_system_test_results_IntegratedSystemTestResults_test_is3_antenna_status_inconsistent",
        "integrated_system_test_results_IntegratedSystemTestResults_test_is4_tx_status_inconsistent",
        "integrated_system_test_results_IntegratedSystemTestResults_test_is5_tx_power_level",
        "integrated_system_test_results_IntegratedSystemTestResults_transmitter_status",
        "post_processor_test_results_PostProcessorTestResults_test_pp1_master_dsp",
        "post_processor_test_results_PostProcessorTestResults_test_pp2_interface_card",
        "post_processor_test_results_PostProcessorTestResults_test_pp3_cpu_cards",
        "post_processor_test_results_PostProcessorTestResults_test_pp4_dma_bus",
        "post_processor_test_results_PostProcessorTestResults_test_pp5_sp_interface",
        "post_processor_test_results_PostProcessorTestResults_test_pp6_dp_interface",
        "post_processor_test_results_PostProcessorTestResults_test_pp7_scan_converter_interface",
        "post_processor_test_results_PostProcessorTestResults_test_pp8_agc_interface",
        "power_supply_test_results_PowerSupplyTestResults_test_ps1_power_supply",
        "power_supply_test_results_PowerSupplyTestResults_test_ps2_over_temperature",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_fe1_lna",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_fe2_agc_attenuators",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx1_synthesizer_commands",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx2_synthesizer_internal_tests",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx3_uhf_oscillator_level",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx4_downconverter_lo_level",
        "receiver_and_rx_frontend_test_results_ReceiverAndRxTestResults_test_rx5_upconverter_lo_level",
        "rx_module_test_results_RxModuleTestResults_test_rm16_calibration_sum_channel_fail",
        "rx_module_test_results_RxModuleTestResults_test_rm1_master_clock_level",
        "rx_module_test_results_RxModuleTestResults_test_rm2_expander_level",
        "rx_module_test_results_RxModuleTestResults_test_rm3_sum_channel_down_converter",
        "rx_module_test_results_RxModuleTestResults_test_rm4_dg_channel_down_converter",
        "rx_module_test_results_RxModuleTestResults_test_rm5_noise_attenuators",
        "servoloop_test_results_ServoloopTestResults_test_sl10_agc_control",
        "servoloop_test_results_ServoloopTestResults_test_sl11_ad",
        "servoloop_test_results_ServoloopTestResults_test_sl12_das",
        "servoloop_test_results_ServoloopTestResults_test_sl13_serial_communications",
        "servoloop_test_results_ServoloopTestResults_test_sl14_taxi_interface",
        "servoloop_test_results_ServoloopTestResults_test_sl15_pedestal_centre_scan_location",
        "servoloop_test_results_ServoloopTestResults_test_sl1_low_voltage_power_supply",
        "servoloop_test_results_ServoloopTestResults_test_sl2_high_voltage_power_supply",
        "servoloop_test_results_ServoloopTestResults_test_sl3_motor_drivers",
        "servoloop_test_results_ServoloopTestResults_test_sl4_resolvers_power_supply",
        "servoloop_test_results_ServoloopTestResults_test_sl5_waveguide_switch",
        "servoloop_test_results_ServoloopTestResults_test_sl6_over_temperature",
        "servoloop_test_results_ServoloopTestResults_test_sl7_resolver_to_digital_conv",
        "servoloop_test_results_ServoloopTestResults_test_sl8_position_loop_error",
        "servoloop_test_results_ServoloopTestResults_test_sl9_microprocessor",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp10_board_overall",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp11_attenuatori_antenna",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp14_external_sp_if",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp16_bcn",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp1_timer1_up",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp2_timer_dma_pxc_if",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp3_timer_internal",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp4_px_ctrl_comm",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp5_video1_without_ad",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp6_video1_with_ad",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp7_video2_ad_sync",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp8_video2_timer_sync",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp9_ad_da",
        "signal_processor_test_results_SignalProcessorTestResults_test_sp9b_wideband_expander",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx10_hv_ps_over_temperature_warning",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx11_twt_helix_over_current",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx12_cathode_to_helix_arc",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx13_twt_over_temperature_hazard",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx14_twt_over_temperature_warning",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx15_cathode_under_voltage",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx16_cathode_over_voltage",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx1_microprocessors",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx2_tx_rf_input",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx3_twt_rf_input",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx4_twt_rf_output",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx5_tx_rf_output_level",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx6_vswr",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx7_three_phase_input_power",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx8_low_voltage_power_supplies",
        "transmitter_test_results_w1_TransmitterTestResultsW1_test_tx9_hv_ps_over_temperature_hazard",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx17_collector_under_voltage",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx18_collector_over_voltage",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx19_rectified_voltage",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx20_cathode_inv_current_fail",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx21_collector_inv_current_fail",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx22_waveguide_pressurization",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx23_grid_window_over_duty_alt",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx24_floating_deck_fail",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx25_floating_deck_ps_fail",
        "transmitter_test_results_w2_TransmitterTestResultsW2_test_tx26_grid_window_over_duty",
    )

    # ====================
    # BIT FIELDS CATEGORIZATION
    # ====================
    # Dictionary mapping category names to slices of the bit_fields tuple.
    # Used for organized drill-down reporting when B6 failures trigger B8 verification.
    #
    # Categories:
    #   B6_LRU_Status:  12 Line Replaceable Unit status fields (always checked)
    #   B8_Degradation: 12 system degradation condition flags
    #   B8_SRU_*:       43 Shop Replaceable Unit failure location flags (6 subsystems)
    #   B8_Test_*:      118 detailed test result fields (10 test types)
    #
    # Total: 185 diagnostic fields. The slice bounds below must be kept in
    # step with the order of bit_fields above.
    bit_fields_categories = {
        'B6_LRU_Status': bit_fields[0:12],
        'B8_Degradation': bit_fields[12:24],
        'B8_SRU_Pedestal': bit_fields[24:29],
        'B8_SRU_Processor': bit_fields[29:43],
        'B8_SRU_Receiver': bit_fields[43:50],
        'B8_SRU_RxFrontend': bit_fields[50:56],
        'B8_SRU_Servoloop': bit_fields[56:59],
        'B8_SRU_Transmitter': bit_fields[59:67],
        'B8_Test_AGC': bit_fields[67:78],
        'B8_Test_DataProcessor': bit_fields[78:92],
        'B8_Test_IntegratedSystem': bit_fields[92:107],
        'B8_Test_PostProcessor': bit_fields[107:115],
        'B8_Test_PowerSupply': bit_fields[115:117],
        'B8_Test_Receiver': bit_fields[117:124],
        'B8_Test_RxModule': bit_fields[124:130],
        'B8_Test_Servoloop': bit_fields[130:145],
        'B8_Test_SignalProcessor': bit_fields[145:159],
        'B8_Test_Transmitter': bit_fields[159:185],
    }

    # 1553 message and field names used repeatedly below.
    b6_msg = "B6_MsgRdrSettingsAndParametersTellback"
    b8_msg = "B8_MsgBitReport"
    bit_available_field = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_bit_report_available"
    radar_fail_status_field = "radar_health_status_and_bit_report_valid_RdrHealthStatusAndBitReport_radar_fail_status"

    logger_setup('GRIFO_M_PBIT.log')
    report = testReport(sys.argv[0])
    interface = theGrifo1553.getInterface()

    # Create serial terminal (real or mock based on simulation mode)
    if use_mock_terminal:
        terminal = create_mock_terminal()
    else:
        terminal = leo_grifo_terminal.GrifoSerialTerminal()

    terminal.connect()
    test_return = True
    try:
        ############ Test Execution ############
        report.add_comment("The Test Operator check if the failure BIT in B6_MsgRdrSettingsAndParametersTellback changes ...")

        # Return an explicit False (not None) so every failure path yields a bool.
        if tgt_gen_alone(interface) is False:
            return False

        for repetition in range(runs_total):
            info = f'Repetition {1 + repetition} of {runs_total}'
            logging.info(info)
            report.open_session(info)

            # Update GUI for new run
            if gui_monitor:
                gui_monitor.update_status(run_current=repetition + 1, run_total=runs_total, power_on=True)
                gui_monitor.log_event('info', f'Starting repetition {repetition + 1}/{runs_total}')

            # Statistics for this run
            run_stats = {
                'repetition': repetition + 1,
                'pbit_time': 0,
                'bit_available': False,
                'b6_total': 0,
                'b6_pass': 0,
                'b6_fail': 0,
                'b6_known_fail': 0,
                'b8_checked': 0,
                'b8_pass': 0,
                'b8_fail': 0,
                'failures': [],
                'known_failures': [],
                'success': True,
                # Serial statistics
                'serial_total': 0,
                'serial_errors': 0,
                'serial_fatal': 0,
                'serial_recycles': 0,
                'serial_details': [],  # List of notable serial events
            }

            # Attach scenario name: simulation scenario or production indicator
            if use_mock_terminal:
                # Simulation mode: use configured scenario from mock (best effort)
                try:
                    scenarios = getattr(GRIFO_M_PBIT_mock, '_scenario_list', None)
                    if scenarios:
                        idx = repetition if repetition < len(scenarios) else 0
                        run_stats['scenario'] = scenarios[idx]
                    else:
                        run_stats['scenario'] = None
                except Exception:
                    run_stats['scenario'] = None
            else:
                # Production mode: indicate real hardware execution
                run_stats['scenario'] = 'Production Run'

            test_statistics['total_runs'] += 1

            # Reset serial statistics for this run
            terminal.reset_serial_statistics()

            report.add_comment("The test operator is required to switch off the target and wait 3 seconds.")
            power_grifo_off(wait_after=4, wait_before=1)

            # Update GUI - power off
            if gui_monitor:
                gui_monitor.update_status(power_on=False, pbit_time=0.0)
                gui_monitor.log_event('info', 'Power OFF - waiting before=1s, after=4s')

            report.add_comment("The test operator is required to switch on the target.")
            power_grifo_on(wait_after=0.100)

            # Update GUI - power on
            if gui_monitor:
                gui_monitor.update_status(power_on=True)
                gui_monitor.log_event('info', 'Power ON - waiting for BIT...')

            remaining_time = PBIT_SEC_TIME
            pbit_start_time = time.perf_counter()

            # Number of consecutive 50 ms polls without a growing B6 receive
            # counter after which the 1553 link is declared lost.
            max_counter_1553_msg = 20

            while remaining_time > 0:
                start = time.perf_counter()
                ret_rep_is_avail = False
                msg_cnt = 0
                mil1553_error_flag = max_counter_1553_msg

                # One polling burst: up to 100 polls, 50 ms apart.
                for _ in range(100):
                    cnt = interface.getSingleMessageReceivedSz(b6_msg)
                    value = interface.getMessageFieldValue(b6_msg, bit_available_field)
                    ret_rep_is_avail = value == "true"
                    if ret_rep_is_avail:
                        break
                    time.sleep(0.05)

                    if cnt > msg_cnt:
                        mil1553_error_flag = max_counter_1553_msg
                    else:
                        mil1553_error_flag -= 1
                    msg_cnt = cnt

                    if mil1553_error_flag == 0:
                        # The finally clause still disconnects the terminal
                        # and generates the PDF.
                        logging.critical("1553 communication lost")
                        return False

                if ret_rep_is_avail:
                    time.sleep(0.02)
                    run_stats['bit_available'] = True
                    run_stats['pbit_time'] = time.perf_counter() - pbit_start_time

                    report.add_comment(f"BIT report available after {run_stats['pbit_time']:.1f}s")

                    # Update GUI - BIT available
                    if gui_monitor:
                        gui_monitor.update_status(pbit_available=True, pbit_time=run_stats['pbit_time'])
                        gui_monitor.log_event('success', f"BIT available after {run_stats['pbit_time']:.1f}s")

                    # ===== PHASE 1: Verify ALL B6 LRU Status Fields =====
                    b6_lru_fields = bit_fields_categories['B6_LRU_Status']
                    b6_failures = []
                    b6_known_failures = []

                    # Check all B6 fields EXCEPT radar_fail_status (check it last)
                    for f in b6_lru_fields:
                        if f == radar_fail_status_field:
                            continue  # Aggregate flag, evaluated after all others

                        run_stats['b6_total'] += 1
                        ret, err = check(theGrifo1553, "false", b6_msg, f)

                        if ret:
                            run_stats['b6_pass'] += 1
                        elif f in KNOWN_FAILURES:
                            # Known failure: annotate but don't trigger drill-down
                            run_stats['b6_known_fail'] += 1
                            b6_known_failures.append((f, err))
                            logging.warning(f"Known failure (ignored): {f}")
                        else:
                            # Real failure: needs investigation
                            run_stats['b6_fail'] += 1
                            b6_failures.append((f, err))
                            test_return = False
                            run_stats['success'] = False

                        # Update GUI every 3 checks, after the current check
                        # has been counted so the figures are not stale.
                        if gui_monitor and run_stats['b6_total'] % 3 == 0:
                            gui_monitor.update_statistics(
                                b6_total=run_stats['b6_total'],
                                b6_pass=run_stats['b6_pass'],
                                b6_fail=run_stats['b6_fail'],
                                b6_known=run_stats['b6_known_fail']
                            )

                    # ===== SPECIAL CHECK: radar_fail_status (aggregate flag) =====
                    # This flag aggregates all component statuses. Logic:
                    # - If ONLY known failures exist (e.g., pedestal), ignore it
                    # - If ANY real failures exist, it's a valid indicator
                    run_stats['b6_total'] += 1
                    ret_radar_fail, err_radar_fail = check(theGrifo1553, "false", b6_msg, radar_fail_status_field)

                    if ret_radar_fail:
                        run_stats['b6_pass'] += 1
                    elif b6_failures:
                        # Real failures exist -> radar_fail_status is a valid failure indicator.
                        # Build the cause list before appending the flag itself.
                        causes = ', '.join(short_field_name(name) for name, _ in b6_failures[:3])
                        run_stats['b6_fail'] += 1
                        b6_failures.append((radar_fail_status_field, err_radar_fail))
                        test_return = False
                        run_stats['success'] = False
                        logging.warning(f"Radar fail status: REAL failure (caused by: {causes})")
                    else:
                        # Only known failures exist -> radar_fail_status is caused by known issues
                        run_stats['b6_known_fail'] += 1
                        b6_known_failures.append((radar_fail_status_field, err_radar_fail))
                        logging.warning("Radar fail status: Known failure (caused only by pedestal)")

                    # Log B6 summary to console (not as PDF step - will be in final tables)
                    logging.info(f"[Run {repetition+1}] B6 LRU Status: {run_stats['b6_total']} total, "
                                 f"{run_stats['b6_pass']} pass, {run_stats['b6_fail']} fail, "
                                 f"{run_stats['b6_known_fail']} known")

                    # Update GUI with final B6 stats
                    if gui_monitor:
                        gui_monitor.update_statistics(
                            b6_total=run_stats['b6_total'],
                            b6_pass=run_stats['b6_pass'],
                            b6_fail=run_stats['b6_fail'],
                            b6_known=run_stats['b6_known_fail']
                        )
                        if run_stats['b6_fail'] > 0:
                            gui_monitor.log_event('warning', f'B6: {run_stats["b6_fail"]} real failures detected')
                        else:
                            gui_monitor.log_event('success', 'B6: All checks passed')

                    # Store failures for final aggregate report (not as steps)
                    if b6_known_failures:
                        run_stats['known_failures'].extend(b6_known_failures)
                        logging.info(f"  Known failures (HW setup): {len(b6_known_failures)}")

                    if b6_failures:
                        run_stats['failures'].extend(b6_failures)
                        fail_summary = ', '.join(short_field_name(name) for name, _ in b6_failures[:3])
                        logging.warning(f"  Real failures: {fail_summary}{'...' if len(b6_failures) > 3 else ''}")

                    # ===== PHASE 2: Drill-down B8 only if REAL failures in B6 =====
                    # B8 drill-down is performed:
                    # - Always if there are real B6 failures
                    # - OR if FORCE_B8_DRILL_DOWN=True and there are known failures (target2 behavior)
                    should_drill_down = bool(b6_failures or (FORCE_B8_DRILL_DOWN and b6_known_failures))

                    if should_drill_down:
                        b8_field_count = len(bit_fields) - len(b6_lru_fields)
                        if FORCE_B8_DRILL_DOWN and not b6_failures:
                            report.add_comment(f"\nForced B8 drill-down (FORCE_B8_DRILL_DOWN=True): Verifying all {b8_field_count} B8 diagnostic fields...")
                            logging.info("[FORCE_B8_DRILL_DOWN] Performing B8 drill-down despite only known failures")
                        else:
                            report.add_comment(f"\nDrill-down: Verifying all {b8_field_count} B8 diagnostic fields...")

                        b8_failures = []

                        for category, fields in bit_fields_categories.items():
                            if category == 'B6_LRU_Status':
                                continue  # B6 already verified in phase 1

                            category_fail = 0

                            for f in fields:
                                run_stats['b8_checked'] += 1
                                ret, err = check(theGrifo1553, "false", b8_msg, f)

                                if ret:
                                    run_stats['b8_pass'] += 1
                                else:
                                    # NOTE(review): a B8 failure fails the overall test but
                                    # does not clear run_stats['success'] - confirm intent.
                                    category_fail += 1
                                    run_stats['b8_fail'] += 1
                                    b8_failures.append((category, f, err))
                                    test_return = False

                                # Update GUI every 10 checks, after counting the current one
                                if gui_monitor and run_stats['b8_checked'] % 10 == 0:
                                    gui_monitor.update_statistics(
                                        b8_checked=run_stats['b8_checked'],
                                        b8_pass=run_stats['b8_pass'],
                                        b8_fail=run_stats['b8_fail']
                                    )

                            if category_fail > 0:
                                logging.warning(f"{category}: {category_fail}/{len(fields)} failures")

                        # Log B8 summary to console (not as PDF step - will be in final tables)
                        logging.info(f"[Run {repetition+1}] B8 Diagnostics: {run_stats['b8_checked']} checked, "
                                     f"{run_stats['b8_pass']} pass, {run_stats['b8_fail']} fail")

                        if b8_failures:
                            # Store failures for final aggregate report.
                            # Details will be shown in dedicated PDF section, not as step logs.
                            run_stats['failures'].extend((field, err) for _, field, err in b8_failures)

                            # Log to console for immediate feedback
                            fail_by_cat = {}
                            for cat, field, _ in b8_failures:
                                fail_by_cat.setdefault(cat, []).append(short_field_name(field))

                            for cat, fails in fail_by_cat.items():
                                logging.warning(f"    {cat}: {len(fails)} failures - {', '.join(fails[:3])}{'...' if len(fails) > 3 else ''}")
                    else:
                        logging.info(f"[Run {repetition+1}] All B6 LRU Status PASS (no B8 drill-down needed)")

                    # Run statistics
                    test_statistics['repetitions'].append(run_stats)
                    if run_stats['success']:
                        test_statistics['successful_runs'] += 1
                    else:
                        test_statistics['failed_runs'] += 1

                time_passed = time.perf_counter() - start
                remaining_time -= time_passed
                if ret_rep_is_avail:
                    remaining_time = 0
                logging.info(f'{remaining_time:.1f}s remaining ...')

            # A run whose BIT report never became available is a failure, not a
            # silent pass. It is counted in failed_runs but deliberately not
            # appended to 'repetitions', which keeps holding only runs with BIT data.
            if not run_stats['bit_available']:
                run_stats['success'] = False
                test_return = False
                test_statistics['failed_runs'] += 1
                timeout_msg = f"BIT report not available within {PBIT_SEC_TIME}s timeout"
                logging.error(f"[Run {repetition+1}] {timeout_msg}")
                report.add_comment(timeout_msg)
                if gui_monitor:
                    gui_monitor.log_event('error', timeout_msg)

            # Collect serial statistics for this run before closing session
            serial_stats = terminal.get_serial_statistics()
            run_stats['serial_total'] = serial_stats['total_messages']
            run_stats['serial_errors'] = serial_stats['error_messages']
            run_stats['serial_fatal'] = serial_stats['fatal_messages']
            run_stats['serial_recycles'] = serial_stats['recycle_count']

            # Store serial details for final aggregate report.
            # Details will be shown in dedicated PDF section, not as step logs.
            if serial_stats['recycle_count'] > 0:
                for timestamp, message in serial_stats['recycle_details']:
                    run_stats['serial_details'].append({'type': 'RECYCLE', 'timestamp': timestamp, 'message': message})

            if serial_stats['error_messages'] > 0:
                for timestamp, message in serial_stats['error_details'][:5]:  # Limit to first 5
                    run_stats['serial_details'].append({'type': 'ERROR', 'timestamp': timestamp, 'message': message})

            if serial_stats['fatal_messages'] > 0:
                for timestamp, message in serial_stats['fatal_details'][:5]:  # Limit to first 5
                    run_stats['serial_details'].append({'type': 'FATAL', 'timestamp': timestamp, 'message': message})

            # Log summary to console for immediate feedback during test execution
            logging.info(f"[Run {repetition+1}] Serial: {serial_stats['total_messages']} total, "
                         f"{serial_stats['error_messages']} errors, {serial_stats['fatal_messages']} fatal, "
                         f"{serial_stats['recycle_count']} recycles")

            if gui_monitor:
                # Update GUI with serial statistics
                gui_monitor.update_statistics(
                    serial_total=serial_stats['total_messages'],
                    serial_errors=serial_stats['error_messages'],
                    serial_fatal=serial_stats['fatal_messages'],
                    serial_recycles=serial_stats['recycle_count']
                )
                if serial_stats['recycle_count'] > 0:
                    gui_monitor.log_event('warning', f"Serial: {serial_stats['recycle_count']} RECYCLE events")
                if serial_stats['fatal_messages'] > 0:
                    gui_monitor.log_event('error', f"Serial: {serial_stats['fatal_messages']} fatal messages")

                # Push per-run summary to GUI runs table
                result_text = 'PASS' if run_stats.get('success', False) else 'FAIL'
                fail_summary = ''
                if run_stats.get('failures'):
                    try:
                        fail_summary = ', '.join(short_field_name(name) for name, _ in run_stats['failures'][:3])
                    except (AttributeError, TypeError, ValueError):
                        fail_summary = str(run_stats.get('failures'))

                gui_monitor.update_run(
                    run=repetition+1,
                    result=result_text,
                    pbit=run_stats.get('pbit_time', 0.0),
                    b6_fail=run_stats.get('b6_fail', 0),
                    b8_fail=run_stats.get('b8_fail', 0),
                    known=run_stats.get('b6_known_fail', 0),
                    fail_summary=fail_summary,
                    serial_events=len(run_stats.get('serial_details', []))
                )

            report.close_session()

            if interruptRequest is True:
                report.add_comment("Test interrupted by user (Ctrl-C)")
                break

            tgt_gen(interface)

        report.add_comment("Repetitions terminated.")

        # ===== FINAL STATISTICS REPORT =====
        custom_statistics = generate_final_statistics_report(report, test_statistics)

        # ===== EXPORT TO CSV (if enabled) =====
        if EXPORT_STATISTICS_CSV and custom_statistics:
            # Generate CSV filename with timestamp (matching log file naming)
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            csv_base_name = f"{report.title()}_{timestamp}"
            # Use same folder as PDF report for all test outputs
            pdf_folder = report.get_pdf_folder()
            csv_path = export_statistics_to_csv(custom_statistics, csv_base_name, pdf_folder)

            # Also write JSON statistics file alongside CSV for structured consumption.
            # A JSON export failure is logged but does not fail the test.
            try:
                if csv_path and os.path.isdir(pdf_folder):
                    json_filename = f"{csv_base_name}_statistics.json"
                    json_path = os.path.join(pdf_folder, json_filename)
                    with open(json_path, 'w', encoding='utf-8') as jf:
                        json.dump(custom_statistics, jf, ensure_ascii=False, indent=2)
                    logging.info(f"Statistics exported successfully to JSON: {json_path}")
            except Exception as e:
                logging.error(f"Failed to export statistics to JSON: {e}")
                logging.error(traceback.format_exc())

        ############ END STEPS ############
        power_grifo_off()

        return test_return
    except Exception as e:
        report.add_comment(f"Test terminated unexpectedly :{e}")
        # Keep the traceback in the log; the PDF comment only carries the message.
        logging.error(traceback.format_exc())
        return False
    finally:
        # Release the serial terminal on every exit path (normal end, early
        # return, exception). A disconnect error must not block the PDF.
        if terminal is not None:
            try:
                terminal.disconnect()
            except Exception as e:
                logging.error(f"Failed to disconnect serial terminal: {e}")

        report.generate_pdf()

        # Notify GUI (if present) that test finished and where outputs are stored
        if gui_monitor:
            try:
                out_folder = report.get_pdf_folder()
            except Exception:
                out_folder = None
            if out_folder:
                try:
                    gui_monitor.show_results(out_folder)
                    logging.info("Test completed - waiting for user to close GUI...")
                    # Wait for user to close the GUI (block main thread until GUI thread finishes)
                    if hasattr(gui_monitor, 'thread') and gui_monitor.thread:
                        gui_monitor.thread.join()
                except Exception as e:
                    logging.error(f"Error waiting for GUI: {e}")
|
||
|
||
|
||
#-- ---------------------------------------------------------------
|
||
if __name__ == '__main__':
    # Script entry point: runs only when the file is executed directly,
    # not when it is imported as a module.
    #
    # The SIGINT handler is registered before the test starts so that a
    # Ctrl-C during the run is delivered to signal_handler.
    # NOTE(review): signal_handler is defined elsewhere in this file;
    # presumably it sets the interruptRequest flag that test_proc checks
    # after each repetition - confirm.
    signal.signal(signal.SIGINT, signal_handler)

    # The value returned by test_proc() is intentionally left unused here,
    # exactly as in the original entry point.
    test_proc()