# SXXXXXXX_ControlPanel/VideoReceiverSFP/core/dump_manager.py
# 2026-01-19 14:37:12 +01:00
#
# 235 lines
# 8.9 KiB
# Python

"""DumpManager: centralized dump saving, rotation and video recording for VideoReceiverSFP.
Minimal, robust implementation focusing on pruning existing files by
category based on modification time and handling real-time video encoding.
"""
from __future__ import annotations
import os
import time
import pathlib
import logging
from collections import deque
from typing import Optional, Dict

# numpy, Pillow and OpenCV are optional dependencies.  When one of them cannot
# be imported its module-level name is bound to None instead, so this module
# still imports and callers can test e.g. ``cv2 is None`` before using it.
# ``except Exception`` (rather than ImportError only) is kept on purpose: any
# failure while importing one of these packages is treated as "not available".
try:
    import numpy as np
except Exception:
    np = None

try:
    from PIL import Image
except Exception:
    Image = None

try:
    import cv2
except Exception:
    cv2 = None

# Module-level logger shared by everything in this file.
logger = logging.getLogger("VideoReceiverSFP.dump_manager")
class DumpManager:
    """Save dump files, rotate old ones and record per-category videos.

    Every file is written into ``dumps_dir`` and named
    ``VideoReceiverSFP_<category>_<timestamp>.<ext>``.  After each save the
    files of that category are pruned by modification time so that at most
    ``keep_count`` of them remain.  The video file of a recording that is
    still running is never pruned and does not count towards the limit.
    """

    # Categories with their own "last saved" history.  Any other category
    # name shares the 'unknown' history (see ``_enqueue``/``get_last_saved``).
    _KNOWN_CATEGORIES = ('mfd', 'sar', 'unknown', 'bin', 'frame')

    def __init__(self, dumps_dir: str, keep_count: int = 100):
        """Create the dump directory if needed and prune what is already there.

        Args:
            dumps_dir: Directory that receives all dumps (made absolute).
            keep_count: Maximum number of files kept per category.  Values
                below zero are treated as zero.
        """
        self.dumps_dir = os.path.abspath(dumps_dir)
        os.makedirs(self.dumps_dir, exist_ok=True)
        self.keep = max(0, int(keep_count))
        # Only the most recent entry is ever read back, so the history is
        # bounded; at least one slot is kept so get_last_saved() keeps working
        # even with keep_count == 0.
        history_len = max(1, self.keep)
        self._saved = {
            cat: deque(maxlen=history_len) for cat in self._KNOWN_CATEGORIES
        }
        # Video recording state, all keyed by category.
        self._video_writers = {}
        self._video_start_times: Dict[str, float] = {}
        self._last_frame_timestamps: Dict[str, float] = {}
        self._video_fps_map: Dict[str, float] = {}
        # Maximum duplicate frames to write when spacing is large (safety cap)
        self._max_duplicate_frames = 1000
        # Path of the video currently being recorded, per category.
        self._video_paths: Dict[str, str] = {}
        # Prune files left over from earlier runs; never fail construction
        # because of it.
        for cat in list(self._saved):
            try:
                self._prune_category(cat)
            except Exception:
                logger.exception("Startup pruning failed for category %r", cat)

    def _timestamp(self) -> str:
        """Return ``YYYYMMDD_HHMMSS_mmm`` (local time) for use in filenames."""
        # Derive both the seconds and the milliseconds from ONE clock reading
        # so they cannot disagree when the call straddles a second boundary.
        now = time.time()
        whole = int(now)
        millis = min(999, int((now - whole) * 1000))
        stamp = time.strftime('%Y%m%d_%H%M%S', time.localtime(whole))
        return stamp + f"_{millis:03d}"

    def _new_path(self, category: str, ext: str) -> str:
        """Build a not-yet-existing path for a new file of ``category``."""
        stem = f'VideoReceiverSFP_{category}_{self._timestamp()}'
        path = os.path.join(self.dumps_dir, f'{stem}.{ext}')
        serial = 0
        # Two saves within the same millisecond would otherwise share a name
        # and the second would overwrite the first.
        while os.path.exists(path):
            serial += 1
            path = os.path.join(self.dumps_dir, f'{stem}_{serial}.{ext}')
        return path

    def save_bin(self, payload: bytes, category: str = 'unknown') -> Optional[str]:
        """Write ``payload`` to a new ``.bin`` file and rotate the category.

        Returns:
            The path of the written file, or None if writing failed.
        """
        try:
            path = self._new_path(category, 'bin')
            with open(path, 'wb') as fh:
                fh.write(payload)
        except Exception:
            logger.exception("Failed to save binary dump for category %r", category)
            return None
        self._enqueue(path, category)
        return path

    def save_preview_from_pil(self, pil_image, category: str = 'unknown', fmt: str = 'bmp') -> Optional[str]:
        """Save a PIL image to a new file and rotate the category.

        Args:
            pil_image: Image to save; None is accepted and yields None.
            category: Category used in the filename and for rotation.
            fmt: File extension (a leading dot is ignored); the image format
                is chosen by ``Image.save`` from that extension.

        Returns:
            The path of the written file, or None if nothing was saved.
        """
        if pil_image is None:
            return None
        try:
            ext = fmt.lstrip('.')
            path = self._new_path(category, ext)
            img = pil_image
            # For PNG/BMP any mode other than RGB or L is converted to RGB.
            if ext.lower() in ('png', 'bmp') and img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')
            img.save(path)
        except Exception:
            logger.exception("Failed to save preview image for category %r", category)
            return None
        self._enqueue(path, category)
        return path

    def start_video_record(self, category: str, width: int, height: int, fps: int = 20) -> bool:
        """Open an XVID/AVI ``cv2.VideoWriter`` for ``category``.

        A recording already running for the same category is stopped first so
        its writer is released explicitly rather than just being replaced.

        Returns:
            True when the writer was opened, False otherwise (OpenCV missing,
            writer could not be opened, or any other error).
        """
        if cv2 is None:
            logger.error("OpenCV not available, cannot record video")
            return False
        if category in self._video_writers:
            self.stop_video_record(category)
        try:
            path = self._new_path(category, 'avi')
            # Use XVID codec for AVI container
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            rate = float(fps)
            writer = cv2.VideoWriter(path, fourcc, rate, (int(width), int(height)))
            if not writer.isOpened():
                writer.release()
                logger.error("Failed to open VideoWriter for %s", path)
                return False
            started = time.time()
            self._video_writers[category] = writer
            self._video_start_times[category] = started
            # remember video path for later retrieval (and to protect the
            # file from pruning while it is being written)
            self._video_paths[category] = path
            self._video_fps_map[category] = rate
            self._last_frame_timestamps[category] = started
            logger.info("Started video recording: %s (%dx%d @ %d FPS)", path, width, height, fps)
            return True
        except Exception:
            logger.exception("Error starting video recording")
            return False

    def write_video_frame(self, frame, category: str):
        """Append ``frame`` to the active recording of ``category``.

        ``frame`` may be a PIL image (converted to a BGR array) or an array
        that is passed on after a uint8 cast.  The frame is written once per
        frame period that elapsed since the previous call (at least once, at
        most ``_max_duplicate_frames`` times).  Does nothing when no recording
        is active for the category; errors are logged, never raised.
        """
        writer = self._video_writers.get(category)
        if writer is None:
            return
        try:
            # Convert PIL to numpy BGR
            if Image is not None and isinstance(frame, Image.Image):
                cv_frame = cv2.cvtColor(np.array(frame.convert('RGB')), cv2.COLOR_RGB2BGR)
            else:
                cv_frame = frame
            # Ensure correct format (uint8, BGR)
            if cv_frame.dtype != np.uint8:
                cv_frame = cv_frame.astype(np.uint8)
            # The writer is opened as a colour writer, so expand single-channel
            # (2-D) frames to three channels before writing.
            if getattr(cv_frame, 'ndim', 3) == 2:
                cv_frame = cv2.cvtColor(cv_frame, cv2.COLOR_GRAY2BGR)
            now = time.time()
            last = self._last_frame_timestamps.get(category, now)
            fps = float(self._video_fps_map.get(category, 20.0))
            # Seconds since the previous written frame (never negative).
            elapsed = max(0.0, now - last)
            # Number of copies: elapsed time in frame periods, rounded, at
            # least 1 and capped to avoid runaway file sizes.  The rounding
            # remainder is dropped because the reference time is reset to now.
            try:
                copies = max(1, int(round(elapsed * fps)))
            except Exception:
                copies = 1
            copies = min(copies, self._max_duplicate_frames)
            for _ in range(copies):
                writer.write(cv_frame)
            self._last_frame_timestamps[category] = now
        except Exception:
            logger.exception("Error writing frame to video %s", category)

    def stop_video_record(self, category: str):
        """Release the writer of ``category`` and forget its recording state."""
        if category not in self._video_writers:
            return
        try:
            self._video_writers[category].release()
            logger.info("Stopped video recording for %s", category)
        except Exception:
            logger.exception("Error closing video writer")
        finally:
            # Drop every per-category entry, including the stored fps.
            for state in (
                self._video_writers,
                self._video_start_times,
                self._last_frame_timestamps,
                self._video_fps_map,
                self._video_paths,
            ):
                state.pop(category, None)

    def _enqueue(self, path: str, category: str) -> None:
        """Record ``path`` as the latest save and rotate its category.

        The history entry goes to the category's own deque, or to 'unknown'
        for category names without one.  Pruning always uses the real
        category name, because that is the name embedded in the filename.
        """
        cat = category if category in self._saved else 'unknown'
        self._saved[cat].append(path)
        try:
            self._prune_category(category)
        except Exception:
            # The file itself was saved; a rotation problem must not turn
            # that into a failed save.
            logger.exception("Pruning failed for category %r", category)

    def get_last_saved(self, category: str) -> Optional[str]:
        """Return the most recently saved path for ``category``, or None.

        Categories without their own history are looked up under 'unknown'.
        """
        try:
            cat = category if category in self._saved else 'unknown'
            history = self._saved[cat]
        except (TypeError, KeyError):
            return None
        return history[-1] if history else None

    def get_last_video_path(self, category: Optional[str] = None) -> Optional[str]:
        """Return the path of a recording that is currently tracked.

        If ``category`` is provided, return that category's video path.
        Otherwise return the path of the most recently started recording
        across all categories.  Returns None when nothing matches.
        """
        try:
            if category:
                return self._video_paths.get(category)
            if not self._video_start_times:
                return None
            latest = max(self._video_start_times, key=self._video_start_times.get)
            return self._video_paths.get(latest)
        except TypeError:
            return None

    def _prune_category(self, category: str) -> None:
        """Delete the oldest files of ``category`` beyond the keep limit.

        Files are matched by the ``VideoReceiverSFP_<category>_`` name prefix
        and ordered by modification time (file name as tie-breaker).  Videos
        that are still being recorded are skipped entirely.  File-system
        errors are logged or ignored, never raised.
        """
        prefix = f'VideoReceiverSFP_{category}_'
        active = {
            os.path.normcase(os.path.abspath(p))
            for p in self._video_paths.values()
        }
        try:
            entries = list(pathlib.Path(self.dumps_dir).iterdir())
        except OSError:
            logger.exception("Cannot list dump directory %s", self.dumps_dir)
            return
        candidates = []
        for entry in entries:
            if not entry.name.startswith(prefix):
                continue
            if os.path.normcase(str(entry)) in active:
                continue
            try:
                if not entry.is_file():
                    continue
                mtime = entry.stat().st_mtime
            except OSError:
                # Vanished between listing and stat; nothing left to prune.
                continue
            candidates.append((mtime, entry.name, entry))
        excess = len(candidates) - self.keep
        if excess <= 0:
            return
        # Oldest first.
        candidates.sort(key=lambda item: (item[0], item[1]))
        for _, _, old in candidates[:excess]:
            try:
                old.unlink()
            except OSError as exc:
                logger.warning("Could not remove old dump %s: %s", old, exc)