SXXXXXXX_PyBusMonitor1553/tools/generate_1553_map.py

223 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Generate an XML map of all A/B 1553 messages and their fields.
Usage: python tools/generate_1553_map.py
Outputs: doc/1553_messages_map.xml
The script imports modules under `Grifo_E_1553lib.messages` and
`Grifo_E_1553lib.data_types`, parses `messages.py` to discover which
payload class is used for each A/B message, then introspects ctypes
structures/unions to enumerate fields, sizes and bit-widths.
"""
import os
import sys
import re
import importlib
import inspect
import ctypes
import xml.etree.ElementTree as ET
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
PY_PACKAGE_ROOT = REPO_ROOT / 'pybusmonitor1553'
GRIFO_PKG = 'Grifo_E_1553lib'
def ensure_sys_path():
# Add pybusmonitor1553 directory so imports like `from Grifo_E_1553lib...` work
pkg_path = str(PY_PACKAGE_ROOT)
if pkg_path not in sys.path:
sys.path.insert(0, pkg_path)
def list_py_files(subpkg):
d = PY_PACKAGE_ROOT / GRIFO_PKG / subpkg
for p in sorted(d.glob('*.py')):
if p.name == '__init__.py':
continue
yield p
def import_modules(subpkg):
modules = {}
for p in list_py_files(subpkg):
modname = f"{GRIFO_PKG}.{subpkg}.{p.stem}"
try:
m = importlib.import_module(modname)
modules[modname] = m
except Exception as e:
print(f"Warning: failed to import {modname}: {e}")
return modules
def build_type_lookup():
"""Import messages and data_types modules and collect classes by name."""
ensure_sys_path()
msgs = import_modules('messages')
dts = import_modules('data_types')
lookup = {}
for m in list(msgs.values()) + list(dts.values()):
for name, obj in inspect.getmembers(m, inspect.isclass):
# record user-defined ctypes types and classes
if getattr(obj, '__module__', '').startswith(GRIFO_PKG):
lookup[name] = obj
return lookup
def parse_messages_py(messages_py_path):
text = messages_py_path.read_text(encoding='utf-8')
classes = {}
# find class blocks
class_iter = re.finditer(r'^class\s+(?P<name>[A-Z]\w*)\b.*?:', text, flags=re.M)
positions = []
for m in class_iter:
positions.append((m.start(), m.group('name')))
positions.append((len(text), None))
for i in range(len(positions)-1):
start, cname = positions[i]
end = positions[i+1][0]
block = text[start:end]
# Simpler: search for literal pattern 'super().__init__(' and capture args
m3 = re.search(r"super\(\w*,?\s*\w*\)\.\s*__init__\s*\((?P<args>[^\)]*)\)", block)
m4 = re.search(r"super\(\)\.\s*__init__\s*\((?P<args>[^\)]*)\)", block)
args = None
for mm in (m3, m4):
if mm:
args = mm.group('args')
break
if args:
# split args by comma
parts = [p.strip() for p in args.split(',')]
if len(parts) >= 2:
msgclass = parts[0]
subaddr = parts[1]
request = 'False'
if len(parts) >= 3:
request = parts[2]
classes[cname] = dict(message_class=msgclass, subaddress=subaddr, request=request)
return classes
def describe_ctype(ctype_obj, lookup, seen=None):
"""Return a dict describing the ctypes type (fields, sizes)."""
if seen is None:
seen = set()
desc = {}
desc['type'] = getattr(ctype_obj, '__name__', str(ctype_obj))
try:
desc['size_bytes'] = ctypes.sizeof(ctype_obj)
except Exception:
desc['size_bytes'] = None
# avoid infinite recursion
if getattr(ctype_obj, '__name__', None) in seen:
desc['recursive'] = True
return desc
if inspect.isclass(ctype_obj) and issubclass(ctype_obj, (ctypes.Structure, ctypes.Union)):
seen.add(ctype_obj.__name__)
fields = []
for f in getattr(ctype_obj, '_fields_', []) or []:
# f can be (name, ctype) or (name, ctype, bits)
if len(f) == 2:
fname, ftype = f
fbits = None
else:
fname, ftype, fbits = f
fdesc = {'name': fname}
# primitive ctypes
if isinstance(ftype, str):
fdesc['type'] = ftype
fdesc['size_bytes'] = None
else:
# if ftype is class object, describe recursively where possible
try:
fdesc['type'] = getattr(ftype, '__name__', str(ftype))
except Exception:
fdesc['type'] = str(ftype)
try:
fdesc['size_bytes'] = ctypes.sizeof(ftype)
except Exception:
fdesc['size_bytes'] = None
# if bitfield
if fbits is not None:
fdesc['bits'] = int(fbits)
# nested
if inspect.isclass(ftype) and issubclass(ftype, (ctypes.Structure, ctypes.Union)):
fdesc['nested'] = describe_ctype(ftype, lookup, seen)
fields.append(fdesc)
desc['fields'] = fields
return desc
def build_xml(mapping, type_lookup, outpath):
root = ET.Element('messages')
for clsname, info in mapping.items():
msg_el = ET.SubElement(root, 'message', name=clsname)
msg_el.set('subaddress', info.get('subaddress', ''))
if 'request' in info:
msg_el.set('request', info['request'])
msgclass = info.get('message_class')
ET.SubElement(msg_el, 'payload_class').text = msgclass
# find actual type object
typename = msgclass.split('.')[-1]
t = type_lookup.get(typename)
if t is None:
ET.SubElement(msg_el, 'error').text = f'Payload class {typename} not found'
continue
desc = describe_ctype(t, type_lookup)
def add_type(parent, d):
te = ET.SubElement(parent, 'type', name=d.get('type',''))
if d.get('size_bytes') is not None:
te.set('size_bytes', str(d['size_bytes']))
for f in d.get('fields', []):
fe = ET.SubElement(te, 'field', name=f.get('name',''))
fe.set('type', f.get('type',''))
if f.get('size_bytes') is not None:
fe.set('size_bytes', str(f['size_bytes']))
if 'bits' in f:
fe.set('bits', str(f['bits']))
if 'nested' in f:
add_type(fe, f['nested'])
add_type(msg_el, desc)
tree = ET.ElementTree(root)
tree.write(outpath, encoding='utf-8', xml_declaration=True)
print(f'Wrote {outpath}')
def main():
ensure_sys_path()
repo = REPO_ROOT
messages_py = PY_PACKAGE_ROOT / GRIFO_PKG / 'messages' / 'messages.py'
if not messages_py.exists():
print('messages.py not found:', messages_py)
return
mapping = parse_messages_py(messages_py)
# Ensure all A1..A8 and B1..B18 are present in the mapping.
# Some messages may be not implemented as classes; create placeholders
# so the XML includes every expected message identifier.
def ensure_all_ab(msg_map):
# A messages: A1..A8 -> subaddress = n
for n in range(1, 9):
name = f'A{n}'
if name not in msg_map:
msg_map[name] = dict(message_class='', subaddress=str(n), request='False')
# B messages: B1..B18 -> subaddress = n + 10 (11..28)
for n in range(1, 19):
name = f'B{n}'
if name not in msg_map:
sub = str(n + 10)
msg_map[name] = dict(message_class='', subaddress=sub, request='True')
ensure_all_ab(mapping)
# build type lookup
type_lookup = build_type_lookup()
outdir = repo / 'doc'
outdir.mkdir(exist_ok=True)
outpath = outdir / '1553_messages_map.xml'
build_xml(mapping, type_lookup, str(outpath))
if __name__ == '__main__':
main()