# FlightMonitor/data/storage.py
import sqlite3
import os
from datetime import (
    datetime,
    timezone,
)
from pathlib import Path
from typing import Optional, List, Dict, Any, Set
import time
import threading

from flightmonitor.data.common_models import CanonicalFlightState
from flightmonitor.utils.logger import get_logger
from flightmonitor.data import config as app_config

module_logger = get_logger(__name__)


class DataStorage:
    """
    Handles storage of flight data into daily SQLite database files,
    managing a pool of connections to avoid file locking issues.

    One connection is pooled per database file path. A pooled connection that
    was opened read-only is transparently replaced by a read-write connection
    the first time a writer asks for the same file.
    """

    def __init__(self):
        """Create an empty connection pool and make sure the DB directory exists.

        Raises:
            OSError: if the configured database directory cannot be created
                or is not writable.
        """
        # One pooled connection per absolute database path.
        self._connection_pool: Dict[str, sqlite3.Connection] = {}
        # Paths whose pooled connection was opened with ``mode=ro``.
        self._read_only_paths: Set[str] = set()
        self._pool_lock = threading.Lock()
        self._ensure_database_directory()
        module_logger.info("DataStorage initialized with a connection pool.")

    # ------------------------------------------------------------------
    # Paths
    # ------------------------------------------------------------------

    def _ensure_database_directory(self):
        """Create the database directory if needed and verify it is writable.

        Raises:
            OSError: if the directory cannot be created, is not a directory,
                or is not writable. The error is logged before re-raising.
        """
        try:
            abs_db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
            os.makedirs(abs_db_dir, exist_ok=True)
            if not os.path.isdir(abs_db_dir) or not os.access(abs_db_dir, os.W_OK):
                raise OSError(f"Directory '{abs_db_dir}' is not a valid writable directory.")
        except OSError as e:
            module_logger.critical(f"Database directory error: {e}", exc_info=True)
            raise

    @staticmethod
    def _db_path_for(moment: datetime) -> str:
        """Return the absolute DB file path for the day containing *moment*."""
        db_filename = moment.strftime(app_config.DATABASE_FILENAME_FORMAT)
        return os.path.join(os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename)

    def _get_db_path_for_date_str(self, date_str: str) -> str:
        """Helper to get a DB path from a 'YYYY-MM-DD' string.

        Raises:
            ValueError: if *date_str* does not match ``%Y-%m-%d``.
        """
        target_date = datetime.strptime(date_str, "%Y-%m-%d")
        return self._db_path_for(target_date)

    def _get_db_path_for_datetime(self, target_date_utc: Optional[datetime] = None) -> str:
        """Helper to get a DB path from a datetime object (defaults to now, UTC)."""
        date = target_date_utc or datetime.now(timezone.utc)
        return self._db_path_for(date)

    # ------------------------------------------------------------------
    # Connection pool
    # ------------------------------------------------------------------

    def _pop_connection_locked(self, db_path: str) -> Optional[sqlite3.Connection]:
        """Remove *db_path* from the pool bookkeeping and return its connection.

        The caller must already hold ``self._pool_lock``. The connection is
        NOT closed here.
        """
        self._read_only_paths.discard(db_path)
        return self._connection_pool.pop(db_path, None)

    @staticmethod
    def _close_quietly(conn: sqlite3.Connection, db_path: str) -> None:
        """Best-effort close of a connection that is being discarded."""
        try:
            conn.close()
        except sqlite3.Error as e:
            module_logger.debug(f"Ignoring error while discarding connection to {db_path}: {e}")

    def _get_connection(self, db_path: str, read_only: bool = False) -> Optional[sqlite3.Connection]:
        """Gets a connection from the pool, creating it if necessary.

        Args:
            db_path: Path of the SQLite database file.
            read_only: Open the file with ``mode=ro`` when a new connection
                has to be created. An existing read-write connection is
                reused for read-only requests.

        Returns:
            A usable connection, or ``None`` if the database could not be
            opened (the error is logged).
        """
        with self._pool_lock:
            pooled = self._connection_pool.get(db_path)
            if pooled is not None:
                # A read-only connection must never be handed to a writer:
                # every INSERT/UPDATE on it would fail.
                needs_upgrade = (not read_only) and db_path in self._read_only_paths
                if needs_upgrade:
                    module_logger.info(
                        f"Replacing read-only connection to {os.path.basename(db_path)} with a read-write one."
                    )
                else:
                    try:
                        # Cheap liveness probe: raises sqlite3.Error if the
                        # connection is closed (or otherwise unusable here).
                        pooled.cursor()
                        return pooled
                    except sqlite3.Error:
                        # Connection has been closed
                        module_logger.warning(f"Found a closed connection for {db_path} in pool. Recreating.")
                self._pop_connection_locked(db_path)
                self._close_quietly(pooled, db_path)

            new_conn: Optional[sqlite3.Connection] = None
            try:
                # Path.as_uri() percent-encodes characters such as '?', '#'
                # and '%' that would otherwise be parsed as URI syntax.
                uri_path = Path(os.path.abspath(db_path)).as_uri()
                if read_only:
                    uri_path += "?mode=ro"
                new_conn = sqlite3.connect(uri_path, uri=True, timeout=10.0)
                new_conn.row_factory = sqlite3.Row
                if not read_only:
                    new_conn.execute("PRAGMA foreign_keys = ON;")
                    self._init_tables_if_not_exist(new_conn)
                self._connection_pool[db_path] = new_conn
                if read_only:
                    self._read_only_paths.add(db_path)
                module_logger.info(
                    f"Opened new DB connection to: {os.path.basename(db_path)} (pool size: {len(self._connection_pool)})"
                )
                return new_conn
            except sqlite3.Error as e:
                module_logger.error(f"Failed to connect to database {db_path}: {e}", exc_info=True)
                if new_conn is not None:
                    # Do not leak a half-initialised connection.
                    self._close_quietly(new_conn, db_path)
                return None

    def _close_connection(self, db_path: str):
        """Closes a specific connection and removes it from the pool."""
        with self._pool_lock:
            conn = self._pop_connection_locked(db_path)
            if conn is None:
                return
            try:
                conn.close()
                module_logger.info(
                    f"Closed DB connection to: {os.path.basename(db_path)} (pool size: {len(self._connection_pool)})"
                )
            except sqlite3.Error as e:
                module_logger.error(f"Error closing DB connection to {db_path}: {e}")

    def close_all_connections(self):
        """Closes all connections in the pool. Called on application exit."""
        with self._pool_lock:
            for db_path in list(self._connection_pool):
                conn = self._pop_connection_locked(db_path)
                if conn is None:
                    continue
                try:
                    conn.close()
                except sqlite3.Error as e:
                    # Best effort on shutdown: report and keep closing the rest.
                    module_logger.warning(f"Error closing DB connection to {db_path}: {e}")
        module_logger.info("All database connections in the pool have been closed.")

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection):
        """Create the ``flights`` / ``positions`` tables and their indexes.

        All statements run in one transaction. SQLite errors are logged and
        swallowed; the caller is not notified of a failure.
        """
        try:
            with db_conn:
                db_conn.execute(
                    "CREATE TABLE IF NOT EXISTS flights ("
                    "flight_id INTEGER PRIMARY KEY, "
                    "icao24 TEXT NOT NULL UNIQUE, "
                    "callsign TEXT, "
                    "origin_country TEXT, "
                    "first_seen_day REAL NOT NULL, "
                    "last_seen_day REAL NOT NULL)"
                )
                db_conn.execute(
                    "CREATE TABLE IF NOT EXISTS positions ("
                    "position_id INTEGER PRIMARY KEY, "
                    "flight_id INTEGER NOT NULL, "
                    "detection_timestamp REAL NOT NULL, "
                    "latitude REAL NOT NULL, "
                    "longitude REAL NOT NULL, "
                    "baro_altitude_m REAL, "
                    "geo_altitude_m REAL, "
                    "velocity_mps REAL, "
                    "true_track_deg REAL, "
                    "vertical_rate_mps REAL, "
                    "on_ground BOOLEAN NOT NULL, "
                    "squawk TEXT, "
                    "spi BOOLEAN, "
                    "position_source TEXT, "
                    "raw_data_provider TEXT, "
                    "recorded_at REAL NOT NULL, "
                    "FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE, "
                    "UNIQUE(flight_id, detection_timestamp))"
                )
                db_conn.execute(
                    "CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp)"
                )
                db_conn.execute("CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24)")
                db_conn.execute(
                    "CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp)"
                )
        except sqlite3.Error as e:
            module_logger.error(f"Error initializing tables: {e}", exc_info=True)

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    def add_or_update_flight_daily(
        self,
        icao24: str,
        callsign: Optional[str],
        origin_country: Optional[str],
        detection_timestamp: float,
    ) -> Optional[int]:
        """Insert a flight row for the day of *detection_timestamp*, or update it.

        An existing row (matched on ``icao24``) gets its ``last_seen_day``
        refreshed and its callsign replaced when a new non-empty callsign is
        supplied. ``origin_country`` is only written on insert.

        Returns:
            The ``flight_id`` of the row, or ``None`` if the database could
            not be opened or a SQLite error occurred (logged).
        """
        event_date_utc = datetime.fromtimestamp(detection_timestamp, timezone.utc)
        db_path = self._get_db_path_for_datetime(event_date_utc)
        conn = self._get_connection(db_path)
        if not conn:
            return None

        flight_id: Optional[int] = None
        try:
            with conn:
                cursor = conn.cursor()
                cursor.execute("SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,))
                row = cursor.fetchone()
                if row:
                    flight_id = row["flight_id"]
                    existing_callsign = row["callsign"]
                    # Keep the stored callsign unless a different, non-empty
                    # one arrives.
                    final_callsign = existing_callsign or callsign
                    if callsign and callsign != existing_callsign:
                        final_callsign = callsign
                    cursor.execute(
                        "UPDATE flights SET last_seen_day = ?, callsign = ? WHERE flight_id = ?",
                        (detection_timestamp, final_callsign, flight_id),
                    )
                else:
                    cursor.execute(
                        "INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day) VALUES (?, ?, ?, ?, ?)",
                        (icao24, callsign, origin_country, detection_timestamp, detection_timestamp),
                    )
                    flight_id = cursor.lastrowid
        except sqlite3.Error as e:
            module_logger.error(f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}", exc_info=True)
            flight_id = None
        return flight_id

    def add_position_daily(self, flight_id: int, flight_state_obj: CanonicalFlightState) -> Optional[int]:
        """Store one position sample for *flight_id* in that day's database.

        The insert uses ``INSERT OR IGNORE``, so a sample that collides with
        the ``UNIQUE(flight_id, detection_timestamp)`` constraint is skipped.

        Returns:
            The new ``position_id``; ``None`` when the input is unusable
            (falsy ``flight_id``, wrong type, missing latitude/longitude),
            when the row was ignored as a duplicate, or on a SQLite error.
        """
        if (
            not flight_id
            or not isinstance(flight_state_obj, CanonicalFlightState)
            or flight_state_obj.latitude is None
            or flight_state_obj.longitude is None
        ):
            return None

        event_date_utc = datetime.fromtimestamp(flight_state_obj.timestamp, timezone.utc)
        db_path = self._get_db_path_for_datetime(event_date_utc)
        conn = self._get_connection(db_path)
        if not conn:
            return None

        position_id: Optional[int] = None
        try:
            with conn:
                sql = "INSERT OR IGNORE INTO positions (flight_id, detection_timestamp, latitude, longitude, baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg, vertical_rate_mps, on_ground, squawk, spi, position_source, raw_data_provider, recorded_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
                params = (
                    flight_id,
                    flight_state_obj.timestamp,
                    flight_state_obj.latitude,
                    flight_state_obj.longitude,
                    flight_state_obj.baro_altitude_m,
                    flight_state_obj.geo_altitude_m,
                    flight_state_obj.velocity_mps,
                    flight_state_obj.true_track_deg,
                    flight_state_obj.vertical_rate_mps,
                    flight_state_obj.on_ground,
                    flight_state_obj.squawk,
                    flight_state_obj.spi,
                    str(flight_state_obj.position_source) if flight_state_obj.position_source is not None else None,
                    flight_state_obj.raw_data_provider,
                    time.time(),
                )
                cursor = conn.cursor()
                cursor.execute(sql, params)
                # rowcount is 0 when OR IGNORE skipped the row; lastrowid
                # would then be stale, so only trust it after a real insert.
                if cursor.rowcount > 0:
                    position_id = cursor.lastrowid
        except sqlite3.Error as e:
            module_logger.error(f"DB error in add_position_daily for flight_id={flight_id}: {e}", exc_info=True)
            position_id = None
        return position_id

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    @staticmethod
    def _optional_bool(value: Any) -> Optional[bool]:
        """Convert a nullable DB value to ``bool``, preserving NULL as ``None``."""
        return None if value is None else bool(value)

    def get_flight_track_for_icao_on_date(
        self,
        icao24_to_find: str,
        target_date_utc: Optional[datetime] = None,
        since_timestamp: Optional[float] = None,
    ) -> List[CanonicalFlightState]:
        """Return the stored positions of one aircraft for one day, oldest first.

        Args:
            icao24_to_find: Aircraft address; looked up lower-cased.
            target_date_utc: Day to read (defaults to now, UTC).
            since_timestamp: If given, only positions with
                ``detection_timestamp >= since_timestamp`` are returned.

        Returns:
            A list of states; empty when the day's file does not exist, the
            aircraft is unknown, or the database cannot be opened. On a
            SQLite error the points collected so far are returned.
        """
        date = target_date_utc or datetime.now(timezone.utc)
        db_path = self._get_db_path_for_datetime(date)
        if not os.path.exists(db_path):
            return []
        conn = self._get_connection(db_path, read_only=True)
        if not conn:
            return []

        track_points: List[CanonicalFlightState] = []
        try:
            with conn:
                cursor = conn.cursor()
                # NOTE(review): the lookup lower-cases the address while
                # add_or_update_flight_daily stores it as given - confirm
                # callers always pass lower-case icao24 when writing.
                cursor.execute("SELECT flight_id FROM flights WHERE icao24 = ?", (icao24_to_find.lower(),))
                flight_row = cursor.fetchone()
                if not flight_row:
                    return []
                flight_id = flight_row["flight_id"]

                sql_query = "SELECT * FROM positions WHERE flight_id = ?"
                params: List[Any] = [flight_id]
                if since_timestamp is not None:
                    sql_query += " AND detection_timestamp >= ?"
                    params.append(since_timestamp)
                sql_query += " ORDER BY detection_timestamp ASC"

                for pos_row in cursor.execute(sql_query, params):
                    track_points.append(
                        CanonicalFlightState(
                            icao24=icao24_to_find,
                            timestamp=pos_row["detection_timestamp"],
                            last_contact_timestamp=pos_row["detection_timestamp"],
                            latitude=pos_row["latitude"],
                            longitude=pos_row["longitude"],
                            on_ground=bool(pos_row["on_ground"]),
                            baro_altitude_m=pos_row["baro_altitude_m"],
                            geo_altitude_m=pos_row["geo_altitude_m"],
                            velocity_mps=pos_row["velocity_mps"],
                            true_track_deg=pos_row["true_track_deg"],
                            vertical_rate_mps=pos_row["vertical_rate_mps"],
                            squawk=pos_row["squawk"],
                            spi=self._optional_bool(pos_row["spi"]),
                            position_source=pos_row["position_source"],
                            raw_data_provider=pos_row["raw_data_provider"],
                        )
                    )
        except sqlite3.Error as e:
            module_logger.error(f"DB error retrieving track for {icao24_to_find}: {e}", exc_info=True)
        return track_points

    def get_available_recording_dates(self) -> List[str]:
        """List the days that have a database file, newest first.

        File names are matched as ``<prefix><YYYY-MM-DD>.db`` where the
        prefix is everything before the first ``%`` in
        ``DATABASE_FILENAME_FORMAT``; names whose middle part is not a valid
        ``%Y-%m-%d`` date are skipped.
        """
        dates = set()
        db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
        if not os.path.isdir(db_dir):
            return []

        prefix = app_config.DATABASE_FILENAME_FORMAT.split("%")[0]
        suffix = ".db"
        for filename in os.listdir(db_dir):
            if not (filename.startswith(prefix) and filename.endswith(suffix)):
                continue
            date_part = filename[len(prefix) : -len(suffix)]
            try:
                datetime.strptime(date_part, "%Y-%m-%d")
            except ValueError:
                continue  # Not one of our daily files.
            dates.add(date_part)
        return sorted(dates, reverse=True)

    def get_daily_recording_summary(self, date_str: str) -> Optional[Dict[str, Any]]:
        """Summarise one day's database.

        Returns:
            A dict with ``file_size_mb``, ``time_range`` (min first-seen,
            max last-seen, or ``None``), ``flights_count``,
            ``positions_count`` and ``bbox`` (lat/lon extremes, or ``None``);
            ``None`` if the file does not exist, cannot be opened, or a
            SQLite error occurs.

        Raises:
            ValueError: if *date_str* is not a ``YYYY-MM-DD`` date.
        """
        db_path = self._get_db_path_for_date_str(date_str)
        if not os.path.exists(db_path):
            return None
        conn = self._get_connection(db_path, read_only=True)
        if not conn:
            return None

        summary = {"file_size_mb": 0.0, "time_range": None, "flights_count": 0, "positions_count": 0, "bbox": None}
        try:
            summary["file_size_mb"] = round(os.path.getsize(db_path) / (1024 * 1024), 2)
            with conn:
                cursor = conn.cursor()

                cursor.execute("SELECT MIN(first_seen_day), MAX(last_seen_day) FROM flights")
                time_res = cursor.fetchone()
                if time_res and time_res[0] is not None:
                    summary["time_range"] = (time_res[0], time_res[1])

                cursor.execute("SELECT COUNT(*) FROM flights")
                summary["flights_count"] = cursor.fetchone()[0]

                cursor.execute("SELECT COUNT(*) FROM positions")
                summary["positions_count"] = cursor.fetchone()[0]

                cursor.execute("SELECT MIN(latitude), MAX(latitude), MIN(longitude), MAX(longitude) FROM positions")
                bbox_res = cursor.fetchone()
                if bbox_res and bbox_res[0] is not None:
                    summary["bbox"] = {
                        "lat_min": bbox_res[0],
                        "lat_max": bbox_res[1],
                        "lon_min": bbox_res[2],
                        "lon_max": bbox_res[3],
                    }
        except sqlite3.Error as e:
            module_logger.error(f"DB summary error for {date_str}: {e}", exc_info=True)
            return None
        return summary

    def get_positions_in_range(self, date_str: str, start_ts: float, end_ts: float) -> List[CanonicalFlightState]:
        """Return every stored position of a day within ``[start_ts, end_ts]``.

        Rows are joined with their flight record and ordered by detection
        time. A NULL ``spi`` column is returned as ``None`` (not ``False``),
        matching ``get_flight_track_for_icao_on_date``.

        Returns:
            A list of states; empty when the file does not exist or cannot
            be opened. On a SQLite error the states collected so far are
            returned.

        Raises:
            ValueError: if *date_str* is not a ``YYYY-MM-DD`` date.
        """
        db_path = self._get_db_path_for_date_str(date_str)
        if not os.path.exists(db_path):
            return []
        conn = self._get_connection(db_path, read_only=True)
        if not conn:
            return []

        states = []
        try:
            with conn:
                sql = "SELECT p.*, f.icao24, f.callsign, f.origin_country FROM positions p JOIN flights f ON p.flight_id = f.flight_id WHERE p.detection_timestamp BETWEEN ? AND ? ORDER BY p.detection_timestamp ASC;"
                for row in conn.cursor().execute(sql, (start_ts, end_ts)):
                    states.append(
                        CanonicalFlightState(
                            icao24=row["icao24"],
                            callsign=row["callsign"],
                            origin_country=row["origin_country"],
                            timestamp=row["detection_timestamp"],
                            last_contact_timestamp=row["detection_timestamp"],
                            latitude=row["latitude"],
                            longitude=row["longitude"],
                            baro_altitude_m=row["baro_altitude_m"],
                            geo_altitude_m=row["geo_altitude_m"],
                            on_ground=bool(row["on_ground"]),
                            velocity_mps=row["velocity_mps"],
                            true_track_deg=row["true_track_deg"],
                            vertical_rate_mps=row["vertical_rate_mps"],
                            squawk=row["squawk"],
                            spi=self._optional_bool(row["spi"]),
                            position_source=row["position_source"],
                            raw_data_provider=row["raw_data_provider"],
                        )
                    )
        except sqlite3.Error as e:
            module_logger.error(f"DB error getting positions in range for {date_str}: {e}", exc_info=True)
        return states

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def delete_daily_db(self, date_str: str) -> bool:
        """Deletes the daily database file for the given date.

        The pooled connection for that file is closed first.

        Returns:
            ``True`` if the file was removed or did not exist; ``False`` for
            an invalid date string or an OS error while deleting.
        """
        try:
            db_path_to_delete = self._get_db_path_for_date_str(date_str)
        except ValueError:
            module_logger.error(f"Invalid date string for deletion: {date_str}")
            return False

        self._close_connection(db_path_to_delete)
        # Short pause between closing and removing the file - presumably to
        # let the OS release the handle; TODO confirm it is still needed.
        time.sleep(0.1)

        try:
            if os.path.exists(db_path_to_delete):
                os.remove(db_path_to_delete)
                module_logger.info(f"Successfully deleted database: {db_path_to_delete}")
                return True
            module_logger.warning(f"Attempted to delete non-existent database: {db_path_to_delete}")
            return True  # If it doesn't exist, it's "deleted"
        except OSError as e:
            module_logger.error(f"OS error deleting database {db_path_to_delete}: {e}", exc_info=True)
            return False