223 lines
7.9 KiB
Python
223 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Generate an XML map of all A/B 1553 messages and their fields.
|
|
|
|
Usage: python tools/generate_1553_map.py
|
|
Outputs: doc/1553_messages_map.xml
|
|
|
|
The script imports modules under `Grifo_E_1553lib.messages` and
|
|
`Grifo_E_1553lib.data_types`, parses `messages.py` to discover which
|
|
payload class is used for each A/B message, then introspects ctypes
|
|
structures/unions to enumerate fields, sizes and bit-widths.
|
|
"""
|
|
import os
|
|
import sys
|
|
import re
|
|
import importlib
|
|
import inspect
|
|
import ctypes
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[1]
|
|
PY_PACKAGE_ROOT = REPO_ROOT / 'pybusmonitor1553'
|
|
GRIFO_PKG = 'Grifo_E_1553lib'
|
|
|
|
|
|
def ensure_sys_path():
|
|
# Add pybusmonitor1553 directory so imports like `from Grifo_E_1553lib...` work
|
|
pkg_path = str(PY_PACKAGE_ROOT)
|
|
if pkg_path not in sys.path:
|
|
sys.path.insert(0, pkg_path)
|
|
|
|
|
|
def list_py_files(subpkg):
|
|
d = PY_PACKAGE_ROOT / GRIFO_PKG / subpkg
|
|
for p in sorted(d.glob('*.py')):
|
|
if p.name == '__init__.py':
|
|
continue
|
|
yield p
|
|
|
|
|
|
def import_modules(subpkg):
|
|
modules = {}
|
|
for p in list_py_files(subpkg):
|
|
modname = f"{GRIFO_PKG}.{subpkg}.{p.stem}"
|
|
try:
|
|
m = importlib.import_module(modname)
|
|
modules[modname] = m
|
|
except Exception as e:
|
|
print(f"Warning: failed to import {modname}: {e}")
|
|
return modules
|
|
|
|
|
|
def build_type_lookup():
|
|
"""Import messages and data_types modules and collect classes by name."""
|
|
ensure_sys_path()
|
|
msgs = import_modules('messages')
|
|
dts = import_modules('data_types')
|
|
lookup = {}
|
|
for m in list(msgs.values()) + list(dts.values()):
|
|
for name, obj in inspect.getmembers(m, inspect.isclass):
|
|
# record user-defined ctypes types and classes
|
|
if getattr(obj, '__module__', '').startswith(GRIFO_PKG):
|
|
lookup[name] = obj
|
|
return lookup
|
|
|
|
|
|
def parse_messages_py(messages_py_path):
|
|
text = messages_py_path.read_text(encoding='utf-8')
|
|
classes = {}
|
|
# find class blocks
|
|
class_iter = re.finditer(r'^class\s+(?P<name>[A-Z]\w*)\b.*?:', text, flags=re.M)
|
|
positions = []
|
|
for m in class_iter:
|
|
positions.append((m.start(), m.group('name')))
|
|
positions.append((len(text), None))
|
|
for i in range(len(positions)-1):
|
|
start, cname = positions[i]
|
|
end = positions[i+1][0]
|
|
block = text[start:end]
|
|
# Simpler: search for literal pattern 'super().__init__(' and capture args
|
|
m3 = re.search(r"super\(\w*,?\s*\w*\)\.\s*__init__\s*\((?P<args>[^\)]*)\)", block)
|
|
m4 = re.search(r"super\(\)\.\s*__init__\s*\((?P<args>[^\)]*)\)", block)
|
|
args = None
|
|
for mm in (m3, m4):
|
|
if mm:
|
|
args = mm.group('args')
|
|
break
|
|
if args:
|
|
# split args by comma
|
|
parts = [p.strip() for p in args.split(',')]
|
|
if len(parts) >= 2:
|
|
msgclass = parts[0]
|
|
subaddr = parts[1]
|
|
request = 'False'
|
|
if len(parts) >= 3:
|
|
request = parts[2]
|
|
classes[cname] = dict(message_class=msgclass, subaddress=subaddr, request=request)
|
|
return classes
|
|
|
|
|
|
def describe_ctype(ctype_obj, lookup, seen=None):
|
|
"""Return a dict describing the ctypes type (fields, sizes)."""
|
|
if seen is None:
|
|
seen = set()
|
|
desc = {}
|
|
desc['type'] = getattr(ctype_obj, '__name__', str(ctype_obj))
|
|
try:
|
|
desc['size_bytes'] = ctypes.sizeof(ctype_obj)
|
|
except Exception:
|
|
desc['size_bytes'] = None
|
|
# avoid infinite recursion
|
|
if getattr(ctype_obj, '__name__', None) in seen:
|
|
desc['recursive'] = True
|
|
return desc
|
|
if inspect.isclass(ctype_obj) and issubclass(ctype_obj, (ctypes.Structure, ctypes.Union)):
|
|
seen.add(ctype_obj.__name__)
|
|
fields = []
|
|
for f in getattr(ctype_obj, '_fields_', []) or []:
|
|
# f can be (name, ctype) or (name, ctype, bits)
|
|
if len(f) == 2:
|
|
fname, ftype = f
|
|
fbits = None
|
|
else:
|
|
fname, ftype, fbits = f
|
|
fdesc = {'name': fname}
|
|
# primitive ctypes
|
|
if isinstance(ftype, str):
|
|
fdesc['type'] = ftype
|
|
fdesc['size_bytes'] = None
|
|
else:
|
|
# if ftype is class object, describe recursively where possible
|
|
try:
|
|
fdesc['type'] = getattr(ftype, '__name__', str(ftype))
|
|
except Exception:
|
|
fdesc['type'] = str(ftype)
|
|
try:
|
|
fdesc['size_bytes'] = ctypes.sizeof(ftype)
|
|
except Exception:
|
|
fdesc['size_bytes'] = None
|
|
# if bitfield
|
|
if fbits is not None:
|
|
fdesc['bits'] = int(fbits)
|
|
# nested
|
|
if inspect.isclass(ftype) and issubclass(ftype, (ctypes.Structure, ctypes.Union)):
|
|
fdesc['nested'] = describe_ctype(ftype, lookup, seen)
|
|
fields.append(fdesc)
|
|
desc['fields'] = fields
|
|
return desc
|
|
|
|
|
|
def build_xml(mapping, type_lookup, outpath):
|
|
root = ET.Element('messages')
|
|
for clsname, info in mapping.items():
|
|
msg_el = ET.SubElement(root, 'message', name=clsname)
|
|
msg_el.set('subaddress', info.get('subaddress', ''))
|
|
if 'request' in info:
|
|
msg_el.set('request', info['request'])
|
|
msgclass = info.get('message_class')
|
|
ET.SubElement(msg_el, 'payload_class').text = msgclass
|
|
# find actual type object
|
|
typename = msgclass.split('.')[-1]
|
|
t = type_lookup.get(typename)
|
|
if t is None:
|
|
ET.SubElement(msg_el, 'error').text = f'Payload class {typename} not found'
|
|
continue
|
|
desc = describe_ctype(t, type_lookup)
|
|
def add_type(parent, d):
|
|
te = ET.SubElement(parent, 'type', name=d.get('type',''))
|
|
if d.get('size_bytes') is not None:
|
|
te.set('size_bytes', str(d['size_bytes']))
|
|
for f in d.get('fields', []):
|
|
fe = ET.SubElement(te, 'field', name=f.get('name',''))
|
|
fe.set('type', f.get('type',''))
|
|
if f.get('size_bytes') is not None:
|
|
fe.set('size_bytes', str(f['size_bytes']))
|
|
if 'bits' in f:
|
|
fe.set('bits', str(f['bits']))
|
|
if 'nested' in f:
|
|
add_type(fe, f['nested'])
|
|
add_type(msg_el, desc)
|
|
|
|
tree = ET.ElementTree(root)
|
|
tree.write(outpath, encoding='utf-8', xml_declaration=True)
|
|
print(f'Wrote {outpath}')
|
|
|
|
|
|
def main():
|
|
ensure_sys_path()
|
|
repo = REPO_ROOT
|
|
messages_py = PY_PACKAGE_ROOT / GRIFO_PKG / 'messages' / 'messages.py'
|
|
if not messages_py.exists():
|
|
print('messages.py not found:', messages_py)
|
|
return
|
|
mapping = parse_messages_py(messages_py)
|
|
# Ensure all A1..A8 and B1..B18 are present in the mapping.
|
|
# Some messages may be not implemented as classes; create placeholders
|
|
# so the XML includes every expected message identifier.
|
|
def ensure_all_ab(msg_map):
|
|
# A messages: A1..A8 -> subaddress = n
|
|
for n in range(1, 9):
|
|
name = f'A{n}'
|
|
if name not in msg_map:
|
|
msg_map[name] = dict(message_class='', subaddress=str(n), request='False')
|
|
# B messages: B1..B18 -> subaddress = n + 10 (11..28)
|
|
for n in range(1, 19):
|
|
name = f'B{n}'
|
|
if name not in msg_map:
|
|
sub = str(n + 10)
|
|
msg_map[name] = dict(message_class='', subaddress=sub, request='True')
|
|
ensure_all_ab(mapping)
|
|
# build type lookup
|
|
type_lookup = build_type_lookup()
|
|
outdir = repo / 'doc'
|
|
outdir.mkdir(exist_ok=True)
|
|
outpath = outdir / '1553_messages_map.xml'
|
|
build_xml(mapping, type_lookup, str(outpath))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|