# target_simulator/utils/csv_logger.py
"""
CSV helpers for IO tracing.
... (docstring existing)
"""
import csv
import os
import time
import threading
import atexit
from typing import Iterable, Any, Dict, Optional
from collections import deque
from target_simulator.config import DEBUG_CONFIG
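
# Module-level state for asynchronous CSV buffering: rows are queued per
# filename in _CSV_BUFFERS and flushed to disk by a single background thread.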
_CSV_BUFFER_LOCK = threading.Lock()
_CSV_BUFFERS: Dict[str, deque] = {}
_CSV_FLUSH_THREAD: Optional[threading.Thread] = None
_CSV_STOP_EVENT = threading.Event()
_CSV_ATEXIT_REGISTERED = False
_CSV_FLUSH_INTERVAL_S = 2.0
_CSV_MAX_BUFFER_SIZE = 1000


def flush_all_csv_buffers():
    """
    Synchronously flushes all buffered CSV rows to their respective files.

    Can be called at any time to ensure buffered data reaches disk before
    a subsequent operation (e.g. reading the trace files back).
    """
    with _CSV_BUFFER_LOCK:
        for filename, buffer in list(_CSV_BUFFERS.items()):
            if not buffer:
                continue
            temp_folder = _ensure_temp_folder()
            if not temp_folder:
                continue
            file_path = os.path.join(temp_folder, filename)
            # Write headers only when the file is missing or empty.
            write_headers = (
                not os.path.exists(file_path) or os.path.getsize(file_path) == 0
            )
            try:
                with open(file_path, "a", newline="", encoding="utf-8") as csvfile:
                    writer = csv.writer(csvfile)
                    # Drain the buffer so everything is written in one pass.
                    rows_to_write = []
                    while buffer:
                        rows_to_write.append(buffer.popleft())
                    if not rows_to_write:
                        continue
                    # Headers travel with each buffered item; take them from
                    # the first one if the file needs a header row.
                    if write_headers:
                        _, headers = rows_to_write[0]
                        if headers:
                            writer.writerow(list(headers))
                    # Write all the rows
                    for row, _ in rows_to_write:
                        writer.writerow(list(row))
            except Exception:
                # The rows were already popped from the buffer above, so they
                # are dropped here. Re-queueing them could make a persistent
                # I/O error fail repeatedly on the same data.
                pass
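
# Note: the buffer lock is held for the entire flush, so concurrent
# append_row() calls block until the disk write completes.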


def _csv_flush_worker():
    """Background thread that periodically flushes buffered CSV rows to disk."""
    while not _CSV_STOP_EVENT.wait(_CSV_FLUSH_INTERVAL_S):
        flush_all_csv_buffers()
    # Final flush on shutdown
    flush_all_csv_buffers()


def _ensure_csv_flush_thread():
    """Ensures the background CSV flush thread is running."""
    global _CSV_FLUSH_THREAD, _CSV_ATEXIT_REGISTERED
    if _CSV_FLUSH_THREAD is None or not _CSV_FLUSH_THREAD.is_alive():
        _CSV_STOP_EVENT.clear()
        _CSV_FLUSH_THREAD = threading.Thread(
            target=_csv_flush_worker, daemon=True, name="CSVFlushThread"
        )
        _CSV_FLUSH_THREAD.start()
        # Register the shutdown hook only once; re-registering on every
        # thread restart would make atexit run it multiple times.
        if not _CSV_ATEXIT_REGISTERED:
            atexit.register(_shutdown_csv_logger)
            _CSV_ATEXIT_REGISTERED = True


def _shutdown_csv_logger():
    """Signals the flush thread to stop and performs a final flush."""
    _CSV_STOP_EVENT.set()
    if _CSV_FLUSH_THREAD and _CSV_FLUSH_THREAD.is_alive():
        _CSV_FLUSH_THREAD.join(timeout=2.0)
    flush_all_csv_buffers()  # Final synchronous flush


def _ensure_temp_folder() -> Optional[str]:
    """Ensures the temporary directory exists and returns its path."""
    temp_folder = DEBUG_CONFIG.get("temp_folder_name", "Temp")
    try:
        os.makedirs(temp_folder, exist_ok=True)
        return temp_folder
    except Exception:
        return None


def append_row(
    filename: str, row: Iterable[Any], headers: Optional[Iterable[str]] = None
):
    """
    Buffers a row for the given CSV file, to be written asynchronously.

    Returns True if the row was buffered, or False when IO tracing is
    disabled via DEBUG_CONFIG.
    """
    if not DEBUG_CONFIG.get("enable_io_trace", True):
        return False
    _ensure_csv_flush_thread()
    with _CSV_BUFFER_LOCK:
        buffer = _CSV_BUFFERS.setdefault(filename, deque())
        # Store row and headers together so the flush can write them lazily.
        buffer.append((row, headers))
        # Trigger an early flush if the buffer gets too large.
        if len(buffer) >= _CSV_MAX_BUFFER_SIZE:
            threading.Thread(target=flush_all_csv_buffers, daemon=True).start()
    return True
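
# Example (sketch): buffering a row for a hypothetical "latency.csv" trace.
# The filename and headers here are illustrative and are not read from
# DEBUG_CONFIG:
#
#     append_row("latency.csv", [time.time(), 12.5],
#                headers=["timestamp", "latency_ms"])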


def append_sent_position(
    timestamp: float, target_id: int, x: float, y: float, z: float, command: str
):
    """Logs a sent target position to the corresponding trace file."""
    filename = DEBUG_CONFIG.get("io_trace_sent_filename", "sent_positions.csv")
    headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft", "command"]
    row = [timestamp, target_id, x, y, z, command]
    return append_row(filename, row, headers=headers)


def append_received_position(
    timestamp: float, target_id: int, x: float, y: float, z: float
):
    """Logs a received target position to the corresponding trace file."""
    filename = DEBUG_CONFIG.get("io_trace_received_filename", "received_positions.csv")
    headers = ["timestamp", "target_id", "x_ft", "y_ft", "z_ft"]
    row = [timestamp, target_id, x, y, z]
    return append_row(filename, row, headers=headers)
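

if __name__ == "__main__":
    # Minimal smoke test (sketch): assumes DEBUG_CONFIG leaves
    # "enable_io_trace" at its default of True and that the temp folder is
    # writable. The target id, coordinates, and "MOVE" command are
    # illustrative values, not part of any defined protocol.
    now = time.time()
    append_sent_position(now, 1, 100.0, 250.0, 30.0, "MOVE")
    append_received_position(now + 0.05, 1, 99.8, 250.2, 30.1)
    # Force a synchronous flush so the rows land on disk before exit.
    flush_all_csv_buffers()
    print("Trace files written under:", _ensure_temp_folder())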