sistemati i campi numerici

This commit is contained in:
VALLONGOL 2026-01-09 13:18:07 +01:00
parent 15ebe4dae2
commit 518246cdc4
6 changed files with 283 additions and 5 deletions

View File

@ -0,0 +1,146 @@
# ARTOS: Airborne Radar Test & Orchestration System
## Technical Design Specification (v2.1)
### 1. Executive Summary
Il sistema **ARTOS** è un framework di orchestrazione per l'automazione dei test radar. L'architettura separa l'interfacciamento hardware (Moduli), la logica di analisi (Algoritmi Dinamici) e la sequenza operativa (Test Plans). Il cuore del sistema è il **TestContext**, un'interfaccia unificata che permette sia ai test che agli algoritmi di accedere a dati e comandi in tempo reale.
---
### 2. Architettura a Livelli (Layered Architecture)
1. **Level 0 - Hardware Abstraction Layer (HAL)**: Moduli `1553`, `SFP`, `TargetSim`.
2. **Level 1 - Orchestrator Core**: Gestisce il ciclo di vita dei moduli e la sincronizzazione.
3. **Level 2 - Dynamic Algorithm Library**: Tool di analisi che utilizzano il `TestContext` per elaborare dati cross-modulo.
4. **Level 3 - Test Execution Layer**: Script (JSON/Python) che orchestrano la chiamata ai moduli e l'esecuzione degli algoritmi.
---
### 3. Il TestContext: Il Motore di Integrazione
Il `TestContext` non è solo un contenitore, ma fornisce i metodi per l'interazione sicura con l'hardware e lo scambio di dati.
#### 3.1 Definizione del TestContext
```python
from typing import Any, Dict, Optional
class TestContext:
"""
Unified interface to access hardware modules and system state.
Passed to both Test Scripts and Dynamic Algorithms.
"""
def __init__(self, modules: Dict[str, Any]):
self._modules = modules
self.bus1553 = modules.get("bus1553")
self.video = modules.get("video")
self.target_sim = modules.get("target_sim")
self.results_cache: Dict[str, Any] = {}
def get_module_data(self, module_name: str) -> Any:
"""Returns the current data buffer or status from a specific module."""
module = self._modules.get(module_name)
if module:
# Assume modules implement a get_current_data method
return module.get_data_stream()
return None
def log_event(self, message: str, level: str = "INFO"):
"""Centralized logging for reports."""
print(f"[{level}] {message}")
```
---
### 4. Level 2: Dynamic Algorithm Library
Gli algoritmi sono ora concepiti come "Agenti di Analisi" che operano sul contesto.
#### 4.1 Interfaccia Algoritmo Aggiornata
```python
import abc
class BaseAlgorithm(abc.ABC):
"""
Interface for dynamic algorithms.
Algorithms use the context to pull data and perform cross-module validation.
"""
def __init__(self):
self.name = self.__class__.__name__
@abc.abstractmethod
def execute(self, context: TestContext, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Executes the logic.
:param context: Access to all HW modules and system data.
:param params: Specific parameters for this execution (e.g. thresholds).
:return: Analysis results with status (PASS/FAIL).
"""
pass
```
#### 4.2 Esempio di Algoritmo Cross-Modulo: `TargetValidationAlgo`
Questo algoritmo dimostra come accedere a due moduli diversi tramite il contesto.
```python
class TargetValidationAlgo(BaseAlgorithm):
"""
Verifies if a target injected via TargetSim appears on the 1553 Bus.
"""
def execute(self, context: TestContext, params: Dict[str, Any]) -> Dict[str, Any]:
# 1. Get injected target info from TargetSim
injected_targets = context.target_sim.get_active_targets()
# 2. Get current messages from 1553 Bus
bus_data = context.bus1553.get_message("B6") # Example: Radar Track Message
# 3. Perform comparison logic (Algorithm-specific)
match_found = self._compare_data(injected_targets, bus_data, params['tolerance'])
return {
"status": "PASS" if match_found else "FAIL",
"details": f"Target match status: {match_found}",
"timestamp": context.bus1553.get_system_timestamp()
}
def _compare_data(self, injected, bus, tolerance):
# Implementation of the mathematical comparison
return True
```
---
### 5. Level 3: Test Execution (JSON & Python)
#### 5.1 Test Plan (Logica Operativa)
L'utente scrive il test decidendo *cosa* fare e *quale* algoritmo di libreria usare per validare l'azione.
**Esempio di Test Script (Python):**
```python
def run_radar_track_test(context: TestContext):
# Step 1: Inject target
context.target_sim.inject_target(id=101, distance=50, azimuth=0)
# Step 2: Use an algorithm from the dynamic library to verify
# The TestManager handles the call passing the context
result = context.run_algorithm("TargetValidationAlgo", tolerance=0.5)
if result["status"] == "FAIL":
context.log_event("Target not detected by radar!", "ERROR")
```
---
### 6. Meccanismo di Data Flow e Visibilità
Per garantire che gli algoritmi abbiano i dati necessari, l'Orchestratore implementa le seguenti regole:
1. **Data Pull**: Gli algoritmi "pescano" (pull) i dati dai moduli hardware tramite il `TestContext`. I moduli devono quindi mantenere un buffer degli ultimi dati ricevuti.
2. **Shared State**: Il `TestContext` mantiene una `results_cache`. Se l'Algoritmo A produce un risultato, l'Algoritmo B può leggerlo dal contesto nello step successivo.
3. **Concurrency**: Mentre i test e gli algoritmi girano nel thread principale (o thread di esecuzione test), i moduli HW continuano a riempire i loro buffer in thread separati, garantendo che il `TestContext` veda sempre dati aggiornati.
---
### 7. Workflow per i Collaboratori
* **Sviluppatore Modulo (L0)**: Deve assicurarsi che il modulo esponga metodi "Getter" (es: `get_last_frame()`, `get_bus_state()`) accessibili dal `TestContext`.
* **Sviluppatore Algoritmo (L1)**: Riceve il `TestContext` e scrive la logica di calcolo. Non deve preoccuparsi di come i dati arrivano, solo di come elaborarli.
* **Sviluppatore Test (L2)**: Utilizza i comandi dei moduli per muovere il sistema e gli algoritmi per validare i requisiti.

Binary file not shown.

View File

@ -1,5 +1,5 @@
Function,Calls,Total Time,Avg Time,Defined In,Called From
start_session,1,0.001735,0.001735,pymsc.core.bus_1553_module,pymsc.core.mission_logic
update_all_1553_messages,1271,0.050627,0.000040,pymsc.core.message_definitions,pymsc.core.bus_1553_module
sync_all_messages,1271,0.369905,0.000291,pymsc.core.bus_1553_module,pymsc.core.mission_logic
_execute_sync_cycle,1271,0.621404,0.000489,pymsc.core.mission_logic,pymsc.core.mission_logic
start_session,1,0.001690,0.001690,pymsc.core.bus_1553_module,pymsc.core.mission_logic
update_all_1553_messages,21138,0.588113,0.000028,pymsc.core.message_definitions,pymsc.core.bus_1553_module
sync_all_messages,21138,5.771698,0.000273,pymsc.core.bus_1553_module,pymsc.core.mission_logic
_execute_sync_cycle,21138,10.108194,0.000478,pymsc.core.mission_logic,pymsc.core.mission_logic

1 Function Calls Total Time Avg Time Defined In Called From
2 start_session 1 0.001735 0.001690 0.001735 0.001690 pymsc.core.bus_1553_module pymsc.core.mission_logic
3 update_all_1553_messages 1271 21138 0.050627 0.588113 0.000040 0.000028 pymsc.core.message_definitions pymsc.core.bus_1553_module
4 sync_all_messages 1271 21138 0.369905 5.771698 0.000291 0.000273 pymsc.core.bus_1553_module pymsc.core.mission_logic
5 _execute_sync_cycle 1271 21138 0.621404 10.108194 0.000489 0.000478 pymsc.core.mission_logic pymsc.core.mission_logic

View File

@ -17,6 +17,22 @@ class CursorXDisplayCoordAndQual(ctypes.LittleEndianStructure):
("current_x_display_coord", ctypes.c_uint16, 9)
]
def get_current_x_display_coord(self):
"""Extract the 9-bit X display coordinate"""
return self.current_x_display_coord
def get_cursor_snowplough_tellback(self):
"""Extract cursor snowplough command tellback bit"""
return self.cursor_snowplough_command_tellback
def get_cursor_zero_tellback(self):
"""Extract cursor zero tellback bit"""
return self.cursor_zero_tellback
def get_cursor_normal_slave_tellback(self):
"""Extract cursor normal/slave selector tellback bit"""
return self.cursor_normal_slave_selector_tellback
class CursorYDisplayCoord(ctypes.LittleEndianStructure):
_pack_ = 1
@ -25,6 +41,10 @@ class CursorYDisplayCoord(ctypes.LittleEndianStructure):
("current_y_display_coord", ctypes.c_uint16, 9)
]
def get_current_y_display_coord(self):
"""Extract the 9-bit Y display coordinate"""
return self.current_y_display_coord
class _CursorPositionLatitudeStr(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [

View File

@ -26,3 +26,27 @@ class WindSpeed(ctypes.Union):
("raw", ctypes.c_int16),
("str", _WindSpeedStr)
]
def get_wind_velocity_amplitude(self):
"""Extract the 13-bit wind velocity amplitude field"""
return self.str.wind_velocity_amplitude
def set_wind_velocity_amplitude(self, value):
"""Set the 13-bit wind velocity amplitude field"""
self.str.wind_velocity_amplitude = int(value) & 0x1FFF
def get(self):
"""Get wind velocity amplitude (for compatibility with other Union types)"""
return self.str.wind_velocity_amplitude
def get_wind_velocity_amplitude(self):
"""Extract the 13-bit wind velocity amplitude field"""
return self.str.wind_velocity_amplitude
def set_wind_velocity_amplitude(self, value):
"""Set the 13-bit wind velocity amplitude field"""
self.str.wind_velocity_amplitude = int(value) & 0x1FFF
def get(self):
"""Get wind velocity amplitude (for compatibility with other Union types)"""
return self.str.wind_velocity_amplitude

View File

@ -199,6 +199,7 @@ class CommandFrameSpinBox(BaseCommandFrame):
super().__init__(parent, command_info, **kwargs)
WIDGET_MAP[self.info['label']] = self
self.is_programmatic_change = False
self.user_editing = False # Track if user is actively editing
self._create_ui()
def _create_ui(self):
@ -222,6 +223,13 @@ class CommandFrameSpinBox(BaseCommandFrame):
self.spin.grid(row=0, column=1)
self.spin.bind("<Return>", self.on_change)
self.spin.bind("<FocusOut>", self.on_change)
# Track when user starts editing
self.spin.bind("<FocusIn>", self._on_focus_in)
self.spin.bind("<KeyPress>", self._on_key_press)
# Detect arrow button clicks - send value immediately after arrow click
self.spin.bind("<ButtonRelease-1>", self._on_arrow_click)
# Also use variable trace to detect arrow changes
self.cmd_var.trace_add("write", self._on_var_change)
if self.info.get('message_tb'):
self.tb_var = tk.StringVar(value="0")
@ -240,6 +248,42 @@ class CommandFrameSpinBox(BaseCommandFrame):
except ValueError:
return False
def _on_focus_in(self, event=None):
"""Called when spinbox receives focus - user is about to edit"""
self.user_editing = True
def _on_key_press(self, event=None):
"""Called when user types - mark as editing"""
self.user_editing = True
def _on_arrow_click(self, event=None):
"""Called when user clicks spinbox arrows - send value immediately"""
# Small delay to ensure the value has been updated by the spinbox
self.after(50, self._send_arrow_value)
def _on_var_change(self, *args):
"""Called when spinbox value changes (from arrows or typing)"""
# If we're not programmatically changing and not already sending
if not self.is_programmatic_change and not hasattr(self, '_sending_arrow_value'):
# User changed value, mark as editing
self.user_editing = True
def _send_arrow_value(self):
"""Send the value after arrow click"""
if self.is_programmatic_change:
return
self._sending_arrow_value = True
try:
val = float(self.cmd_var.get())
raw_val = set_correct_value(self.info, -1, val)
self.info['message'].set_value_for_field(self.info['field'], raw_val)
# Clear editing flag after sending
self.user_editing = False
except (tk.TclError, ValueError):
pass
finally:
delattr(self, '_sending_arrow_value')
def on_change(self, event=None):
if self.is_programmatic_change:
return
@ -250,11 +294,14 @@ class CommandFrameSpinBox(BaseCommandFrame):
try:
val = float(self.cmd_var.get())
except tk.TclError:
self.user_editing = False # Clear flag even on error
return
raw_val = set_correct_value(self.info, -1, val)
self.info['message'].set_value_for_field(self.info['field'], raw_val)
if self.script_manager.is_recording:
self.script_manager.write_command(self.info['label'], "set_value", val)
# Clear editing flag after successful change
self.user_editing = False
if self.info.get('message_tb'):
self.monitor_tellback()
@ -275,6 +322,10 @@ class CommandFrameSpinBox(BaseCommandFrame):
self.after(self.update_interval_ms, self.monitor_tellback)
def check_updated_value(self):
# Skip if user is actively editing this field
if self.user_editing:
return
# Skip if message is None (disabled field)
if not self.info.get('message'):
return
@ -350,6 +401,7 @@ class CommandFrameControls(tk.Frame):
self.column_widths = [20, 14, 14, 14]
self.vars = {}
self.is_programmatic_change = False
self.editing_fields = {} # Track which fields are being edited
WIDGET_MAP[self.info['label']] = self
self._create_ui()
@ -396,6 +448,7 @@ class CommandFrameControls(tk.Frame):
number_format = self.info.get(f"number_format{i}", "float")
is_int = number_format == "integer"
self.vars[field] = tk.IntVar() if is_int else tk.DoubleVar()
self.editing_fields[field] = False # Initialize editing state
fmt = '%.0f' if is_int else '%.4f'
min_v = self.info.get(f"min_value{i}", -999999)
max_v = self.info.get(f"max_value{i}", 999999)
@ -407,6 +460,12 @@ class CommandFrameControls(tk.Frame):
)
sp.grid(row=0, column=i)
sp.bind("<Return>", lambda e, f=field, m=msg, idx=i: self._on_spin(f, m, idx))
sp.bind("<FocusOut>", lambda e, f=field, m=msg, idx=i: self._on_spin(f, m, idx))
# Track editing state
sp.bind("<FocusIn>", lambda e, f=field: self._set_editing(f, True))
sp.bind("<KeyPress>", lambda e, f=field: self._set_editing(f, True))
# Detect arrow button clicks
sp.bind("<ButtonRelease-1>", lambda e, f=field, m=msg, idx=i: self._on_spin_arrow(f, m, idx))
WIDGET_MAP[field] = {"widget": sp, "var": self.vars[field]}
elif ctrl_type == "label":
@ -421,6 +480,10 @@ class CommandFrameControls(tk.Frame):
lbl = tk.Label(self, text="", width=self.column_widths[i])
lbl.grid(row=0, column=i)
def _set_editing(self, field: str, is_editing: bool):
"""Track editing state for a specific field"""
self.editing_fields[field] = is_editing
def _on_check(self, field: str, msg: Any):
val = 1 if self.vars[field].get() else 0
msg.set_value_for_field(field, val)
@ -444,11 +507,31 @@ class CommandFrameControls(tk.Frame):
val = float(self.vars[field].get())
raw = set_correct_value(self.info, idx, val)
msg.set_value_for_field(field, raw)
# Clear editing flag after change
self.editing_fields[field] = False
from pymsc.gui.script_manager import get_script_manager
sm = get_script_manager()
if sm.is_recording:
sm.write_command(field, "set_value", val)
def _on_spin_arrow(self, field: str, msg: Any, idx: int):
"""Handle spinbox arrow button clicks - send value immediately with small delay"""
# Small delay to ensure the value has been updated
self.after(50, lambda: self._send_spin_arrow_value(field, msg, idx))
def _send_spin_arrow_value(self, field: str, msg: Any, idx: int):
"""Send spinbox value after arrow click"""
if self.is_programmatic_change:
return
try:
val = float(self.vars[field].get())
raw = set_correct_value(self.info, idx, val)
msg.set_value_for_field(field, raw)
# Clear editing flag
self.editing_fields[field] = False
except (ValueError, tk.TclError):
pass
def check_updated_value(self):
"""
Syncs sub-controls with their corresponding 1553 message fields.
@ -463,6 +546,11 @@ class CommandFrameControls(tk.Frame):
msg = self.info.get(f"message{i}")
if not msg or not field:
continue
# Skip spinbox refresh if user is actively editing
if ctrl_type == "spinbox" and self.editing_fields.get(field, False):
continue
raw = msg.get_value_for_field(field)
readable = get_correct_value(self.info, i, raw)
if ctrl_type == "checkbox":