SXXXXXXX_FlightMonitor/flightmonitor/data/storage.py

275 lines
16 KiB
Python

# FlightMonitor/data/storage.py
import sqlite3
import os
from datetime import (
datetime,
timezone,
)
from typing import Optional, List, Dict, Any
import time
import threading
from flightmonitor.data.common_models import CanonicalFlightState
from flightmonitor.utils.logger import get_logger
from flightmonitor.data import config as app_config
module_logger = get_logger(__name__)
class DataStorage:
"""
Handles storage of flight data into daily SQLite database files, managing
a pool of connections to avoid file locking issues.
"""
def __init__(self):
    """Create an empty connection pool and verify the database directory.

    Raises:
        OSError: Re-raised from ``_ensure_database_directory`` when the
            configured directory cannot be created or is not writable.
    """
    # Guards every access to the pool dictionary below.
    self._pool_lock = threading.Lock()
    # Maps a database file path to its open connection.
    self._connection_pool: Dict[str, sqlite3.Connection] = dict()
    self._ensure_database_directory()
    module_logger.info("DataStorage initialized with a connection pool.")
def _ensure_database_directory(self):
    """Make sure the configured database directory exists and is writable.

    Raises:
        OSError: If the directory cannot be created, is not a directory,
            or is not writable. The error is logged at critical level
            before being re-raised.
    """
    try:
        target_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
        os.makedirs(target_dir, exist_ok=True)
        # makedirs succeeding does not prove the location is usable, so
        # check both properties explicitly.
        usable = os.path.isdir(target_dir) and os.access(target_dir, os.W_OK)
        if not usable:
            raise OSError(f"Directory '{target_dir}' is not a valid writable directory.")
    except OSError as dir_error:
        module_logger.critical(f"Database directory error: {dir_error}", exc_info=True)
        raise
def _get_db_path_for_date_str(self, date_str: str) -> str:
"""Helper to get a DB path from a 'YYYY-MM-DD' string."""
target_date = datetime.strptime(date_str, "%Y-%m-%d")
db_filename = target_date.strftime(app_config.DATABASE_FILENAME_FORMAT)
return os.path.join(os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename)
def _get_db_path_for_datetime(self, target_date_utc: Optional[datetime] = None) -> str:
    """Build the absolute database path for a datetime.

    Args:
        target_date_utc: Moment whose day selects the file. When omitted,
            the current UTC time is used.

    Returns:
        The absolute path of the database file for that day.
    """
    if target_date_utc:
        moment = target_date_utc
    else:
        moment = datetime.now(timezone.utc)
    file_name = moment.strftime(app_config.DATABASE_FILENAME_FORMAT)
    base_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
    return os.path.join(base_dir, file_name)
def _get_connection(self, db_path: str, read_only: bool = False) -> Optional[sqlite3.Connection]:
"""Gets a connection from the pool, creating it if necessary."""
with self._pool_lock:
if db_path in self._connection_pool:
conn = self._connection_pool[db_path]
try:
conn.cursor()
return conn
except sqlite3.Error: # Connection has been closed
module_logger.warning(f"Found a closed connection for {db_path} in pool. Recreating.")
self._connection_pool.pop(db_path, None)
try:
uri_path = f"file:{db_path}{'?mode=ro' if read_only else ''}"
conn = sqlite3.connect(uri_path, uri=True, timeout=10.0)
conn.row_factory = sqlite3.Row
if not read_only:
conn.execute("PRAGMA foreign_keys = ON;")
self._init_tables_if_not_exist(conn)
self._connection_pool[db_path] = conn
module_logger.info(f"Opened new DB connection to: {os.path.basename(db_path)} (pool size: {len(self._connection_pool)})")
return conn
except sqlite3.Error as e:
module_logger.error(f"Failed to connect to database {db_path}: {e}", exc_info=True)
return None
def _close_connection(self, db_path: str):
"""Closes a specific connection and removes it from the pool."""
with self._pool_lock:
if db_path in self._connection_pool:
conn = self._connection_pool.pop(db_path)
try:
conn.close()
module_logger.info(f"Closed DB connection to: {os.path.basename(db_path)} (pool size: {len(self._connection_pool)})")
except sqlite3.Error as e:
module_logger.error(f"Error closing DB connection to {db_path}: {e}")
def close_all_connections(self):
    """Close and drop every pooled connection (called on application exit).

    Closing is best effort: a ``sqlite3.Error`` from one connection is
    ignored so the remaining ones are still closed.
    """
    with self._pool_lock:
        # Iterate over a snapshot because entries are removed as we go.
        for pooled_path in list(self._connection_pool):
            try:
                handle = self._connection_pool.pop(pooled_path)
            except KeyError:
                # Entry vanished since the snapshot was taken.
                continue
            try:
                handle.close()
            except sqlite3.Error:
                pass
        module_logger.info("All database connections in the pool have been closed.")
def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection):
try:
with db_conn:
db_conn.execute("CREATE TABLE IF NOT EXISTS flights (flight_id INTEGER PRIMARY KEY, icao24 TEXT NOT NULL UNIQUE, callsign TEXT, origin_country TEXT, first_seen_day REAL NOT NULL, last_seen_day REAL NOT NULL)")
db_conn.execute("CREATE TABLE IF NOT EXISTS positions (position_id INTEGER PRIMARY KEY, flight_id INTEGER NOT NULL, detection_timestamp REAL NOT NULL, latitude REAL NOT NULL, longitude REAL NOT NULL, baro_altitude_m REAL, geo_altitude_m REAL, velocity_mps REAL, true_track_deg REAL, vertical_rate_mps REAL, on_ground BOOLEAN NOT NULL, squawk TEXT, spi BOOLEAN, position_source TEXT, raw_data_provider TEXT, recorded_at REAL NOT NULL, FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE, UNIQUE(flight_id, detection_timestamp))")
db_conn.execute("CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp)")
db_conn.execute("CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24)")
db_conn.execute("CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp)")
except sqlite3.Error as e:
module_logger.error(f"Error initializing tables: {e}", exc_info=True)
def add_or_update_flight_daily(self, icao24: str, callsign: Optional[str], origin_country: Optional[str], detection_timestamp: float) -> Optional[int]:
    """Insert or refresh the ``flights`` row for an aircraft.

    The database file is selected from the UTC day of
    ``detection_timestamp``. For an existing row the seen-window is only
    ever widened, so samples arriving out of order cannot move
    ``last_seen_day`` backwards or leave ``first_seen_day`` too late.

    Args:
        icao24: Aircraft address used as the unique key of the row.
        callsign: Latest callsign. A non-empty value replaces the stored
            one; otherwise the stored callsign is kept.
        origin_country: Stored only when the row is first created.
        detection_timestamp: POSIX timestamp of the observation.

    Returns:
        The ``flight_id`` of the inserted or updated row, or ``None`` if
        the timestamp is invalid, no connection is available, or a
        database error occurs.
    """
    try:
        event_date_utc = datetime.fromtimestamp(detection_timestamp, timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError) as e:
        # Report a bad timestamp like every other failure of this method
        # instead of letting it escape to the caller.
        module_logger.error(f"Invalid detection timestamp {detection_timestamp!r} for icao24={icao24}: {e}")
        return None

    db_path = self._get_db_path_for_datetime(event_date_utc)
    conn = self._get_connection(db_path)
    if not conn:
        return None

    flight_id: Optional[int] = None
    try:
        with conn:
            cursor = conn.cursor()
            cursor.execute("SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,))
            row = cursor.fetchone()
            if row:
                flight_id = row["flight_id"]
                final_callsign = callsign or row["callsign"]
                # Two-argument MIN/MAX are SQLite scalar functions.
                cursor.execute(
                    "UPDATE flights SET first_seen_day = MIN(first_seen_day, ?), last_seen_day = MAX(last_seen_day, ?), callsign = ? WHERE flight_id = ?",
                    (detection_timestamp, detection_timestamp, final_callsign, flight_id),
                )
            else:
                cursor.execute(
                    "INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day) VALUES (?, ?, ?, ?, ?)",
                    (icao24, callsign, origin_country, detection_timestamp, detection_timestamp),
                )
                flight_id = cursor.lastrowid
    except sqlite3.Error as e:
        module_logger.error(f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}", exc_info=True)
        return None
    return flight_id
def add_position_daily(self, flight_id: int, flight_state_obj: CanonicalFlightState) -> Optional[int]:
    """Store one position sample for a flight in that day's database.

    The database file is selected from the UTC day of the state's
    timestamp. The insert uses ``INSERT OR IGNORE``, so a row that
    conflicts with an existing one is skipped without error.

    Args:
        flight_id: Key of the owning ``flights`` row; falsy values are
            rejected.
        flight_state_obj: State to persist. It must carry both latitude
            and longitude.

    Returns:
        The new ``position_id``, or ``None`` if the input is rejected, no
        connection is available, the row was ignored, or a database error
        occurs.
    """
    if not flight_id:
        return None
    if not isinstance(flight_state_obj, CanonicalFlightState):
        return None
    state = flight_state_obj
    if state.latitude is None or state.longitude is None:
        return None

    observed_at_utc = datetime.fromtimestamp(state.timestamp, timezone.utc)
    conn = self._get_connection(self._get_db_path_for_datetime(observed_at_utc))
    if not conn:
        return None

    try:
        with conn:
            insert_sql = "INSERT OR IGNORE INTO positions (flight_id, detection_timestamp, latitude, longitude, baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg, vertical_rate_mps, on_ground, squawk, spi, position_source, raw_data_provider, recorded_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
            values = (
                flight_id,
                state.timestamp,
                state.latitude,
                state.longitude,
                state.baro_altitude_m,
                state.geo_altitude_m,
                state.velocity_mps,
                state.true_track_deg,
                state.vertical_rate_mps,
                state.on_ground,
                state.squawk,
                state.spi,
                # Stored as text; a missing source stays NULL.
                str(state.position_source) if state.position_source is not None else None,
                state.raw_data_provider,
                time.time(),
            )
            cursor = conn.cursor()
            cursor.execute(insert_sql, values)
            # rowcount is 0 when OR IGNORE skipped the row.
            inserted_id = cursor.lastrowid if cursor.rowcount > 0 else None
    except sqlite3.Error as db_error:
        module_logger.error(f"DB error in add_position_daily for flight_id={flight_id}: {db_error}", exc_info=True)
        return None
    return inserted_id
def get_flight_track_for_icao_on_date(self, icao24_to_find: str, target_date_utc: Optional[datetime] = None, since_timestamp: Optional[float] = None) -> List[CanonicalFlightState]:
    """Return the stored positions of one aircraft for a day, oldest first.

    Args:
        icao24_to_find: Aircraft address. It is lower-cased for the lookup;
            the returned states carry the value exactly as passed in.
        target_date_utc: Day whose database is read. Defaults to the
            current UTC time.
        since_timestamp: When given, only positions with
            ``detection_timestamp`` greater than or equal to it are
            returned.

    Returns:
        The matching states ordered by ascending detection timestamp. The
        list is empty if the database file or the aircraft is missing, or
        no connection is available. If a database error occurs during
        iteration, the states read so far are returned.
    """
    day = target_date_utc if target_date_utc else datetime.now(timezone.utc)
    db_path = self._get_db_path_for_datetime(day)
    if not os.path.exists(db_path):
        return []
    conn = self._get_connection(db_path, read_only=True)
    if not conn:
        return []

    track_points: List[CanonicalFlightState] = []
    try:
        with conn:
            cursor = conn.cursor()
            cursor.execute("SELECT flight_id FROM flights WHERE icao24 = ?", (icao24_to_find.lower(),))
            match = cursor.fetchone()
            if not match:
                return []

            query_parts = ["SELECT * FROM positions WHERE flight_id = ?"]
            bind_values: List[Any] = [match["flight_id"]]
            if since_timestamp is not None:
                query_parts.append(" AND detection_timestamp >= ?")
                bind_values.append(since_timestamp)
            query_parts.append(" ORDER BY detection_timestamp ASC")

            for rec in cursor.execute("".join(query_parts), bind_values):
                spi_raw = rec["spi"]
                detected_at = rec["detection_timestamp"]
                track_points.append(
                    CanonicalFlightState(
                        icao24=icao24_to_find,
                        timestamp=detected_at,
                        last_contact_timestamp=detected_at,
                        latitude=rec["latitude"],
                        longitude=rec["longitude"],
                        on_ground=bool(rec["on_ground"]),
                        baro_altitude_m=rec["baro_altitude_m"],
                        geo_altitude_m=rec["geo_altitude_m"],
                        velocity_mps=rec["velocity_mps"],
                        true_track_deg=rec["true_track_deg"],
                        vertical_rate_mps=rec["vertical_rate_mps"],
                        squawk=rec["squawk"],
                        # A NULL column stays None instead of becoming False.
                        spi=None if spi_raw is None else bool(spi_raw),
                        position_source=rec["position_source"],
                        raw_data_provider=rec["raw_data_provider"],
                    )
                )
    except sqlite3.Error as db_error:
        module_logger.error(f"DB error retrieving track for {icao24_to_find}: {db_error}", exc_info=True)
    return track_points
def get_available_recording_dates(self) -> List[str]:
    """List the days that have a database file, newest first.

    A file counts when its name starts with the literal text preceding the
    first ``%`` of ``DATABASE_FILENAME_FORMAT``, ends with ``.db``, and the
    part in between parses as ``%Y-%m-%d``.

    Returns:
        The date parts of the matching file names, sorted in descending
        order. Empty if the database directory does not exist.
    """
    db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
    if not os.path.isdir(db_dir):
        return []

    # NOTE(review): assumes the configured format is
    # "<prefix>%Y-%m-%d.db" - confirm against the config module.
    prefix = app_config.DATABASE_FILENAME_FORMAT.partition("%")[0]
    suffix = ".db"

    found = set()
    for entry in os.listdir(db_dir):
        if not (entry.startswith(prefix) and entry.endswith(suffix)):
            continue
        candidate = entry[len(prefix) : -len(suffix)]
        try:
            datetime.strptime(candidate, "%Y-%m-%d")
        except ValueError:
            continue
        found.add(candidate)
    return sorted(found, reverse=True)
def get_daily_recording_summary(self, date_str: str) -> Optional[Dict[str, Any]]:
    """Summarise the recording stored for one day.

    Args:
        date_str: Day to summarise, formatted as ``YYYY-MM-DD``.

    Returns:
        A dict with these keys, or ``None`` if the date string is invalid,
        the database file does not exist, no connection is available, or
        reading the file or database fails:

        * ``file_size_mb`` - file size in MiB, rounded to two decimals.
        * ``time_range`` - ``(min first_seen_day, max last_seen_day)`` or
          ``None`` when there are no flights.
        * ``flights_count`` / ``positions_count`` - row counts.
        * ``bbox`` - dict with ``lat_min``, ``lat_max``, ``lon_min`` and
          ``lon_max``, or ``None`` when there are no positions.
    """
    try:
        db_path = self._get_db_path_for_date_str(date_str)
    except (TypeError, ValueError):
        # Same policy as delete_daily_db: a malformed date is reported,
        # not raised.
        module_logger.error(f"Invalid date string for summary: {date_str}")
        return None
    if not os.path.exists(db_path):
        return None
    conn = self._get_connection(db_path, read_only=True)
    if not conn:
        return None

    summary: Dict[str, Any] = {"file_size_mb": 0.0, "time_range": None, "flights_count": 0, "positions_count": 0, "bbox": None}
    try:
        summary["file_size_mb"] = round(os.path.getsize(db_path) / (1024 * 1024), 2)
        with conn:
            cursor = conn.cursor()

            # An aggregate query without GROUP BY always yields one row.
            cursor.execute("SELECT COUNT(*), MIN(first_seen_day), MAX(last_seen_day) FROM flights")
            flights_res = cursor.fetchone()
            summary["flights_count"] = flights_res[0]
            if flights_res[1] is not None:
                summary["time_range"] = (flights_res[1], flights_res[2])

            cursor.execute("SELECT COUNT(*), MIN(latitude), MAX(latitude), MIN(longitude), MAX(longitude) FROM positions")
            positions_res = cursor.fetchone()
            summary["positions_count"] = positions_res[0]
            if positions_res[1] is not None:
                summary["bbox"] = {"lat_min": positions_res[1], "lat_max": positions_res[2], "lon_min": positions_res[3], "lon_max": positions_res[4]}
    except (sqlite3.Error, OSError) as e:
        # OSError covers the file vanishing between the existence check
        # and getsize().
        module_logger.error(f"DB summary error for {date_str}: {e}", exc_info=True)
        return None
    return summary
def get_positions_in_range(self, date_str: str, start_ts: float, end_ts: float) -> List[CanonicalFlightState]:
    """Return all positions of a day within a timestamp window.

    Args:
        date_str: Day whose database is read, formatted as ``YYYY-MM-DD``.
        start_ts: Lower bound of ``detection_timestamp`` (inclusive).
        end_ts: Upper bound of ``detection_timestamp`` (inclusive).

    Returns:
        The states ordered by ascending detection timestamp, each combined
        with the icao24, callsign and origin country of its flight. The
        list is empty if the date string is invalid, the database file is
        missing, or no connection is available. If a database error occurs
        during iteration, the states read so far are returned.
    """
    try:
        db_path = self._get_db_path_for_date_str(date_str)
    except (TypeError, ValueError):
        # Same policy as delete_daily_db: a malformed date is reported,
        # not raised.
        module_logger.error(f"Invalid date string for position query: {date_str}")
        return []
    if not os.path.exists(db_path):
        return []
    conn = self._get_connection(db_path, read_only=True)
    if not conn:
        return []

    states: List[CanonicalFlightState] = []
    try:
        with conn:
            sql = "SELECT p.*, f.icao24, f.callsign, f.origin_country FROM positions p JOIN flights f ON p.flight_id = f.flight_id WHERE p.detection_timestamp BETWEEN ? AND ? ORDER BY p.detection_timestamp ASC;"
            for row in conn.cursor().execute(sql, (start_ts, end_ts)):
                states.append(
                    CanonicalFlightState(
                        icao24=row["icao24"],
                        callsign=row["callsign"],
                        origin_country=row["origin_country"],
                        timestamp=row["detection_timestamp"],
                        last_contact_timestamp=row["detection_timestamp"],
                        latitude=row["latitude"],
                        longitude=row["longitude"],
                        baro_altitude_m=row["baro_altitude_m"],
                        geo_altitude_m=row["geo_altitude_m"],
                        on_ground=bool(row["on_ground"]),
                        velocity_mps=row["velocity_mps"],
                        true_track_deg=row["true_track_deg"],
                        vertical_rate_mps=row["vertical_rate_mps"],
                        squawk=row["squawk"],
                        # The spi column is nullable: keep NULL as None, as
                        # get_flight_track_for_icao_on_date does, instead
                        # of reporting it as False.
                        spi=(bool(row["spi"]) if row["spi"] is not None else None),
                        position_source=row["position_source"],
                        raw_data_provider=row["raw_data_provider"],
                    )
                )
    except sqlite3.Error as e:
        module_logger.error(f"DB error getting positions in range for {date_str}: {e}", exc_info=True)
    return states
def delete_daily_db(self, date_str: str) -> bool:
    """Delete the daily database file for the given date.

    Any pooled connection to the file is closed first.

    Args:
        date_str: Day to delete, formatted as ``YYYY-MM-DD``.

    Returns:
        ``True`` if the file was removed or did not exist; ``False`` if
        the date string is invalid or removal raised ``OSError``.
    """
    try:
        doomed_path = self._get_db_path_for_date_str(date_str)
    except ValueError:
        module_logger.error(f"Invalid date string for deletion: {date_str}")
        return False

    self._close_connection(doomed_path)
    # NOTE(review): presumably gives the OS a moment to release the file
    # handle before removal - confirm the pause is still needed.
    time.sleep(0.1)

    try:
        if not os.path.exists(doomed_path):
            module_logger.warning(f"Attempted to delete non-existent database: {doomed_path}")
            return True  # If it doesn't exist, it's "deleted"
        os.remove(doomed_path)
        module_logger.info(f"Successfully deleted database: {doomed_path}")
        return True
    except OSError as removal_error:
        module_logger.error(f"OS error deleting database {doomed_path}: {removal_error}", exc_info=True)
        return False