275 lines
16 KiB
Python
275 lines
16 KiB
Python
# FlightMonitor/data/storage.py
|
|
import sqlite3
|
|
import os
|
|
from datetime import (
|
|
datetime,
|
|
timezone,
|
|
)
|
|
from typing import Optional, List, Dict, Any
|
|
import time
|
|
import threading
|
|
|
|
from flightmonitor.data.common_models import CanonicalFlightState
|
|
from flightmonitor.utils.logger import get_logger
|
|
from flightmonitor.data import config as app_config
|
|
|
|
module_logger = get_logger(__name__)
|
|
|
|
|
|
class DataStorage:
|
|
"""
|
|
Handles storage of flight data into daily SQLite database files, managing
|
|
a pool of connections to avoid file locking issues.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._connection_pool: Dict[str, sqlite3.Connection] = {}
|
|
self._pool_lock = threading.Lock()
|
|
self._ensure_database_directory()
|
|
module_logger.info("DataStorage initialized with a connection pool.")
|
|
|
|
def _ensure_database_directory(self):
|
|
try:
|
|
abs_db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
|
|
os.makedirs(abs_db_dir, exist_ok=True)
|
|
if not os.path.isdir(abs_db_dir) or not os.access(abs_db_dir, os.W_OK):
|
|
raise OSError(f"Directory '{abs_db_dir}' is not a valid writable directory.")
|
|
except OSError as e:
|
|
module_logger.critical(f"Database directory error: {e}", exc_info=True)
|
|
raise
|
|
|
|
def _get_db_path_for_date_str(self, date_str: str) -> str:
|
|
"""Helper to get a DB path from a 'YYYY-MM-DD' string."""
|
|
target_date = datetime.strptime(date_str, "%Y-%m-%d")
|
|
db_filename = target_date.strftime(app_config.DATABASE_FILENAME_FORMAT)
|
|
return os.path.join(os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename)
|
|
|
|
def _get_db_path_for_datetime(self, target_date_utc: Optional[datetime] = None) -> str:
|
|
"""Helper to get a DB path from a datetime object."""
|
|
date = target_date_utc or datetime.now(timezone.utc)
|
|
db_filename = date.strftime(app_config.DATABASE_FILENAME_FORMAT)
|
|
return os.path.join(os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename)
|
|
|
|
def _get_connection(self, db_path: str, read_only: bool = False) -> Optional[sqlite3.Connection]:
|
|
"""Gets a connection from the pool, creating it if necessary."""
|
|
with self._pool_lock:
|
|
if db_path in self._connection_pool:
|
|
conn = self._connection_pool[db_path]
|
|
try:
|
|
conn.cursor()
|
|
return conn
|
|
except sqlite3.Error: # Connection has been closed
|
|
module_logger.warning(f"Found a closed connection for {db_path} in pool. Recreating.")
|
|
self._connection_pool.pop(db_path, None)
|
|
|
|
try:
|
|
uri_path = f"file:{db_path}{'?mode=ro' if read_only else ''}"
|
|
conn = sqlite3.connect(uri_path, uri=True, timeout=10.0)
|
|
conn.row_factory = sqlite3.Row
|
|
if not read_only:
|
|
conn.execute("PRAGMA foreign_keys = ON;")
|
|
self._init_tables_if_not_exist(conn)
|
|
|
|
self._connection_pool[db_path] = conn
|
|
module_logger.info(f"Opened new DB connection to: {os.path.basename(db_path)} (pool size: {len(self._connection_pool)})")
|
|
return conn
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"Failed to connect to database {db_path}: {e}", exc_info=True)
|
|
return None
|
|
|
|
def _close_connection(self, db_path: str):
|
|
"""Closes a specific connection and removes it from the pool."""
|
|
with self._pool_lock:
|
|
if db_path in self._connection_pool:
|
|
conn = self._connection_pool.pop(db_path)
|
|
try:
|
|
conn.close()
|
|
module_logger.info(f"Closed DB connection to: {os.path.basename(db_path)} (pool size: {len(self._connection_pool)})")
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"Error closing DB connection to {db_path}: {e}")
|
|
|
|
def close_all_connections(self):
|
|
"""Closes all connections in the pool. Called on application exit."""
|
|
with self._pool_lock:
|
|
paths_to_close = list(self._connection_pool.keys())
|
|
for db_path in paths_to_close:
|
|
if db_path in self._connection_pool: # Check again in case of concurrent modification
|
|
conn = self._connection_pool.pop(db_path)
|
|
try:
|
|
conn.close()
|
|
except sqlite3.Error:
|
|
pass
|
|
module_logger.info("All database connections in the pool have been closed.")
|
|
|
|
def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection):
|
|
try:
|
|
with db_conn:
|
|
db_conn.execute("CREATE TABLE IF NOT EXISTS flights (flight_id INTEGER PRIMARY KEY, icao24 TEXT NOT NULL UNIQUE, callsign TEXT, origin_country TEXT, first_seen_day REAL NOT NULL, last_seen_day REAL NOT NULL)")
|
|
db_conn.execute("CREATE TABLE IF NOT EXISTS positions (position_id INTEGER PRIMARY KEY, flight_id INTEGER NOT NULL, detection_timestamp REAL NOT NULL, latitude REAL NOT NULL, longitude REAL NOT NULL, baro_altitude_m REAL, geo_altitude_m REAL, velocity_mps REAL, true_track_deg REAL, vertical_rate_mps REAL, on_ground BOOLEAN NOT NULL, squawk TEXT, spi BOOLEAN, position_source TEXT, raw_data_provider TEXT, recorded_at REAL NOT NULL, FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE, UNIQUE(flight_id, detection_timestamp))")
|
|
db_conn.execute("CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp)")
|
|
db_conn.execute("CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24)")
|
|
db_conn.execute("CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp)")
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"Error initializing tables: {e}", exc_info=True)
|
|
|
|
def add_or_update_flight_daily(self, icao24: str, callsign: Optional[str], origin_country: Optional[str], detection_timestamp: float) -> Optional[int]:
|
|
event_date_utc = datetime.fromtimestamp(detection_timestamp, timezone.utc)
|
|
db_path = self._get_db_path_for_datetime(event_date_utc)
|
|
conn = self._get_connection(db_path)
|
|
if not conn: return None
|
|
|
|
flight_id: Optional[int] = None
|
|
try:
|
|
with conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
flight_id = row["flight_id"]
|
|
existing_callsign = row["callsign"]
|
|
final_callsign = existing_callsign or callsign
|
|
if callsign and callsign != existing_callsign:
|
|
final_callsign = callsign
|
|
cursor.execute("UPDATE flights SET last_seen_day = ?, callsign = ? WHERE flight_id = ?", (detection_timestamp, final_callsign, flight_id))
|
|
else:
|
|
cursor.execute("INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day) VALUES (?, ?, ?, ?, ?)",(icao24, callsign, origin_country, detection_timestamp, detection_timestamp))
|
|
flight_id = cursor.lastrowid
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}", exc_info=True)
|
|
flight_id = None
|
|
return flight_id
|
|
|
|
def add_position_daily(self, flight_id: int, flight_state_obj: CanonicalFlightState) -> Optional[int]:
|
|
if not flight_id or not isinstance(flight_state_obj, CanonicalFlightState) or flight_state_obj.latitude is None or flight_state_obj.longitude is None:
|
|
return None
|
|
|
|
event_date_utc = datetime.fromtimestamp(flight_state_obj.timestamp, timezone.utc)
|
|
db_path = self._get_db_path_for_datetime(event_date_utc)
|
|
conn = self._get_connection(db_path)
|
|
if not conn: return None
|
|
|
|
position_id: Optional[int] = None
|
|
try:
|
|
with conn:
|
|
sql = "INSERT OR IGNORE INTO positions (flight_id, detection_timestamp, latitude, longitude, baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg, vertical_rate_mps, on_ground, squawk, spi, position_source, raw_data_provider, recorded_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
|
params = (flight_id, flight_state_obj.timestamp, flight_state_obj.latitude, flight_state_obj.longitude, flight_state_obj.baro_altitude_m, flight_state_obj.geo_altitude_m, flight_state_obj.velocity_mps, flight_state_obj.true_track_deg, flight_state_obj.vertical_rate_mps, flight_state_obj.on_ground, flight_state_obj.squawk, flight_state_obj.spi, str(flight_state_obj.position_source) if flight_state_obj.position_source is not None else None, flight_state_obj.raw_data_provider, time.time())
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql, params)
|
|
if cursor.rowcount > 0:
|
|
position_id = cursor.lastrowid
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"DB error in add_position_daily for flight_id={flight_id}: {e}", exc_info=True)
|
|
position_id = None
|
|
return position_id
|
|
|
|
def get_flight_track_for_icao_on_date(self, icao24_to_find: str, target_date_utc: Optional[datetime] = None, since_timestamp: Optional[float] = None) -> List[CanonicalFlightState]:
|
|
date = target_date_utc or datetime.now(timezone.utc)
|
|
db_path = self._get_db_path_for_datetime(date)
|
|
if not os.path.exists(db_path): return []
|
|
conn = self._get_connection(db_path, read_only=True)
|
|
if not conn: return []
|
|
|
|
track_points: List[CanonicalFlightState] = []
|
|
try:
|
|
with conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT flight_id FROM flights WHERE icao24 = ?", (icao24_to_find.lower(),))
|
|
flight_row = cursor.fetchone()
|
|
if not flight_row: return []
|
|
|
|
flight_id = flight_row["flight_id"]
|
|
sql_query = "SELECT * FROM positions WHERE flight_id = ?"
|
|
params: List[Any] = [flight_id]
|
|
if since_timestamp is not None:
|
|
sql_query += " AND detection_timestamp >= ?"
|
|
params.append(since_timestamp)
|
|
sql_query += " ORDER BY detection_timestamp ASC"
|
|
|
|
for pos_row in cursor.execute(sql_query, params):
|
|
track_points.append(CanonicalFlightState(icao24=icao24_to_find, timestamp=pos_row["detection_timestamp"], last_contact_timestamp=pos_row["detection_timestamp"], latitude=pos_row["latitude"], longitude=pos_row["longitude"], on_ground=bool(pos_row["on_ground"]), baro_altitude_m=pos_row["baro_altitude_m"], geo_altitude_m=pos_row["geo_altitude_m"], velocity_mps=pos_row["velocity_mps"], true_track_deg=pos_row["true_track_deg"], vertical_rate_mps=pos_row["vertical_rate_mps"], squawk=pos_row["squawk"], spi=(bool(pos_row["spi"]) if pos_row["spi"] is not None else None), position_source=pos_row["position_source"], raw_data_provider=pos_row["raw_data_provider"]))
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"DB error retrieving track for {icao24_to_find}: {e}", exc_info=True)
|
|
return track_points
|
|
|
|
def get_available_recording_dates(self) -> List[str]:
|
|
dates = set()
|
|
db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
|
|
if not os.path.isdir(db_dir): return []
|
|
prefix = app_config.DATABASE_FILENAME_FORMAT.split("%")[0]
|
|
suffix = ".db"
|
|
for filename in os.listdir(db_dir):
|
|
if filename.startswith(prefix) and filename.endswith(suffix):
|
|
date_part = filename[len(prefix) : -len(suffix)]
|
|
try:
|
|
datetime.strptime(date_part, "%Y-%m-%d")
|
|
dates.add(date_part)
|
|
except ValueError: pass
|
|
return sorted(list(dates), reverse=True)
|
|
|
|
def get_daily_recording_summary(self, date_str: str) -> Optional[Dict[str, Any]]:
|
|
db_path = self._get_db_path_for_date_str(date_str)
|
|
if not os.path.exists(db_path): return None
|
|
conn = self._get_connection(db_path, read_only=True)
|
|
if not conn: return None
|
|
|
|
summary = {"file_size_mb": 0.0, "time_range": None, "flights_count": 0, "positions_count": 0, "bbox": None}
|
|
try:
|
|
summary["file_size_mb"] = round(os.path.getsize(db_path) / (1024 * 1024), 2)
|
|
with conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT MIN(first_seen_day), MAX(last_seen_day) FROM flights")
|
|
time_res = cursor.fetchone()
|
|
if time_res and time_res[0] is not None:
|
|
summary["time_range"] = (time_res[0], time_res[1])
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM flights")
|
|
summary["flights_count"] = cursor.fetchone()[0]
|
|
cursor.execute("SELECT COUNT(*) FROM positions")
|
|
summary["positions_count"] = cursor.fetchone()[0]
|
|
cursor.execute("SELECT MIN(latitude), MAX(latitude), MIN(longitude), MAX(longitude) FROM positions")
|
|
bbox_res = cursor.fetchone()
|
|
if bbox_res and bbox_res[0] is not None:
|
|
summary["bbox"] = {"lat_min": bbox_res[0], "lat_max": bbox_res[1], "lon_min": bbox_res[2], "lon_max": bbox_res[3]}
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"DB summary error for {date_str}: {e}", exc_info=True)
|
|
return None
|
|
return summary
|
|
|
|
def get_positions_in_range(self, date_str: str, start_ts: float, end_ts: float) -> List[CanonicalFlightState]:
|
|
db_path = self._get_db_path_for_date_str(date_str)
|
|
if not os.path.exists(db_path): return []
|
|
conn = self._get_connection(db_path, read_only=True)
|
|
if not conn: return []
|
|
|
|
states = []
|
|
try:
|
|
with conn:
|
|
sql = "SELECT p.*, f.icao24, f.callsign, f.origin_country FROM positions p JOIN flights f ON p.flight_id = f.flight_id WHERE p.detection_timestamp BETWEEN ? AND ? ORDER BY p.detection_timestamp ASC;"
|
|
for row in conn.cursor().execute(sql, (start_ts, end_ts)):
|
|
states.append(CanonicalFlightState(icao24=row["icao24"], callsign=row["callsign"], origin_country=row["origin_country"], timestamp=row["detection_timestamp"], last_contact_timestamp=row["detection_timestamp"], latitude=row["latitude"], longitude=row["longitude"], baro_altitude_m=row["baro_altitude_m"], geo_altitude_m=row["geo_altitude_m"], on_ground=bool(row["on_ground"]), velocity_mps=row["velocity_mps"], true_track_deg=row["true_track_deg"], vertical_rate_mps=row["vertical_rate_mps"], squawk=row["squawk"], spi=bool(row["spi"]), position_source=row["position_source"], raw_data_provider=row["raw_data_provider"]))
|
|
except sqlite3.Error as e:
|
|
module_logger.error(f"DB error getting positions in range for {date_str}: {e}", exc_info=True)
|
|
return states
|
|
|
|
def delete_daily_db(self, date_str: str) -> bool:
|
|
"""Deletes the daily database file for the given date."""
|
|
try:
|
|
db_path_to_delete = self._get_db_path_for_date_str(date_str)
|
|
except ValueError:
|
|
module_logger.error(f"Invalid date string for deletion: {date_str}")
|
|
return False
|
|
|
|
self._close_connection(db_path_to_delete)
|
|
time.sleep(0.1)
|
|
|
|
try:
|
|
if os.path.exists(db_path_to_delete):
|
|
os.remove(db_path_to_delete)
|
|
module_logger.info(f"Successfully deleted database: {db_path_to_delete}")
|
|
return True
|
|
else:
|
|
module_logger.warning(f"Attempted to delete non-existent database: {db_path_to_delete}")
|
|
return True # If it doesn't exist, it's "deleted"
|
|
except OSError as e:
|
|
module_logger.error(f"OS error deleting database {db_path_to_delete}: {e}", exc_info=True)
|
|
return False |