S1005403_RisCC/target_simulator/utils/csv_logger.py
2025-11-13 10:38:52 +01:00

110 lines
3.8 KiB
Python

"""CSV helpers for IO tracing.
This module provides lightweight helpers that append sent/received
position records to CSV files located in the application's Temp folder.
Behavior is governed by `target_simulator.config.DEBUG_CONFIG` (see keys
``enable_io_trace``, ``temp_folder_name``, and filename overrides).
These functions are intended for debugging and tracing; they return ``True``
when the append operation succeeded and ``False`` when disabled or on error.
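PERFORMANCE: The module declares state for an asynchronous write buffer
(``_CSV_BUFFERS``, a flush interval and a maximum buffer size) so that rows
can be batched and flushed by a background thread. Note that ``append_row``
as currently written does not use that buffer: it opens the target file and
writes each row synchronously on the caller's thread.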
"""
import csv
import os
import time
import threading
import atexit
from typing import Iterable, Any, Dict, List, Tuple
from collections import deque
from target_simulator.config import DEBUG_CONFIG
# --- Async CSV Buffer ---
# Module-level state for buffered CSV writing.
# NOTE(review): none of the functions in this module currently read or write
# these names; they appear to be scaffolding for a background flush worker.

# Lock presumably meant to guard _CSV_BUFFERS across threads -- confirm once a
# consumer exists.
_CSV_BUFFER_LOCK = threading.Lock()
_CSV_BUFFERS: Dict[str, deque] = {}  # filename -> deque of (row, headers)
# Handle of the background flush thread; None while no thread has been started.
_CSV_FLUSH_THREAD: threading.Thread | None = None
# Event presumably used to ask the flush thread to stop -- confirm with the
# worker implementation.
_CSV_STOP_EVENT = threading.Event()
_CSV_FLUSH_INTERVAL_S = 2.0  # Flush every 2 seconds
_CSV_MAX_BUFFER_SIZE = 1000  # Flush immediately if buffer exceeds this
def _ensure_temp_folder():
    """Return the configured Temp folder, creating it when it is missing.

    The folder name is read from ``DEBUG_CONFIG["temp_folder_name"]`` and
    defaults to ``"Temp"``.

    Returns:
        The folder name as configured when it exists as a directory (or was
        created successfully), otherwise ``None``. ``None`` is also returned
        when the path exists but is not a directory, so callers never try to
        place files "inside" a regular file.
    """
    temp_folder = DEBUG_CONFIG.get("temp_folder_name", "Temp")

    # Fast path: the folder is already there, no filesystem mutation needed.
    if os.path.isdir(temp_folder):
        return temp_folder

    try:
        # exist_ok=True tolerates another thread/process creating the folder
        # between the check above and this call; it still raises
        # FileExistsError when the path is an existing non-directory.
        os.makedirs(temp_folder, exist_ok=True)
    except (OSError, ValueError):
        # If we cannot create the folder, swallow the error; callers treat a
        # missing folder as "tracing unavailable".
        return None
    return temp_folder
def append_row(
    filename: str, row: Iterable[Any], headers: Iterable[str] | None = None
) -> bool:
    """Append a row to a CSV file stored under the Temp folder.

    When ``headers`` is provided and the target file is empty at the moment
    it is opened (newly created, or a pre-existing zero-byte file), the
    headers are written as the first row. The function is a no-op when
    tracing is disabled via DEBUG_CONFIG.

    Args:
        filename: Name of the target CSV file inside the Temp folder.
        row: Iterable of values to write as a CSV row.
        headers: Optional iterable of header names to write when the file
            has no content yet.

    Returns:
        True on success, False when tracing is disabled, the Temp folder is
        unavailable, or an error occurred while writing.
    """
    if not DEBUG_CONFIG.get("enable_io_trace", False):
        return False

    temp_folder = _ensure_temp_folder()
    if not temp_folder:
        return False

    file_path = os.path.join(temp_folder, filename)
    try:
        with open(file_path, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            # Decide on the header from the size of the file that was
            # actually opened rather than from a separate existence check
            # made beforehand: this also covers empty files left behind by a
            # previous run and removes the gap between check and open.
            if headers is not None and os.fstat(csvfile.fileno()).st_size == 0:
                writer.writerow(list(headers))
            writer.writerow(list(row))
    except Exception:
        # Tracing is best-effort by design: any failure (I/O, encoding, a
        # non-iterable row) is reported as False instead of propagating into
        # the caller.
        return False
    return True
def append_sent_position(
    timestamp: float, target_id: int, x: float, y: float, z: float, command: str
):
    """Record one sent position in the IO-trace CSV.

    The target file name comes from ``DEBUG_CONFIG["io_trace_sent_filename"]``
    (default ``sent_positions.csv``). Each record holds the timestamp, the
    target id, the x/y/z position (columns ``x_ft``, ``y_ft``, ``z_ft``) and
    the command string that was issued.

    Returns:
        Whatever ``append_row`` returns for the write attempt.
    """
    return append_row(
        DEBUG_CONFIG.get("io_trace_sent_filename", "sent_positions.csv"),
        [timestamp, target_id, x, y, z, command],
        headers=["timestamp", "target_id", "x_ft", "y_ft", "z_ft", "command"],
    )
def append_received_position(
    timestamp: float, target_id: int, x: float, y: float, z: float
):
    """Record one received position in the IO-trace CSV.

    The target file name comes from
    ``DEBUG_CONFIG["io_trace_received_filename"]`` (default
    ``received_positions.csv``). Each record holds the timestamp, the target
    id and the x/y/z position (columns ``x_ft``, ``y_ft``, ``z_ft``).

    Returns:
        Whatever ``append_row`` returns for the write attempt.
    """
    return append_row(
        DEBUG_CONFIG.get("io_trace_received_filename", "received_positions.csv"),
        [timestamp, target_id, x, y, z],
        headers=["timestamp", "target_id", "x_ft", "y_ft", "z_ft"],
    )