Add IO trace CSV logger (disabled by default) and log sent/received target positions when debug flag enabled

This commit is contained in:
VALLONGOL 2025-10-22 09:23:35 +02:00
parent 1259ff1a38
commit e81affb5f1
4 changed files with 84 additions and 0 deletions

View File

@ -23,4 +23,9 @@ LOGGING_CONFIG = {
DEBUG_CONFIG = { DEBUG_CONFIG = {
"save_tftp_scripts": True, # Set to False to disable "save_tftp_scripts": True, # Set to False to disable
"temp_folder_name": "Temp", "temp_folder_name": "Temp",
# Enable saving of IO traces (sent/received positions) to CSV files in Temp/
# Set to True during debugging to collect logs.
"enable_io_trace": False,
"io_trace_sent_filename": "sent_positions.csv",
"io_trace_received_filename": "received_positions.csv",
} }

View File

@ -15,6 +15,7 @@ from target_simulator.core.models import Scenario, Target
from target_simulator.core import command_builder from target_simulator.core import command_builder
from target_simulator.utils.logger import get_logger from target_simulator.utils.logger import get_logger
from target_simulator.analysis.simulation_state_hub import SimulationStateHub from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.utils.csv_logger import append_sent_position
# Simulation frequency in Hertz # Simulation frequency in Hertz
TICK_RATE_HZ = 20.0 TICK_RATE_HZ = 20.0
@ -131,6 +132,19 @@ class SimulationEngine(threading.Thread):
self.simulation_hub.add_simulated_state( self.simulation_hub.add_simulated_state(
target.target_id, timestamp_for_batch, state_tuple target.target_id, timestamp_for_batch, state_tuple
) )
# 1b. Optionally save the sent positions to CSV for debugging
try:
append_sent_position(
timestamp_for_batch,
target.target_id,
state_tuple[0],
state_tuple[1],
state_tuple[2],
command_builder.build_tgtset_from_target_state(target),
)
except Exception:
# Do not break the simulation for logging failures
pass
# 2. Build the command to send to the radar # 2. Build the command to send to the radar
cmd = command_builder.build_tgtset_from_target_state(target) cmd = command_builder.build_tgtset_from_target_state(target)

View File

@ -10,6 +10,7 @@ from queue import Queue, Full
from target_simulator.core.sfp_structures import SfpRisStatusPayload from target_simulator.core.sfp_structures import SfpRisStatusPayload
from target_simulator.analysis.simulation_state_hub import SimulationStateHub from target_simulator.analysis.simulation_state_hub import SimulationStateHub
from target_simulator.utils.csv_logger import append_received_position
PayloadHandlerFunc = Callable[[bytearray], None] PayloadHandlerFunc = Callable[[bytearray], None]
@ -84,6 +85,11 @@ class SimulationPayloadHandler:
timestamp=radar_timestamp_s, timestamp=radar_timestamp_s,
state=state state=state
) )
# Optionally append the received state to CSV for debugging
try:
append_received_position(radar_timestamp_s, target_id, state[0], state[1], state[2])
except Exception:
pass
processed += 1 processed += 1
# Notify GUI to refresh display (we enqueue an empty list which # Notify GUI to refresh display (we enqueue an empty list which

View File

@ -0,0 +1,59 @@
import csv
import os
import time
from typing import Iterable, Any
from target_simulator.config import DEBUG_CONFIG
def _ensure_temp_folder():
temp_folder = DEBUG_CONFIG.get("temp_folder_name", "Temp")
if not os.path.exists(temp_folder):
try:
os.makedirs(temp_folder, exist_ok=True)
except Exception:
# If we cannot create the folder, swallow the exception; callers
# should handle absence of files gracefully.
return None
return temp_folder
def append_row(filename: str, row: Iterable[Any], headers: Iterable[str] | None = None):
"""Append a row to a CSV file inside the Temp folder.
If the file doesn't exist and headers are provided, write headers first.
"""
if not DEBUG_CONFIG.get("enable_io_trace", False):
return False
temp_folder = _ensure_temp_folder()
if not temp_folder:
return False
file_path = os.path.join(temp_folder, filename)
write_headers = not os.path.exists(file_path) and headers is not None
try:
with open(file_path, "a", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
if write_headers:
writer.writerow(list(headers))
writer.writerow(list(row))
except Exception:
return False
return True
def append_sent_position(timestamp: float, target_id: int, x: float, y: float, z: float, command: str):
filename = DEBUG_CONFIG.get("io_trace_sent_filename", "sent_positions.csv")
headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft", "command"]
row = [timestamp, target_id, x, y, z, command]
return append_row(filename, row, headers=headers)
def append_received_position(timestamp: float, target_id: int, x: float, y: float, z: float):
filename = DEBUG_CONFIG.get("io_trace_received_filename", "received_positions.csv")
headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft"]
row = [timestamp, target_id, x, y, z]
return append_row(filename, row, headers=headers)