aggiunto la schermata di configurazione del logger

This commit is contained in:
VALLONGOL 2025-10-28 09:43:23 +01:00
parent 6fb0aca8f9
commit 3bfa5edf88
15 changed files with 527 additions and 339 deletions

106
README.md
View File

@ -48,3 +48,109 @@ Where to look in the code
If you want the debug UI to exactly mimic runtime behaviour, uncheck the
Active/Traceable/Restart checkboxes before sending, or use the runtime APIs to
send `tgtset` without qualifiers.
## Debug logging (attivazione localizzata)
Il progetto utilizza il modulo `logging` di Python con logger a livello di
modulo (per esempio `target_simulator.analysis.simulation_state_hub`). In
diversi punti del codice è stato standardizzato l'uso di `logger =
logging.getLogger(__name__)` e aggiunto un helper `temporary_log_level`
in `target_simulator/utils/logger.py` per attivare temporaneamente livelli di
log più verbosi.
Di seguito alcuni esempi rapidi per attivare/disattivare il DEBUG solo per la
parte del codice che ti interessa analizzare.
### Attivare DEBUG per un modulo (a runtime)
Apri una shell Python o esegui lo snippet nel tuo script/main e imposta il
livello del logger del modulo che vuoi investigare.
Esempio — abilitare DEBUG solo per lo state hub:
```python
import logging
logging.getLogger('target_simulator.analysis.simulation_state_hub').setLevel(logging.DEBUG)
```
Per tornare a INFO (o disattivare DEBUG):
```python
logging.getLogger('target_simulator.analysis.simulation_state_hub').setLevel(logging.INFO)
```
I nomi dei logger corrispondono ai path dei moduli. Esempi utili:
- `target_simulator.analysis.simulation_state_hub`
- `target_simulator.gui.sfp_debug_window`
- `target_simulator.gui.ppi_display`
- `target_simulator.gui.payload_router`
- `target_simulator.core.sfp_transport`
### Attivare DEBUG solo per un blocco di codice (temporaneo)
Usa il context manager `temporary_log_level` fornito in
`target_simulator.utils.logger`:
```python
from logging import getLogger
from target_simulator.utils.logger import temporary_log_level
mod_logger = getLogger('target_simulator.gui.payload_router')
with temporary_log_level(mod_logger, logging.DEBUG):
# dentro questo blocco il logger è DEBUG
router.process_some_payload(...) # esempio
# All'uscita il livello viene ripristinato automaticamente
```
### Debug ancora più granulare — child logger e filtri
Per messaggi molto verbosi puoi creare child logger o usare `extra` + `Filter`:
Child logger:
```python
base = logging.getLogger('target_simulator.analysis.simulation_state_hub')
diag = base.getChild('diagnostics') # 'target_simulator.analysis.simulation_state_hub.diagnostics'
diag.setLevel(logging.DEBUG)
diag.debug("dettagli diagnostici: ...")
```
Esempio di filtro basato su `extra`:
```python
class ExtraKeyFilter(logging.Filter):
def __init__(self, key, allowed_values):
super().__init__()
self.key = key
self.allowed = set(allowed_values)
def filter(self, record):
return getattr(record, self.key, None) in self.allowed
handler = logging.StreamHandler()
handler.addFilter(ExtraKeyFilter('topic', ['diagnostic']))
logging.getLogger('target_simulator').addHandler(handler)
# Emetti:
logging.getLogger('target_simulator.analysis.simulation_state_hub').debug("...", extra={'topic':'diagnostic'})
```
### Esecuzione rapida da PowerShell
Esempio per aprire REPL con PYTHONPATH corretto (Windows PowerShell):
```powershell
$env:PYTHONPATH='C:\src\____GitProjects\target_simulator'
python
# dentro REPL eseguire i comandi logging mostrati sopra
```
Oppure eseguire un singolo comando:
```powershell
$env:PYTHONPATH='C:\src\____GitProjects\target_simulator'; python -c "import logging; logging.getLogger('target_simulator.gui.payload_router').setLevel(logging.DEBUG); print('DEBUG enabled')"
```
Se vuoi, possiamo aggiungere un piccolo pannello nella GUI per controllare i
livelli dei logger a runtime; fammi sapere se preferisci questa opzione.

View File

@ -1,268 +1 @@
{
"scenario1": {
"name": "scenario1",
"targets": [
{
"target_id": 0,
"active": true,
"traceable": true,
"trajectory": [
{
"maneuver_type": "Fly to Point",
"duration_s": 10.0,
"target_altitude_ft": 10000.0,
"target_range_nm": 20.0,
"target_azimuth_deg": 0.0
},
{
"maneuver_type": "Fly for Duration",
"target_velocity_fps": 1670.9318999999994,
"target_heading_deg": 10.0,
"duration_s": 300.0,
"target_altitude_ft": 10000.0
},
{
"maneuver_type": "Fly to Point",
"duration_s": 400.0,
"target_altitude_ft": 10000.0,
"target_range_nm": 25.0,
"target_azimuth_deg": -20.0
}
]
}
]
},
"scenario2": {
"name": "scenario2",
"targets": [
{
"target_id": 0,
"active": true,
"traceable": true,
"trajectory": [
{
"maneuver_type": "Fly to Point",
"duration_s": 10.0,
"target_range_nm": 10.0,
"target_azimuth_deg": 1.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 200.0,
"target_range_nm": 20.0,
"target_azimuth_deg": 10.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 200.0,
"target_range_nm": 30.0,
"target_azimuth_deg": -10.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 200.0,
"target_range_nm": 35.0,
"target_azimuth_deg": 10.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 200.0,
"target_range_nm": 35.0,
"target_azimuth_deg": 30.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 200.0,
"target_range_nm": 20.0,
"target_azimuth_deg": 45.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
}
],
"use_spline": true
},
{
"target_id": 1,
"active": true,
"traceable": true,
"trajectory": [
{
"maneuver_type": "Fly to Point",
"duration_s": 10.0,
"target_range_nm": 10.0,
"target_azimuth_deg": 10.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 10.0,
"target_range_nm": 20.0,
"target_azimuth_deg": 20.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 30.0,
"target_range_nm": 30.0,
"target_azimuth_deg": 30.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 30.0,
"target_range_nm": 35.0,
"target_azimuth_deg": -10.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
}
],
"use_spline": false
}
]
},
"scenario3": {
"name": "scenario3",
"targets": [
{
"target_id": 0,
"active": true,
"traceable": true,
"trajectory": [
{
"maneuver_type": "Fly to Point",
"duration_s": 10.0,
"target_range_nm": 10.0,
"target_azimuth_deg": 0.0,
"target_altitude_ft": 10000.0,
"target_velocity_fps": 506.343,
"target_heading_deg": 180.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 40.0,
"target_range_nm": 20.0,
"target_azimuth_deg": 0.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 40.0,
"target_range_nm": 20.0,
"target_azimuth_deg": -90.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly to Point",
"duration_s": 40.0,
"target_range_nm": 30.0,
"target_azimuth_deg": -30.0,
"target_altitude_ft": 10000.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
}
],
"use_spline": true
},
{
"target_id": 1,
"active": true,
"traceable": true,
"trajectory": [
{
"maneuver_type": "Fly to Point",
"duration_s": 10.0,
"target_range_nm": 10.0,
"target_azimuth_deg": 25.0,
"target_altitude_ft": 10000.0,
"target_velocity_fps": 506.343,
"target_heading_deg": 180.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly for Duration",
"duration_s": 50.0,
"target_altitude_ft": 10000.0,
"target_velocity_fps": 1519.029,
"target_heading_deg": 45.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
},
{
"maneuver_type": "Fly for Duration",
"duration_s": 80.0,
"target_altitude_ft": 10000.0,
"target_velocity_fps": 1181.467,
"target_heading_deg": -30.0,
"longitudinal_acceleration_g": 0.0,
"lateral_acceleration_g": 0.0,
"vertical_acceleration_g": 0.0,
"turn_direction": "Right"
}
],
"use_spline": false
}
]
}
}
{}

View File

@ -18,7 +18,8 @@
"sfp": {
"ip": "127.0.0.1",
"port": 60001,
"local_port": 60002
"local_port": 60002,
"use_json_protocol": true
}
},
"lru": {
@ -34,7 +35,8 @@
"sfp": {
"ip": "127.0.0.1",
"port": 60001,
"local_port": 60002
"local_port": 60002,
"use_json_protocol": false
}
}
}

View File

@ -10,6 +10,9 @@ import math
import logging
from typing import Dict, Deque, Tuple, Optional, List
# Module-level logger for this module
logger = logging.getLogger(__name__)
# A state tuple can contain (timestamp, x, y, z, vx, vy, vz, ...)
# For now, we focus on timestamp and position in feet.
TargetState = Tuple[float, float, float, float]
@ -76,7 +79,6 @@ class SimulationStateHub:
# Diagnostic logging: compute azimuth under both axis interpretations
# to detect a possible 90-degree rotation due to swapped axes.
try:
logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
# State is now expected to be (x_ft, y_ft, z_ft)
x_ft = float(state[0]) if len(state) > 0 else 0.0

View File

@ -22,6 +22,7 @@ LOGGING_CONFIG = {
# --- Debug Configuration ---
DEBUG_CONFIG = {
"save_tftp_scripts": True, # Set to False to disable
"save_sfp_json_payloads": True, # Set to False to disable saving JSON payloads
"temp_folder_name": "Temp",
# Enable saving of IO traces (sent/received positions) to CSV files in Temp/
# Set to True during debugging to collect logs.

View File

@ -4,13 +4,55 @@
Constructs MMI command strings based on high-level data models.
"""
from typing import Dict, Any
import json
from typing import Dict, Any, List, Optional
from .models import Target
from target_simulator.utils.logger import get_logger
logger = get_logger(__name__)
# --- Constants for JSON protocol unit conversion ---
NM_TO_M = 1852.0
FT_TO_M = 0.3048
FPS_TO_MPS = 0.3048
def build_json_update(targets: List[Target], global_cmd: Optional[str] = None) -> str:
"""
Builds a JSON string for atomically updating multiple targets.
Args:
targets: A list of Target objects to include in the update.
global_cmd: An optional global command string (e.g., "reset").
Returns:
A formatted JSON string representing the bulk update command.
"""
payload: Dict[str, Any] = {"TGTS": []}
if global_cmd:
payload["CMD"] = global_cmd
for target in targets:
target_dict = {
"UID": target.target_id + 100,
"ID": target.target_id,
"a": int(target.active), # 1 for active, 0 for inactive
"t": int(target.traceable), # 1 for traceable, 0 for not traceable
"R": target.current_range_nm * NM_TO_M,
"NAZ": target.current_azimuth_deg,
"GV": target.current_velocity_fps * FPS_TO_MPS,
"VV": 0.0, # Vertical velocity, set to 0 as per initial spec
"H": target.current_heading_deg,
"A": target.current_altitude_ft * FT_TO_M,
"CMD": "CONTINUE" if target.active else "STOP",
}
payload["TGTS"].append(target_dict)
# Use indent for readability in debug files
return json.dumps(payload, indent=4)
def build_tgtinit(target: Target) -> str:
"""
@ -126,3 +168,8 @@ def build_aclatch() -> str:
def build_acunlatch() -> str:
"""Builds the command to release the A/C data latch."""
return "acunlatch"
def build_reset() -> str:
"""Builds the 'reset' command."""
return "reset"

View File

@ -6,6 +6,8 @@ Handles SFP (Simple Fragmentation Protocol) communication with the target device
import socket
import time
import os
from datetime import datetime
from typing import List, Optional, Dict, Any, Callable
from queue import Queue
@ -16,6 +18,7 @@ from target_simulator.core import command_builder
from target_simulator.utils.logger import get_logger
from target_simulator.gui.payload_router import DebugPayloadRouter
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.config import DEBUG_CONFIG
class SFPCommunicator(CommunicatorInterface):
@ -32,10 +35,38 @@ class SFPCommunicator(CommunicatorInterface):
self.simulation_hub = simulation_hub
self.update_queue = update_queue
self._connection_state_callbacks: List[Callable[[bool], None]] = []
# State variable to determine which communication protocol to use
self._use_json_protocol: bool = False
# Attributes for debug file saving
self.debug_config = DEBUG_CONFIG
self.temp_folder_path = os.path.join(
os.getcwd(), self.debug_config.get("temp_folder_name", "Temp")
)
# Unified payload router
self.payload_router = DebugPayloadRouter(simulation_hub=simulation_hub, update_queue=update_queue)
def _save_json_payload_to_temp(self, content: str, prefix: str):
"""Saves the JSON payload to a timestamped file in the Temp folder if debug is enabled."""
if not self.debug_config.get("save_sfp_json_payloads", False):
return
try:
if not os.path.exists(self.temp_folder_path):
os.makedirs(self.temp_folder_path)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
filename = f"{prefix}_{timestamp}.json"
filepath = os.path.join(self.temp_folder_path, filename)
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
self.logger.debug(f"Saved debug JSON payload to '{filepath}'")
except IOError as e:
self.logger.error(f"Failed to save debug JSON payload: {e}")
def router(self) -> DebugPayloadRouter:
return self.payload_router
@ -71,6 +102,10 @@ class SFPCommunicator(CommunicatorInterface):
"SFP connection failed: Missing 'ip', 'port' (remote), or 'local_port' in config."
)
return False
# Read the protocol selection flag from the configuration
self._use_json_protocol = config.get("use_json_protocol", False)
self.logger.info(f"SFP JSON protocol enabled: {self._use_json_protocol}")
result = False
try:
@ -139,29 +174,35 @@ class SFPCommunicator(CommunicatorInterface):
self.logger.info(f"Sending scenario '{scenario.name}' via SFP...")
# Build a list of commands to be sent atomically
commands = [
command_builder.build_pause(),
command_builder.build_aclatch() # Add aclatch for atomic update
]
if self._use_json_protocol:
# --- JSON Protocol Logic for scenario initialization ---
self.logger.debug("Using JSON protocol for scenario initialization.")
json_command = command_builder.build_json_update(scenario.get_all_targets())
self._save_json_payload_to_temp(json_command, "sfp_scenario_init")
for target in scenario.get_all_targets():
commands.append(command_builder.build_tgtinit(target))
commands.extend([
command_builder.build_acunlatch(), # Add acunlatch to apply changes
command_builder.build_continue()
])
# Send commands with a small delay between each to prevent packet loss
for cmd in commands:
if not self._send_single_command(cmd):
self.logger.error(f"Failed to send command '{cmd}'. Aborting scenario send.")
# Attempt to leave the server in a good state
self._send_single_command(command_builder.build_acunlatch())
self._send_single_command(command_builder.build_continue())
if not self._send_single_command(json_command):
self.logger.error("Failed to send JSON scenario payload.")
return False
time.sleep(0.05) # Use a safer 50ms delay between commands
else:
# --- Legacy Command Logic ---
self.logger.debug("Using legacy command protocol for scenario initialization.")
commands = [
command_builder.build_pause(),
command_builder.build_aclatch()
]
for target in scenario.get_all_targets():
commands.append(command_builder.build_tgtinit(target))
commands.extend([
command_builder.build_acunlatch(),
command_builder.build_continue()
])
for cmd in commands:
if not self._send_single_command(cmd):
self.logger.error(f"Failed to send command '{cmd}'. Aborting scenario send.")
self._send_single_command(command_builder.build_acunlatch())
self._send_single_command(command_builder.build_continue())
return False
time.sleep(0.05)
self.logger.info("Finished sending scenario via SFP.")
return True
@ -170,17 +211,30 @@ class SFPCommunicator(CommunicatorInterface):
if not self.is_open:
return False
all_success = True
for cmd in commands:
if not self._send_single_command(cmd):
all_success = False
return all_success
if self._use_json_protocol:
# --- JSON Protocol Logic for live updates ---
if not commands:
self.logger.debug("send_commands (JSON mode) called with empty list. Nothing to send.")
return True
# Expect a single command string which is the JSON payload
json_payload = commands[0]
self._save_json_payload_to_temp(json_payload, "sfp_live_update")
return self._send_single_command(json_payload)
else:
# --- Legacy Command Logic ---
all_success = True
for cmd in commands:
if not self._send_single_command(cmd):
all_success = False
return all_success
def _send_single_command(self, command: str) -> bool:
if not self.transport or not self._destination:
return False
return self.transport.send_script_command(command, self._destination)
@staticmethod
def test_connection(config: Dict[str, Any]) -> bool:
local_port = config.get("local_port")
if not local_port:
@ -193,5 +247,6 @@ class SFPCommunicator(CommunicatorInterface):
except Exception:
return False
@staticmethod
def list_available_ports() -> List[str]:
return []
return []

View File

@ -37,6 +37,13 @@ class SimulationEngine(threading.Thread):
self.simulation_hub = simulation_hub # Hub for data analysis
self.time_multiplier = 1.0
self.update_interval_s = 1.0
# Determine communication protocol from the communicator's config
self.use_json_protocol = False
if communicator:
# Safely access the _use_json_protocol flag if it exists (for SFPCommunicator)
self.use_json_protocol = getattr(communicator, '_use_json_protocol', False)
self.logger.info(f"SimulationEngine will use JSON protocol: {self.use_json_protocol}")
self.scenario: Optional[Scenario] = None
self._stop_event = threading.Event()
@ -115,19 +122,14 @@ class SimulationEngine(threading.Thread):
if (current_time - self._last_update_time) >= self.update_interval_s:
self._last_update_time = current_time
# Only proceed if the communicator is valid and open
if self.communicator and self.communicator.is_open:
commands_to_send = []
timestamp_for_batch = time.monotonic()
active_targets = [t for t in updated_targets if t.active]
for target in active_targets:
# Build the command string ONCE and reuse it
cmd = command_builder.build_tgtset_from_target_state(target)
commands_to_send.append(cmd)
# 1. Log the simulated state to the hub for analysis
if self.simulation_hub:
# Log simulated state for all active targets to the hub for analysis
if self.simulation_hub:
for target in active_targets:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
@ -136,24 +138,49 @@ class SimulationEngine(threading.Thread):
self.simulation_hub.add_simulated_state(
target.target_id, timestamp_for_batch, state_tuple
)
# 1b. Optionally save sent positions to CSV using the pre-built command
try:
append_sent_position(
timestamp_for_batch,
target.target_id,
state_tuple[0],
state_tuple[1],
state_tuple[2],
cmd, # Use the existing command string
)
except Exception:
pass
# 3. Send all commands in a single batch
# --- Protocol-dependent command generation ---
if self.use_json_protocol:
# --- JSON Protocol Logic ---
if active_targets:
json_payload = command_builder.build_json_update(active_targets)
commands_to_send.append(json_payload)
# Log to CSV for debugging
for target in active_targets:
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
)
append_sent_position(
timestamp_for_batch, target.target_id,
state_tuple[0], state_tuple[1], state_tuple[2],
"JSON_BULK_UPDATE" # Use a placeholder in the command field
)
else:
# --- Legacy Protocol Logic ---
for target in active_targets:
cmd = command_builder.build_tgtset_from_target_state(target)
commands_to_send.append(cmd)
# Log to CSV for debugging
state_tuple = (
getattr(target, "_pos_x_ft", 0.0),
getattr(target, "_pos_y_ft", 0.0),
getattr(target, "_pos_z_ft", 0.0),
)
append_sent_position(
timestamp_for_batch, target.target_id,
state_tuple[0], state_tuple[1], state_tuple[2],
cmd
)
# --- Send the batch of commands ---
if commands_to_send:
self.communicator.send_commands(commands_to_send)
# 4. Update the GUI queue, now synced with the communication update
# Update the GUI queue
if self.update_queue:
try:
self.update_queue.put_nowait(updated_targets)

View File

@ -46,6 +46,7 @@ class ConnectionSettingsWindow(tk.Toplevel):
def _load_settings(self):
# --- Load Target Settings ---
target_cfg = self.connection_config.get("target", {})
target_sfp_cfg = target_cfg.get("sfp", {})
# Normalize connection type so it matches the radiobutton values used elsewhere
t = (target_cfg.get("type", "SFP") or "").lower()
if t == "serial":
@ -56,9 +57,10 @@ class ConnectionSettingsWindow(tk.Toplevel):
self.target_vars["tftp_port"].set(target_cfg.get("tftp", {}).get("port", 69))
self.target_vars["serial_port"].set(target_cfg.get("serial", {}).get("port", "COM1"))
self.target_vars["serial_baud"].set(target_cfg.get("serial", {}).get("baudrate", 9600))
self.target_vars["sfp_ip"].set(target_cfg.get("sfp", {}).get("ip", "127.0.0.1"))
self.target_vars["sfp_port"].set(target_cfg.get("sfp", {}).get("port", 60001))
self.target_vars["sfp_local_port"].set(target_cfg.get("sfp", {}).get("local_port", 60002))
self.target_vars["sfp_ip"].set(target_sfp_cfg.get("ip", "127.0.0.1"))
self.target_vars["sfp_port"].set(target_sfp_cfg.get("port", 60001))
self.target_vars["sfp_local_port"].set(target_sfp_cfg.get("local_port", 60002))
self.target_vars["sfp_use_json"].set(target_sfp_cfg.get("use_json_protocol", False))
# Select the correct notebook tab for target
try:
tab_idx = {"SFP": 0, "TFTP": 1, "Serial": 2}[self.target_vars["conn_type"].get()]
@ -68,6 +70,7 @@ class ConnectionSettingsWindow(tk.Toplevel):
# --- Load LRU Settings ---
lru_cfg = self.connection_config.get("lru", {})
lru_sfp_cfg = lru_cfg.get("sfp", {})
t = (lru_cfg.get("type", "SFP") or "").lower()
if t == "serial":
self.lru_vars["conn_type"].set("Serial")
@ -77,9 +80,10 @@ class ConnectionSettingsWindow(tk.Toplevel):
self.lru_vars["tftp_port"].set(lru_cfg.get("tftp", {}).get("port", 69))
self.lru_vars["serial_port"].set(lru_cfg.get("serial", {}).get("port", "COM1"))
self.lru_vars["serial_baud"].set(lru_cfg.get("serial", {}).get("baudrate", 9600))
self.lru_vars["sfp_ip"].set(lru_cfg.get("sfp", {}).get("ip", "127.0.0.1"))
self.lru_vars["sfp_port"].set(lru_cfg.get("sfp", {}).get("port", 60001))
self.lru_vars["sfp_local_port"].set(lru_cfg.get("sfp", {}).get("local_port", 60002))
self.lru_vars["sfp_ip"].set(lru_sfp_cfg.get("ip", "127.0.0.1"))
self.lru_vars["sfp_port"].set(lru_sfp_cfg.get("port", 60001))
self.lru_vars["sfp_local_port"].set(lru_sfp_cfg.get("local_port", 60002))
self.lru_vars["sfp_use_json"].set(lru_sfp_cfg.get("use_json_protocol", False))
# Select the correct notebook tab for lru
try:
tab_idx = {"SFP": 0, "TFTP": 1, "Serial": 2}[self.lru_vars["conn_type"].get()]
@ -187,6 +191,10 @@ class ConnectionSettingsWindow(tk.Toplevel):
vars["sfp_local_port"] = tk.IntVar()
ttk.Spinbox(sfp_grid, from_=1, to=65535, textvariable=vars["sfp_local_port"], width=7).grid(row=2, column=1, sticky=tk.W, padx=5)
# New checkbox for JSON protocol
vars["sfp_use_json"] = tk.BooleanVar()
ttk.Checkbutton(sfp_grid, text="Use JSON Protocol", variable=vars["sfp_use_json"]).grid(row=3, column=0, columnspan=2, sticky=tk.W, pady=(5,0))
# place test button to the right of the notebook area
test_row = ttk.Frame(parent_frame)
test_row.pack(fill=tk.X, padx=5, pady=(2, 8))
@ -231,13 +239,23 @@ class ConnectionSettingsWindow(tk.Toplevel):
"type": self.target_vars["conn_type"].get().lower(),
"tftp": {"ip": self.target_vars["tftp_ip"].get(), "port": self.target_vars["tftp_port"].get()},
"serial": {"port": self.target_vars["serial_port"].get(), "baudrate": self.target_vars["serial_baud"].get()},
"sfp": {"ip": self.target_vars["sfp_ip"].get(), "port": self.target_vars["sfp_port"].get(), "local_port": self.target_vars["sfp_local_port"].get()}
"sfp": {
"ip": self.target_vars["sfp_ip"].get(),
"port": self.target_vars["sfp_port"].get(),
"local_port": self.target_vars["sfp_local_port"].get(),
"use_json_protocol": self.target_vars["sfp_use_json"].get(),
}
},
"lru": {
"type": self.lru_vars["conn_type"].get().lower(),
"tftp": {"ip": self.lru_vars["tftp_ip"].get(), "port": self.lru_vars["tftp_port"].get()},
"serial": {"port": self.lru_vars["serial_port"].get(), "baudrate": self.lru_vars["serial_baud"].get()},
"sfp": {"ip": self.lru_vars["sfp_ip"].get(), "port": self.lru_vars["sfp_port"].get(), "local_port": self.lru_vars["sfp_local_port"].get()}
"sfp": {
"ip": self.lru_vars["sfp_ip"].get(),
"port": self.lru_vars["sfp_port"].get(),
"local_port": self.lru_vars["sfp_local_port"].get(),
"use_json_protocol": self.lru_vars["sfp_use_json"].get(),
}
},
}
self.master_view.update_connection_settings(new_config)

View File

@ -0,0 +1,159 @@
# target_simulator/gui/logger_panel.py
"""
A small Toplevel UI to inspect and change logger levels at runtime.
"""
import tkinter as tk
from tkinter import ttk, messagebox
import logging
from typing import List
LEVELS = [
("NOTSET", logging.NOTSET),
("DEBUG", logging.DEBUG),
("INFO", logging.INFO),
("WARNING", logging.WARNING),
("ERROR", logging.ERROR),
("CRITICAL", logging.CRITICAL),
]
class LoggerPanel(tk.Toplevel):
"""Toplevel window that allows setting logger levels at runtime."""
def __init__(self, master=None):
super().__init__(master)
self.title("Logger Levels")
self.geometry("520x420")
self.transient(master)
self.grab_set()
self.logger_names = [] # type: List[str]
self._create_widgets()
self._populate_logger_list()
def _create_widgets(self):
top = ttk.Frame(self)
top.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
# Left: list of logger names
left = ttk.Frame(top)
left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
ttk.Label(left, text="Available loggers:").pack(anchor=tk.W)
self.logger_listbox = tk.Listbox(left, exportselection=False)
self.logger_listbox.pack(fill=tk.BOTH, expand=True, padx=(0, 6), pady=(4, 0))
self.logger_listbox.bind("<<ListboxSelect>>", self._on_select_logger)
# Right: controls
right = ttk.Frame(top)
right.pack(side=tk.RIGHT, fill=tk.Y)
ttk.Label(right, text="Selected logger:").pack(anchor=tk.W)
self.selected_name_var = tk.StringVar(value="")
ttk.Label(right, textvariable=self.selected_name_var, foreground="blue").pack(anchor=tk.W, pady=(0, 6))
ttk.Label(right, text="Level:").pack(anchor=tk.W)
self.level_var = tk.StringVar(value="INFO")
level_names = [n for n, v in LEVELS]
self.level_combo = ttk.Combobox(right, values=level_names, textvariable=self.level_var, state="readonly", width=12)
self.level_combo.pack(anchor=tk.W, pady=(0, 6))
ttk.Button(right, text="Apply", command=self._apply_level).pack(fill=tk.X, pady=(6, 4))
ttk.Button(right, text="Reset to NOTSET", command=self._reset_level).pack(fill=tk.X)
ttk.Separator(self).pack(fill=tk.X, pady=6)
bottom = ttk.Frame(self)
bottom.pack(fill=tk.X, padx=8, pady=6)
ttk.Label(bottom, text="Add / open logger by name:").pack(anchor=tk.W)
self.new_logger_var = tk.StringVar()
entry = ttk.Entry(bottom, textvariable=self.new_logger_var)
entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 6))
ttk.Button(bottom, text="Open", command=self._open_named_logger).pack(side=tk.LEFT)
ttk.Button(self, text="Refresh", command=self._populate_logger_list).pack(side=tk.RIGHT, padx=8, pady=(0, 8))
ttk.Button(self, text="Close", command=self._on_close).pack(side=tk.RIGHT, pady=(0, 8))
def _gather_logger_names(self) -> List[str]:
# Gather logger names from the logging manager plus some defaults
manager = logging.root.manager
names = list(getattr(manager, 'loggerDict', {}).keys())
# Add a few commonly useful module names if missing
defaults = [
'target_simulator',
'target_simulator.analysis.simulation_state_hub',
'target_simulator.gui.sfp_debug_window',
'target_simulator.gui.ppi_display',
'target_simulator.gui.payload_router',
'target_simulator.core.sfp_transport',
]
for d in defaults:
if d not in names:
names.append(d)
names = sorted(set(names))
return names
def _populate_logger_list(self):
self.logger_listbox.delete(0, tk.END)
self.logger_names = self._gather_logger_names()
for n in self.logger_names:
self.logger_listbox.insert(tk.END, n)
def _on_select_logger(self, event=None):
sel = self.logger_listbox.curselection()
if not sel:
return
idx = sel[0]
name = self.logger_names[idx]
self.selected_name_var.set(name)
lg = logging.getLogger(name)
lvl = lg.getEffectiveLevel()
# Translate to name
lvl_name = logging.getLevelName(lvl)
self.level_var.set(lvl_name)
def _apply_level(self):
name = self.selected_name_var.get()
if not name:
messagebox.showwarning("No logger selected", "Select a logger from the list first.")
return
lvl_name = self.level_var.get()
lvl = next((v for n, v in LEVELS if n == lvl_name), logging.INFO)
logging.getLogger(name).setLevel(lvl)
messagebox.showinfo("Logger level set", f"Logger '{name}' set to {lvl_name}.")
def _reset_level(self):
name = self.selected_name_var.get()
if not name:
messagebox.showwarning("No logger selected", "Select a logger from the list first.")
return
logging.getLogger(name).setLevel(logging.NOTSET)
self.level_var.set('NOTSET')
messagebox.showinfo("Logger reset", f"Logger '{name}' reset to NOTSET.")
def _open_named_logger(self):
name = self.new_logger_var.get().strip()
if not name:
return
# If exists in list, select it
try:
idx = self.logger_names.index(name)
except ValueError:
# Add to list
self.logger_names.append(name)
self.logger_names.sort()
self._populate_logger_list()
idx = self.logger_names.index(name)
self.logger_listbox.select_clear(0, tk.END)
self.logger_listbox.select_set(idx)
self.logger_listbox.see(idx)
self._on_select_logger()
def _on_close(self):
try:
self.grab_release()
except Exception:
pass
self.destroy()

View File

@ -27,6 +27,7 @@ from target_simulator.core.models import Scenario, Target
from target_simulator.utils.logger import get_logger, shutdown_logging_system
from target_simulator.utils.config_manager import ConfigManager
from target_simulator.gui.sfp_debug_window import SfpDebugWindow
from target_simulator.gui.logger_panel import LoggerPanel
from target_simulator.core.sfp_communicator import SFPCommunicator
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.analysis.performance_analyzer import PerformanceAnalyzer
@ -360,6 +361,7 @@ class MainView(tk.Tk):
debug_menu.add_command(
label="SFP Packet Inspector...", command=self._open_sfp_debug_window
)
debug_menu.add_command(label="Logger Levels...", command=self._open_logger_panel)
def _create_statusbar(self):
status_bar = ttk.Frame(self, relief=tk.SUNKEN)
@ -1145,6 +1147,15 @@ class MainView(tk.Tk):
self.logger.info("Opening SFP Packet Inspector window...")
self.sfp_debug_window = SfpDebugWindow(self)
def _open_logger_panel(self):
"""Open the LoggerPanel to inspect/change logger levels at runtime."""
try:
# Create transient dialog attached to main window
LoggerPanel(self)
except Exception:
# Avoid crashing the UI if the panel fails to open
self.logger.exception("Failed to open LoggerPanel")
def _on_reset_simulation(self):
self.logger.info("Resetting scenario to initial state.")

View File

@ -22,6 +22,9 @@ from target_simulator.core.sfp_structures import SFPHeader, SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.core.models import Target
# Module-level logger for this module
logger = logging.getLogger(__name__)
PayloadHandler = Callable[[bytearray], None]
TargetListListener = Callable[[List[Target]], None]
@ -63,8 +66,8 @@ class DebugPayloadRouter:
ord("R"): self._handle_ris_status,
ord("r"): self._handle_ris_status,
}
logging.info(f"{self._log_prefix} Initialized (Hub: {self._hub is not None}, Queue: {self._update_queue is not None}).")
self._logger = logging.getLogger(__name__)
logger.info(f"{self._log_prefix} Initialized (Hub: {self._hub is not None}, Queue: {self._update_queue is not None}).")
self._logger = logger
def add_ris_target_listener(self, listener: TargetListListener):
"""Registers a callback function to receive updates for real targets."""

View File

@ -18,6 +18,9 @@ from typing import List, Dict, Union
from target_simulator.core.models import Target, Waypoint, ManeuverType, NM_TO_FT
# Module-level logger
logger = logging.getLogger(__name__)
class PPIDisplay(ttk.Frame):
"""
@ -197,7 +200,6 @@ class PPIDisplay(ttk.Frame):
def _draw_target_visuals(self, targets: List[Target], color: str, artist_list: List, label_artist_list: List):
vector_len_nm = self.range_var.get() / 20.0
logger = logging.getLogger(__name__)
# Determine marker size based on the target type (color)
marker_size = 6 if color == 'red' else 8 # Simulated targets (green) are smaller

View File

@ -29,6 +29,9 @@ from target_simulator.gui.payload_router import DebugPayloadRouter
from target_simulator.core.models import Target, Waypoint, ManeuverType, KNOTS_TO_FPS
from target_simulator.core import command_builder
# Module-level logger
logger = logging.getLogger(__name__)
DEF_TEST_ID = 1
DEF_TEST_RANGE = 30.0
@ -48,7 +51,7 @@ class SfpDebugWindow(tk.Toplevel):
self.master = master
self.geometry("1100x700")
self.protocol("WM_DELETE_WINDOW", self._on_close)
self.logger = logging.getLogger(__name__)
self.logger = logger
self.debug_update_queue = Queue()

View File

@ -5,6 +5,11 @@ import tkinter as tk
from tkinter.scrolledtext import ScrolledText
from queue import Queue, Empty as QueueEmpty
from typing import Optional, Dict, Any
from contextlib import contextmanager
from logging import Logger
# Module-level logger for utils.logging helpers
logger = logging.getLogger(__name__)
# --- Module-level globals for the centralized logging queue system ---
_global_log_queue: Optional[Queue[logging.LogRecord]] = None
@ -156,17 +161,14 @@ def add_tkinter_handler(gui_log_widget: tk.Text, logging_config_dict: Dict[str,
if _actual_tkinter_handler:
_actual_tkinter_handler.close()
if (
isinstance(gui_log_widget, (tk.Text, ScrolledText))
and gui_log_widget.winfo_exists()
):
if isinstance(gui_log_widget, (tk.Text, ScrolledText)) and gui_log_widget.winfo_exists():
level_colors = logging_config_dict.get("colors", {})
_actual_tkinter_handler = TkinterTextHandler(
text_widget=gui_log_widget, level_colors=level_colors
)
_actual_tkinter_handler.setFormatter(_base_formatter)
_actual_tkinter_handler.setLevel(logging.DEBUG)
logging.getLogger(__name__).info("Tkinter log handler added successfully.")
logger.info("Tkinter log handler added successfully.")
else:
print(
"ERROR: GUI log widget invalid, cannot add TkinterTextHandler.", flush=True
@ -177,6 +179,23 @@ def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)
@contextmanager
def temporary_log_level(logger: Logger, level: int):
"""Context manager to temporarily set a logger's level.
Usage:
with temporary_log_level(logging.getLogger('some.name'), logging.DEBUG):
# inside this block the logger will be DEBUG
...
"""
old_level = logger.level
logger.setLevel(level)
try:
yield
finally:
logger.setLevel(old_level)
def shutdown_logging_system():
global _logging_system_active, _log_processor_after_id
if not _logging_system_active: