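"""Cross-check the ICD enums of the C++ header against the Python message modules.

Parses ``enum message_subaddress_enum_t`` and ``enum message_len_enum_t`` from
the header, lists the ``msg_*.py`` files, prints a report and writes the
resulting mapping to ``tests/tools/icd_mapping.csv``.
"""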
import re
|
|
from pathlib import Path
|
|
|
|
# Base directory for every path below: parents[2] of this file's resolved
# location, i.e. two levels above the directory that contains the script.
ROOT = Path(__file__).resolve().parents[2]

# C++ header that parse_icd() scans for the ICD enums.
icd_hdr = ROOT.joinpath(
    'cpp', 'GrifoScope', 'GrifoSdkEif', 'pub', 'TH',
    'th_b1553_icd_messages_id.h',
)

# Directory searched for the Python ``msg_*.py`` message files.
msgs_dir = ROOT.joinpath('pybusmonitor1553', 'Grifo_E_1553lib', 'messages')
|
|
|
|
# Matches a plain C identifier; used to reject junk enumerator names.
_C_IDENTIFIER = re.compile(r'[A-Za-z_]\w*')


def _strip_c_comments(text):
    """Return *text* with ``/* ... */`` and ``// ...`` comments removed."""
    without_block = re.sub(r'/\*.*?\*/', ' ', text, flags=re.S)
    return re.sub(r'//[^\n]*', '', without_block)


def _eval_enum_expr(expr, names):
    """Evaluate one enumerator initializer.

    Returns an ``int`` when *expr* is an integer literal or a simple
    expression over the already-parsed enumerators in *names*; otherwise
    returns *expr* unchanged (a ``str``).
    """
    # Drop C integer suffixes (``10U``, ``0x1FUL``), which Python rejects.
    cleaned = re.sub(r'\b(0[xX][0-9a-fA-F]+|\d+)[uUlL]+\b', r'\1', expr)
    # C reads a leading zero as octal; spell it the way Python expects.
    cleaned = re.sub(r'\b0([0-7]+)\b', r'0o\1', cleaned)
    try:
        return int(cleaned, 0)
    except ValueError:
        pass
    try:
        # NOTE(review): eval() executes text taken from the header. Builtins
        # are disabled and only previously parsed enumerators are visible,
        # but this must still only be fed trusted project sources.
        return int(eval(cleaned, {'__builtins__': {}}, names))
    except Exception:
        # Best effort: whatever cannot be resolved is kept as source text.
        return expr


def _parse_enum(text, enum_name, known=None):
    """Parse ``enum <enum_name> { ... }`` out of comment-free C *text*.

    Args:
        text: C source with comments already removed.
        enum_name: Tag of the enum to look for.
        known: Optional mapping of names that initializers may reference in
            addition to the enumerators of this enum.

    Returns:
        Dict mapping enumerator name to its value, in declaration order.
        Values are ``int`` when resolvable, otherwise a ``str`` holding the
        unresolved expression. Empty when the enum is not found.
    """
    values = {}
    found = re.search(
        r'enum\s+' + re.escape(enum_name) + r'\s*\{([^}]*)\}', text
    )
    if found is None:
        return values

    # Preprocessor lines inside the body are not enumerators.
    body = re.sub(r'^[ \t]*#[^\n]*', '', found.group(1), flags=re.M)
    namespace = dict(known) if known else {}
    prev_name = None
    # Enumerators are comma separated, so several per line (or one spread
    # over several lines) are handled the same way.
    for entry in body.split(','):
        name, has_init, init = (part.strip() for part in entry.partition('='))
        if not _C_IDENTIFIER.fullmatch(name):
            continue
        if has_init:
            value = _eval_enum_expr(init, namespace)
        elif prev_name is None:
            value = 0  # first enumerator without initializer
        elif isinstance(values[prev_name], int):
            # C rule: previous enumerator + 1 (not the maximum seen so far).
            value = values[prev_name] + 1
        else:
            # Previous value was unresolved, so this one is unresolved too.
            value = f'{prev_name} + 1'
        values[name] = value
        namespace[name] = value
        prev_name = name
    return values


def parse_icd(path):
    """Extract the sub-address and length enums from the ICD header.

    Args:
        path: Path-like object with a ``read_text`` method pointing at the
            C/C++ header.

    Returns:
        Tuple ``(sa, ln)`` of dicts parsed from
        ``enum message_subaddress_enum_t`` and ``enum message_len_enum_t``
        respectively. Each maps enumerator name to an ``int`` value, or to
        the raw expression text when it could not be evaluated. A missing
        enum yields an empty dict.
    """
    # errors='replace' keeps a stray non-UTF-8 byte (e.g. in a comment)
    # from aborting the whole run.
    raw = path.read_text(encoding='utf-8', errors='replace')
    # Strip comments first so a '}' inside a comment cannot end the match.
    text = _strip_c_comments(raw)
    sa = _parse_enum(text, 'message_subaddress_enum_t')
    # Length initializers may refer to sub-address enumerators.
    ln = _parse_enum(text, 'message_len_enum_t', known=sa)
    return sa, ln
|
|
|
|
def list_python_msgs(path):
    """Return the sorted file names matching ``msg_*.py`` directly in *path*."""
    names = [entry.name for entry in path.glob('msg_*.py')]
    names.sort()
    return names
|
|
|
|
def _fuzzy_match(sa_key, file_names):
    """Rank *file_names* by how many tokens of *sa_key*'s suffix they contain.

    The suffix is everything after the second underscore of *sa_key*; it is
    lower-cased and split on underscores/spaces into tokens. Returns the
    names that contain at least one token, highest score first (ties in
    reverse name order). Returns ``[]`` when *sa_key* has no such suffix.
    """
    pieces = sa_key.split('_', 2)
    if len(pieces) < 3:
        return []
    tokens = [tok for tok in re.split('[_ ]+', pieces[2].lower()) if tok]
    scored = []
    for file_name in file_names:
        lowered = file_name.lower()
        score = sum(1 for tok in tokens if tok in lowered)
        if score > 0:
            scored.append((score, file_name))
    scored.sort(reverse=True)
    return [file_name for _, file_name in scored]


def _find_len_key(sa_key, ln):
    """Return the key of *ln* that corresponds to *sa_key*, or ``None``.

    First tries the name obtained by replacing ``SA_`` with ``LEN_``; failing
    that, returns the first key of *ln* containing the first token of
    *sa_key*'s suffix (case-insensitive).
    """
    direct = sa_key.replace('SA_', 'LEN_')
    if direct in ln:
        return direct
    token = sa_key.split('_', 2)[-1].split('_')[0].lower()
    if not token:
        # An empty token is a substring of every key; treat it as no match.
        return None
    return next((key for key in ln if token in key.lower()), None)


def main():
    """Print the ICD/Python message report and write it as a CSV file.

    Reads the enums from ``icd_hdr``, lists the message files in
    ``msgs_dir``, prints both, prints the fuzzy file matches for every SA
    entry that has any, and writes one row per SA entry to
    ``ROOT/tests/tools/icd_mapping.csv``.
    """
    import csv  # only needed here; kept local so the block is self-contained

    sa, ln = parse_icd(icd_hdr)
    py = list_python_msgs(msgs_dir)

    print('ICD SA entries:')
    # Values may be ints or unresolved expression strings; order ints first
    # so the two kinds are never compared with each other.
    for k, v in sorted(sa.items(), key=lambda kv: (isinstance(kv[1], str), kv[1])):
        print(f' {k} = {v}')
    print('\nICD LEN entries:')
    for k, v in sorted(ln.items()):
        print(f' {k} = {v}')
    print('\nPython message files:')
    for f in py:
        print(' ', f)

    # simple membership check for main A/B messages
    print('\nSimple mapping check (fuzzy token match):')
    mapping = []
    for sa_key, sa_val in sa.items():
        found = _fuzzy_match(sa_key, py)
        if found:
            print(f' {sa_key}: {found}')
        len_key = _find_len_key(sa_key, ln)
        mapping.append((sa_key, sa_val, len_key, ln.get(len_key), ';'.join(found)))

    # write CSV mapping
    out = ROOT / 'tests' / 'tools' / 'icd_mapping.csv'
    out.parent.mkdir(parents=True, exist_ok=True)
    # csv.writer quotes fields containing commas and writes None as ''.
    with out.open('w', encoding='utf-8', newline='') as fh:
        writer = csv.writer(fh, lineterminator='\n')
        writer.writerow(['sa_key', 'sa_value', 'len_key', 'len_value', 'python_matches'])
        writer.writerows(mapping)
    print('\nWrote mapping to', out)
|
|
|
|
# Script entry point: run the report only when executed directly.
if __name__ == "__main__":
    main()
|