SXXXXXXX_PyBusMonitor1553/tools/export_message_fields_csv.py

178 lines
7.1 KiB
Python

"""
Export message fields to a flattened tab-separated CSV.
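Columns (in output order):
- Message: logical message name (A1_..., B5_...)
- FieldName: full field path/name
- WordOffset: word index (0-based)
- BitOffset: start bit within word (LSB=0)
- BitEnd: last bit within word (BitOffset + Width - 1)
- Width: bit width
- Type: field type (for nested fields, the parent field's declared type name when it can be resolved)
- EnumType: enum name if present
- LsbValue: LSB value metadata of the field, if available
- Scale: scale factor metadata of the field, if available
- CodeLocation: module.Class where the message is defined
- DefinedIn: first component of the field path for nested fields, otherwise the message class name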
Usage:
python tools/export_message_fields_csv.py
Output: `message_fields_flat.csv` (tab-delimited) and, when run as a script, `message_fields_flat.txt` (aligned text)
"""
import sys
import os
import csv
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, ROOT)
sys.path.insert(0, os.path.join(ROOT, 'pybusmonitor1553'))
from Grifo_E_1553lib.messages.message_inspector import MessageInspector
def build_messages_map():
    """Return the mapping of logical message names to message classes.

    Keys carry the logical message prefix (``A1_``, ``B5_``, ...) followed by
    the class name; values are the message classes themselves. The imports
    are performed inside the function, so the message modules are only loaded
    when the mapping is actually requested.

    Per the original note, this is meant to be the same mapping used by
    ``inspect_all_messages`` (not visible from this file).
    """
    from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters import MsgRdrSettingsAndParameters
    from Grifo_E_1553lib.messages.msg_rdr_operation_command import MsgRdrOperationCommand
    from Grifo_E_1553lib.messages.msg_nav_data_and_cursor import MsgNavDataAndCursor
    from Grifo_E_1553lib.messages.msg_inu_high_speed import MsgInuHighSpeed
    from Grifo_E_1553lib.messages.msg_graphic_setting import MsgGraphicSetting
    from Grifo_E_1553lib.messages.msg1_data_link_target import Msg1DataLinkTarget
    from Grifo_E_1553lib.messages.msg2_data_link_target import Msg2DataLinkTarget
    from Grifo_E_1553lib.messages.tracked_target import TrackedTarget01, TrackedTarget02_10
    from Grifo_E_1553lib.messages.msg_rdr_status_tellback import MsgRdrStatusTellback
    from Grifo_E_1553lib.messages.msg_rdr_settings_and_parameters_tellback import MsgRdrSettingsAndParametersTellback

    # Ordered (logical name, class) pairs; dict() keeps this insertion order,
    # which in turn fixes the order of the exported rows.
    named_classes = (
        ('A1_MsgRdrSettingsAndParameters', MsgRdrSettingsAndParameters),
        ('A2_MsgRdrOperationCommand', MsgRdrOperationCommand),
        ('A3_MsgGraphicSetting', MsgGraphicSetting),
        ('A4_MsgNavDataAndCursor', MsgNavDataAndCursor),
        ('A5_MsgInuHighSpeed', MsgInuHighSpeed),
        ('A7_Msg1DataLinkTarget', Msg1DataLinkTarget),
        ('A8_Msg2DataLinkTarget', Msg2DataLinkTarget),
        ('B4_TrackedTarget02_10', TrackedTarget02_10),
        ('B5_TrackedTarget01', TrackedTarget01),
        ('B6_MsgRdrSettingsAndParametersTellback', MsgRdrSettingsAndParametersTellback),
        ('B7_MsgRdrStatusTellback', MsgRdrStatusTellback),
    )
    return dict(named_classes)
# Optional metadata attribute names probed on each field, in priority order.
_SCALE_ATTRS = ('scale', 'scale_factor', 'multiplier')
# NOTE(review): LsbValue falls back to the scale attributes when no explicit
# LSB attribute yields a value; this fallback is preserved from the original
# implementation -- confirm it is intended.
_LSB_ATTRS = ('lsb_value', 'lsb') + _SCALE_ATTRS


def _declared_type_name(msg_class, parent_name):
    """Return the declared type name of top-level field ``parent_name``.

    Scans ``msg_class._fields_`` (when present) for the first entry whose
    first element equals ``parent_name`` and returns the ``__name__`` of its
    second element, or ``str()`` of that element when it has no ``__name__``.
    Only the first two elements of each entry are used, so entries with extra
    trailing elements (e.g. ctypes-style 3-tuple bit-field declarations) are
    accepted; malformed entries are skipped. Returns ``None`` if no entry
    matches.
    """
    declared = getattr(msg_class, '_fields_', None) or ()
    try:
        entries = iter(declared)
    except TypeError:
        # _fields_ is not iterable: nothing to look up.
        return None
    for entry in entries:
        try:
            name, declared_type = entry[0], entry[1]
        except (TypeError, IndexError, KeyError):
            # Skip a malformed entry instead of abandoning the whole lookup.
            continue
        if name == parent_name:
            try:
                return declared_type.__name__
            except AttributeError:
                return str(declared_type)
    return None


def _first_attr_text(obj, attr_names):
    """Return ``str()`` of the first attribute in ``attr_names`` that is set.

    Attributes that are missing, ``None``, or whose text form is empty are
    passed over. Returns ``''`` when none of the attributes yields text.
    """
    for attr in attr_names:
        value = getattr(obj, attr, None)
        if value is None:
            continue
        text = str(value)
        if text:
            return text
    return ''


def export_csv(output_path='message_fields_flat.csv'):
    """Write one tab-delimited row per message field to ``output_path``.

    Every message class returned by ``build_messages_map()`` is inspected
    with ``MessageInspector.inspect_message``; each resulting field becomes
    one row with the columns Message, FieldName, WordOffset, BitOffset,
    BitEnd, Width, Type, EnumType, LsbValue, Scale, CodeLocation, DefinedIn.

    Args:
        output_path: Destination file path; overwritten if it exists.

    Returns:
        None. A summary line is printed to stdout after the file is written.
    """
    rows = []
    for msg_name, msg_class in build_messages_map().items():
        code_location = f"{msg_class.__module__}.{msg_class.__name__}"
        for f in MessageInspector.inspect_message(msg_class):
            # A dotted name marks a nested field; the part before the first
            # dot is the top-level parent field it lives in.
            parent, dot, _ = f.name.partition('.')
            is_nested = bool(dot)
            defined_in = parent if is_nested else msg_class.__name__

            # Last bit covered by the field; never below the start bit.
            bit_end = f.bit_offset + max(0, f.width - 1)

            # For nested fields (e.g. '<parent>.raw'), prefer the parent's
            # declared type name; keep the inspector's type otherwise.
            display_type = f.field_type
            if is_nested:
                parent_type = _declared_type_name(msg_class, parent)
                if parent_type is not None:
                    display_type = parent_type

            rows.append({
                'Message': msg_name,
                'FieldName': f.name,
                'WordOffset': f.word_offset,
                'BitOffset': f.bit_offset,
                'BitEnd': bit_end,
                'Width': f.width,
                'Type': display_type,
                'EnumType': f.enum_type or '',
                'LsbValue': _first_attr_text(f, _LSB_ATTRS),
                'Scale': _first_attr_text(f, _SCALE_ATTRS),
                'CodeLocation': code_location,
                'DefinedIn': defined_in,
            })

    # Write TSV-like CSV (tab-delimited)
    fieldnames = ['Message', 'FieldName', 'WordOffset', 'BitOffset', 'BitEnd', 'Width', 'Type', 'EnumType', 'LsbValue', 'Scale', 'CodeLocation', 'DefinedIn']
    with open(output_path, 'w', encoding='utf-8', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter='\t')
        writer.writeheader()
        writer.writerows(rows)
    print(f"Wrote {len(rows)} field rows to {output_path}")
def write_aligned_text(rows, fieldnames, outpath='message_fields_flat.txt'):
    """Write ``rows`` as a fixed-width, column-aligned text table.

    The file starts with a header line holding the field names, followed by
    a rule of dashes and then one line per row. Each column is padded to the
    length of its longest cell (header included) and columns are joined by a
    single space.

    Args:
        rows: Iterable of mappings; values are read with ``.get()``, and
            missing or ``None`` values are rendered as empty cells.
        fieldnames: Column names, in output order.
        outpath: Destination file path; overwritten if it exists.
    """
    # Stringify every cell once so widths and output use the same text.
    table = []
    for row in rows:
        cells = {}
        for col in fieldnames:
            value = row.get(col)
            cells[col] = '' if value is None else str(value)
        table.append(cells)

    # Each column is as wide as its longest cell, header included.
    col_widths = {}
    for col in fieldnames:
        col_widths[col] = max([len(col)] + [len(cells[col]) for cells in table])

    line_template = ' '.join(f"{{:{col_widths[col]}}}" for col in fieldnames)

    def render(values):
        return line_template.format(*values) + "\n"

    lines = [
        render(list(fieldnames)),
        render(['-' * col_widths[col] for col in fieldnames]),
    ]
    lines.extend(render([cells[col] for col in fieldnames]) for cells in table)

    with open(outpath, 'w', encoding='utf-8') as out:
        out.writelines(lines)
    print(f"Wrote aligned text file to {outpath}")
if __name__ == '__main__':
    # Step 1: produce the tab-delimited CSV at its default location.
    export_csv()

    # Step 2: produce an aligned text rendering for human inspection. The
    # rows are read back from the CSV just written so that both outputs are
    # built from the same data.
    import csv as _csv
    fieldnames = ['Message', 'FieldName', 'WordOffset', 'BitOffset', 'BitEnd', 'Width', 'Type', 'EnumType', 'LsbValue', 'Scale', 'CodeLocation', 'DefinedIn']
    out_csv = 'message_fields_flat.csv'
    with open(out_csv, 'r', encoding='utf-8', newline='') as cf:
        rows = list(_csv.DictReader(cf, delimiter='\t'))
    write_aligned_text(rows, fieldnames, outpath='message_fields_flat.txt')