"""CSV helpers for IO tracing. This module provides lightweight helpers that append sent/received position records to CSV files located in the application's Temp folder. Behavior is governed by `target_simulator.config.DEBUG_CONFIG` (see keys ``enable_io_trace``, ``temp_folder_name``, and filename overrides). These functions are intended for debugging and tracing; they return ``True`` when the append operation succeeded and ``False`` when disabled or on error. PERFORMANCE: Uses asynchronous buffering to avoid blocking the simulation thread. Rows are buffered in memory and flushed periodically by a background thread, eliminating I/O overhead from the critical path. """ import csv import os import time import threading import atexit from typing import Iterable, Any, Dict, List, Tuple from collections import deque from target_simulator.config import DEBUG_CONFIG # --- Async CSV Buffer --- _CSV_BUFFER_LOCK = threading.Lock() _CSV_BUFFERS: Dict[str, deque] = {} # filename -> deque of (row, headers) _CSV_FLUSH_THREAD: threading.Thread = None _CSV_STOP_EVENT = threading.Event() _CSV_FLUSH_INTERVAL_S = 2.0 # Flush every 2 seconds _CSV_MAX_BUFFER_SIZE = 1000 # Flush immediately if buffer exceeds this def _csv_flush_worker(): """Background thread that periodically flushes buffered CSV rows to disk.""" while not _CSV_STOP_EVENT.is_set(): time.sleep(_CSV_FLUSH_INTERVAL_S) _flush_all_buffers() # Final flush on shutdown _flush_all_buffers() def _flush_all_buffers(): """Flush all buffered CSV rows to their respective files.""" with _CSV_BUFFER_LOCK: for filename, buffer in list(_CSV_BUFFERS.items()): if not buffer: continue temp_folder = _ensure_temp_folder() if not temp_folder: continue file_path = os.path.join(temp_folder, filename) # Check if we need to write headers write_headers = not os.path.exists(file_path) try: with open(file_path, "a", newline="", encoding="utf-8") as csvfile: writer = csv.writer(csvfile) # Write all buffered rows while buffer: row, headers = buffer.popleft() # Write headers only once for new files if write_headers and headers is not None: writer.writerow(list(headers)) write_headers = False writer.writerow(list(row)) except Exception: # Clear buffer on error to avoid accumulation buffer.clear() def _ensure_csv_flush_thread(): """Ensure the background flush thread is running.""" global _CSV_FLUSH_THREAD if _CSV_FLUSH_THREAD is None or not _CSV_FLUSH_THREAD.is_alive(): _CSV_STOP_EVENT.clear() _CSV_FLUSH_THREAD = threading.Thread( target=_csv_flush_worker, daemon=True, name="CSVFlushThread" ) _CSV_FLUSH_THREAD.start() # Register cleanup on exit atexit.register(_shutdown_csv_logger) def _shutdown_csv_logger(): """Stop the flush thread and ensure all data is written.""" _CSV_STOP_EVENT.set() if _CSV_FLUSH_THREAD and _CSV_FLUSH_THREAD.is_alive(): _CSV_FLUSH_THREAD.join(timeout=5.0) def _ensure_temp_folder(): temp_folder = DEBUG_CONFIG.get("temp_folder_name", "Temp") if not os.path.exists(temp_folder): try: os.makedirs(temp_folder, exist_ok=True) except Exception: # If we cannot create the folder, swallow the exception; callers # should handle absence of files gracefully. return None return temp_folder def append_row(filename: str, row: Iterable[Any], headers: Iterable[str] | None = None): """Append a row to a CSV file stored under the Temp folder. If the file does not exist and ``headers`` is provided, the headers are written as the first row. The function is a no-op when tracing is disabled via DEBUG_CONFIG. PERFORMANCE: This function is now async-buffered and returns immediately without blocking on I/O. Rows are written to disk by a background thread. Args: filename: Name of the target CSV file inside the Temp folder. row: Iterable of values to write as a CSV row. headers: Optional iterable of header names to write when creating a new file. Returns: True on success, False when tracing is disabled or an error occurred. """ if not DEBUG_CONFIG.get("enable_io_trace", False): return False temp_folder = _ensure_temp_folder() if not temp_folder: return False # Ensure flush thread is running _ensure_csv_flush_thread() # Buffer the row for async writing with _CSV_BUFFER_LOCK: if filename not in _CSV_BUFFERS: _CSV_BUFFERS[filename] = deque(maxlen=_CSV_MAX_BUFFER_SIZE * 2) _CSV_BUFFERS[filename].append((row, headers)) # Force immediate flush if buffer is getting large if len(_CSV_BUFFERS[filename]) >= _CSV_MAX_BUFFER_SIZE: # Schedule immediate flush without blocking threading.Thread(target=_flush_all_buffers, daemon=True).start() return True def append_sent_position( timestamp: float, target_id: int, x: float, y: float, z: float, command: str ): """Append a sent-position entry for IO tracing. The row contains a timestamp, target id, position in feet and the issued command string. """ filename = DEBUG_CONFIG.get("io_trace_sent_filename", "sent_positions.csv") headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft", "command"] row = [timestamp, target_id, x, y, z, command] return append_row(filename, row, headers=headers) def append_received_position( timestamp: float, target_id: int, x: float, y: float, z: float ): """Append a received-position entry for IO tracing. The row contains a timestamp, target id and position in feet. """ filename = DEBUG_CONFIG.get("io_trace_received_filename", "received_positions.csv") headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft"] row = [timestamp, target_id, x, y, z] return append_row(filename, row, headers=headers)