SXXXXXXX_ControlPanel/VideoReceiverSFP/core/dump_manager.py

126 lines
4.2 KiB
Python

"""DumpManager: centralized dump saving and rotation for VideoReceiverSFP.
Minimal, robust implementation focusing on pruning existing files by
category based on modification time so the number of files does not
exceed the configured keep limit.
"""
from __future__ import annotations
import os
import time
import pathlib
from collections import deque
from typing import Optional
try:
import numpy as np
except Exception:
np = None
try:
from PIL import Image
except Exception:
Image = None
class DumpManager:
def __init__(self, dumps_dir: str, keep_count: int = 1000):
self.dumps_dir = os.path.abspath(dumps_dir)
os.makedirs(self.dumps_dir, exist_ok=True)
self.keep = int(keep_count)
self._saved = {'mfd': deque(), 'sar': deque(), 'unknown': deque(), 'bin': deque(), 'frame': deque()}
# prune existing files at startup
for cat in list(self._saved.keys()):
try:
self._prune_category(cat)
except Exception:
pass
def _timestamp(self) -> str:
return time.strftime('%Y%m%d_%H%M%S') + (f"_{int(time.time() * 1000) % 1000:03d}")
def save_bin(self, payload: bytes, category: str = 'unknown') -> Optional[str]:
try:
ts = self._timestamp()
fname = f'VideoReceiverSFP_{category}_{ts}.bin'
path = os.path.join(self.dumps_dir, fname)
with open(path, 'wb') as fh:
fh.write(payload)
self._enqueue(path, category)
return path
except Exception:
return None
def save_preview_from_array(self, arr, category: str = 'unknown', fmt: str = 'bmp') -> Optional[str]:
if Image is None:
return None
try:
ts = self._timestamp()
ext = fmt.lstrip('.')
fname = f'VideoReceiverSFP_{category}_{ts}.{ext}'
path = os.path.join(self.dumps_dir, fname)
if np is not None and not isinstance(arr, Image.Image):
img = Image.fromarray(arr, mode='L')
else:
img = arr if isinstance(arr, Image.Image) else None
if img is None:
return None
if ext.lower() == 'png':
img = img.convert('RGB')
img.save(path)
self._enqueue(path, category)
return path
except Exception:
return None
def save_preview_from_pil(self, pil_image, category: str = 'unknown', fmt: str = 'bmp') -> Optional[str]:
if pil_image is None:
return None
try:
ts = self._timestamp()
ext = fmt.lstrip('.')
fname = f'VideoReceiverSFP_{category}_{ts}.{ext}'
path = os.path.join(self.dumps_dir, fname)
img = pil_image
if ext.lower() == 'png':
img = img.convert('RGB')
img.save(path)
self._enqueue(path, category)
return path
except Exception:
return None
def _enqueue(self, path: str, category: str) -> None:
cat = category if category in self._saved else 'unknown'
try:
self._saved[cat].append(path)
except Exception:
pass
try:
self._prune_category(cat)
except Exception:
# best-effort fallback: remove tracked entries
try:
while len(self._saved[cat]) > self.keep:
old = self._saved[cat].popleft()
try:
pathlib.Path(old).unlink()
except Exception:
pass
except Exception:
pass
def _prune_category(self, category: str) -> None:
prefix = f'VideoReceiverSFP_{category}_'
pdir = pathlib.Path(self.dumps_dir)
files = [p for p in pdir.iterdir() if p.is_file() and p.name.startswith(prefix)]
if len(files) <= self.keep:
return
files_sorted = sorted(files, key=lambda p: p.stat().st_mtime)
remove_count = len(files_sorted) - self.keep
for old in files_sorted[:remove_count]:
try:
old.unlink()
except Exception:
pass