# FlightMonitor/data/aircraft_database_manager.py
"""Local SQLite storage for static aircraft details and historical-scan records."""
import sqlite3
import csv
import os
import threading
import time
from typing import Optional, Dict, Any, Tuple, Callable, List

try:
    from ..utils.logger import get_logger

    logger = get_logger(__name__)
except ImportError:
    import logging

    logger = logging.getLogger(__name__)
    if not logger.hasHandlers():
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(name)s [%(levelname)s] %(funcName)s (%(threadName)s): %(message)s",
        )
    logger.warning("AircraftDatabaseManager using fallback standard Python logger.")

DEFAULT_AIRCRAFT_DB_FILENAME = "aircraft_database.db"

try:
    from . import config as app_config

    AIRCRAFT_DB_ROOT_DIRECTORY = getattr(app_config, "DATABASE_DIRECTORY", "flight_data_history")
    logger.info(f"AircraftDatabaseManager will use directory from app_config: {AIRCRAFT_DB_ROOT_DIRECTORY}")
except ImportError:
    AIRCRAFT_DB_ROOT_DIRECTORY = "flight_data_history"
    logger.warning(
        f"AircraftDatabaseManager: app_config not found. Using fallback directory: {AIRCRAFT_DB_ROOT_DIRECTORY}"
    )


class AircraftDatabaseManager:
    """
    Manages the local SQLite database for static aircraft details and scan history.

    The thread that constructs the manager is treated as the "main" thread and
    owns the long-lived connection stored in ``self.conn``. Calls made from any
    other thread open a short-lived connection of their own, which is closed
    before the public method returns.
    """

    # Columns of ``aircraft_details`` in the order used when building INSERTs.
    _DB_COLUMNS: Tuple[str, ...] = (
        "icao24", "registration", "manufacturericao", "manufacturername",
        "model", "typecode", "serialnumber", "operator", "operatorcallsign",
        "operatoricao", "operatoriata", "owner", "country", "built_year",
        "firstflightdate", "categorydescription", "engines", "icaoclass",
        "linenumber", "modes", "notes", "status", "timestamp_metadata",
    )

    # Lower-cased CSV header name -> ``aircraft_details`` column name.
    _CSV_TO_DB_COLUMN_MAP: Dict[str, str] = {
        "icao24": "icao24",
        "registration": "registration",
        "manufacturericao": "manufacturericao",
        "manufacturername": "manufacturername",
        "model": "model",
        "typecode": "typecode",
        "serialnumber": "serialnumber",
        "operator": "operator",
        "operatorcallsign": "operatorcallsign",
        "operatoricao": "operatoricao",
        "operatoriata": "operatoriata",
        "owner": "owner",
        "country": "country",
        "built": "built_year",
        "firstflightdate": "firstflightdate",
        "categorydescription": "categorydescription",
        "engines": "engines",
        "icaoaircraftclass": "icaoclass",
        "linenumber": "linenumber",
        "modes": "modes",
        "notes": "notes",
        "status": "status",
        "timestamp": "timestamp_metadata",
    }

    # Columns declared INTEGER in the schema; non-numeric CSV text becomes NULL.
    _INTEGER_COLUMNS = frozenset({"built_year", "modes"})

    # The progress callback fires every this many processed CSV rows.
    _PROGRESS_CALLBACK_INTERVAL = 500

    # Thresholds used by find_overlapping_scans to decide that two bounding
    # boxes cover a "similar area": maximum difference between the box centres
    # (in the same unit as the stored lat/lon values) and maximum relative
    # difference in width/height.
    _BBOX_CENTER_TOLERANCE = 0.1
    _BBOX_SIZE_TOLERANCE_RATIO = 0.1

    def __init__(self, db_name: str = DEFAULT_AIRCRAFT_DB_FILENAME, db_root_dir: Optional[str] = None):
        """
        Resolve the database path, create its directory, connect and ensure tables.

        Args:
            db_name: File name of the SQLite database.
            db_root_dir: Directory holding the database; falls back to
                ``AIRCRAFT_DB_ROOT_DIRECTORY`` when ``None``.

        Raises:
            RuntimeError: If the database directory cannot be created.
        """
        actual_db_root_dir = db_root_dir if db_root_dir is not None else AIRCRAFT_DB_ROOT_DIRECTORY
        self.db_path: str = os.path.join(os.path.abspath(actual_db_root_dir), db_name)
        self.conn: Optional[sqlite3.Connection] = None
        # The constructing thread owns self.conn for the manager's lifetime.
        self._main_thread_id: int = threading.get_ident()
        logger.info(f"Aircraft database path set to: {self.db_path}")

        db_parent_dir = os.path.dirname(self.db_path)
        if not os.path.exists(db_parent_dir):
            try:
                os.makedirs(db_parent_dir, exist_ok=True)
                logger.info(f"Aircraft database directory created: {db_parent_dir}")
            except OSError as e:
                logger.error(
                    f"Critical error creating directory {db_parent_dir}: {e}. DB might be inaccessible.",
                    exc_info=True,
                )
                raise RuntimeError(f"Unable to create database directory: {db_parent_dir}") from e

        # We are by definition on the owning thread here, so connect directly.
        self._connect_safely()
        if self.conn:
            self._create_tables()
        else:
            logger.error("Main database connection failed on init. Cannot ensure tables exist.")
        logger.info(f"AircraftDatabaseManager initialized. Main thread ID: {self._main_thread_id}")

    def _get_thread_safe_connection(self) -> Optional[sqlite3.Connection]:
        """
        Return a connection usable from the calling thread, or ``None`` on failure.

        On the main thread this is ``self.conn`` (re-opened if it is missing or
        no longer answers a probe query). On any other thread a new connection
        is opened; the caller must close it (see ``_release_connection``).
        """
        current_thread_id = threading.get_ident()
        if current_thread_id == self._main_thread_id:
            if self.conn is None or not self._is_connection_alive(self.conn):
                self._connect_safely()
            return self.conn

        try:
            local_conn = sqlite3.connect(self.db_path, timeout=10.0)
            local_conn.row_factory = sqlite3.Row
            return local_conn
        except sqlite3.Error as e:
            logger.error(
                f"Worker thread ({current_thread_id}): Error creating local DB connection: {e}",
                exc_info=True,
            )
            return None

    def _release_connection(self, conn: Optional[sqlite3.Connection]) -> None:
        """Close ``conn`` unless it is ``None`` or the shared main-thread connection."""
        if conn is None or conn is self.conn:
            return
        try:
            conn.close()
        except sqlite3.Error as e:
            # Best-effort cleanup: a failed close must not mask the caller's result.
            logger.warning(f"Error closing worker DB connection: {e}")

    def _connect_safely(self):
        """(Re)open ``self.conn``; only valid on the main thread. Sets ``self.conn`` to ``None`` on failure."""
        if threading.get_ident() != self._main_thread_id:
            logger.error("_connect_safely called from non-main thread. This is for self.conn only.")
            return
        try:
            if self.conn and not self._is_connection_alive(self.conn):
                # Discard a dead handle before opening a fresh one.
                try:
                    self.conn.close()
                except sqlite3.Error:
                    pass
                self.conn = None
            if self.conn is None:
                self.conn = sqlite3.connect(self.db_path, timeout=10.0)
                self.conn.row_factory = sqlite3.Row
                logger.info(f"Main thread: Connected/Reconnected to aircraft database: {self.db_path}")
        except sqlite3.Error as e:
            logger.error(
                f"Main thread: Error connecting/reconnecting to aircraft database {self.db_path}: {e}",
                exc_info=True,
            )
            self.conn = None

    def _is_connection_alive(self, conn_to_check: Optional[sqlite3.Connection]) -> bool:
        """Return ``True`` if ``conn_to_check`` can execute a trivial ``SELECT 1``."""
        if conn_to_check is None:
            return False
        try:
            conn_to_check.execute("SELECT 1").fetchone()
            return True
        except sqlite3.Error:
            return False

    def _create_tables(self):
        """Create the ``aircraft_details`` and ``scan_history`` tables and indexes if absent (main thread only)."""
        if threading.get_ident() != self._main_thread_id:
            logger.warning("Table creation attempt from a non-main thread. Aborting.")
            return
        if not self.conn:
            logger.error("No main database connection to create tables.")
            self._connect_safely()
            if not self.conn:
                logger.critical("Failed to establish main DB connection for table creation.")
                return
        try:
            # The connection context manager commits on success, rolls back on error.
            with self.conn:
                cursor = self.conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS aircraft_details (
                        icao24 TEXT PRIMARY KEY NOT NULL,
                        registration TEXT,
                        manufacturericao TEXT,
                        manufacturername TEXT,
                        model TEXT,
                        typecode TEXT,
                        serialnumber TEXT,
                        operator TEXT,
                        operatorcallsign TEXT,
                        operatoricao TEXT,
                        operatoriata TEXT,
                        owner TEXT,
                        country TEXT,
                        built_year INTEGER,
                        firstflightdate TEXT,
                        categorydescription TEXT,
                        engines TEXT,
                        icaoclass TEXT,
                        linenumber TEXT,
                        modes INTEGER,
                        notes TEXT,
                        status TEXT,
                        timestamp_metadata TEXT
                    )
                """)
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS idx_aircraft_registration ON aircraft_details(registration);"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS idx_aircraft_typecode ON aircraft_details(typecode);"
                )
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS scan_history (
                        scan_id INTEGER PRIMARY KEY AUTOINCREMENT,
                        start_timestamp REAL NOT NULL,
                        end_timestamp REAL NOT NULL,
                        sampling_interval_sec INTEGER,
                        scan_rate_sec INTEGER,
                        lat_min REAL NOT NULL,
                        lon_min REAL NOT NULL,
                        lat_max REAL NOT NULL,
                        lon_max REAL NOT NULL,
                        status TEXT,
                        completed_at REAL
                    )
                """)
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS idx_scan_history_time ON scan_history(start_timestamp, end_timestamp);"
                )
            logger.info("Tables 'aircraft_details' and 'scan_history' ensured in DB.")
        except sqlite3.Error as e:
            logger.error(f"Error creating tables: {e}", exc_info=True)

    def get_aircraft_details(self, icao24: str) -> Optional[Dict[str, Any]]:
        """
        Look up one aircraft by ICAO24 address.

        Args:
            icao24: Address to look up; surrounding whitespace and letter case
                are ignored.

        Returns:
            The matching row as a dict keyed by column name, or ``None`` when no
            row matches, no connection is available, or the query fails.
        """
        # Normalise before acquiring a connection so that a bad argument cannot
        # leave a worker-thread connection open.
        icao24_clean = icao24.lower().strip()

        conn = self._get_thread_safe_connection()
        if not conn:
            logger.error(f"Could not get DB connection for get_aircraft_details (ICAO: {icao24}).")
            return None

        details = None
        try:
            with conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM aircraft_details WHERE icao24 = ?", (icao24_clean,))
                row = cursor.fetchone()
                if row:
                    details = dict(row)
        except sqlite3.Error as e:
            logger.error(f"DB error in get_aircraft_details for {icao24_clean}: {e}", exc_info=True)
        except Exception as e_generic:
            logger.error(f"Generic error in get_aircraft_details for {icao24_clean}: {e_generic}", exc_info=True)
        finally:
            self._release_connection(conn)
        return details

    def _clean_csv_value(self, value: Optional[str]) -> Optional[str]:
        """Strip whitespace and one pair of enclosing single quotes; return ``None`` if nothing is left."""
        if value is None:
            return None
        cleaned = value.strip()
        if len(cleaned) >= 2 and cleaned.startswith("'") and cleaned.endswith("'"):
            cleaned = cleaned[1:-1]
        return cleaned if cleaned else None

    def _normalise_csv_header(self, header: Any) -> Any:
        """Return a cleaned, lower-cased header name; non-string headers pass through unchanged."""
        if not isinstance(header, str):
            return header
        # An empty header cell cleans to None; map it to "" instead of crashing.
        return (self._clean_csv_value(header) or "").lower()

    def _build_db_row(self, raw_row_data: Dict[str, Any], icao24_val: str) -> Dict[str, Any]:
        """Translate one CSV row into a ``{db_column: value}`` dict for insertion."""
        data_for_db: Dict[str, Any] = {"icao24": icao24_val}
        for csv_col, db_col in self._CSV_TO_DB_COLUMN_MAP.items():
            if csv_col == "icao24":
                continue
            raw_val = raw_row_data.get(csv_col)
            if raw_val is None:
                # Column absent from this CSV (or row too short): leave it out.
                continue
            cleaned_val = self._clean_csv_value(raw_val)
            final_val: Any = None
            if cleaned_val:
                if db_col in self._INTEGER_COLUMNS:
                    # isdecimal() matches exactly the digit strings int() accepts;
                    # isdigit() also admits characters such as superscripts that
                    # make int() raise.
                    final_val = int(cleaned_val) if cleaned_val.isdecimal() else None
                else:
                    final_val = cleaned_val
            data_for_db[db_col] = final_val
        return data_for_db

    def import_from_csv(
        self,
        csv_filepath: str,
        replace_existing: bool = True,
        progress_callback: Optional[Callable[[int, int, Optional[int]], None]] = None,
        total_rows_for_callback: Optional[int] = None,
    ) -> Tuple[int, int]:
        """
        Load aircraft records from a CSV file into ``aircraft_details``.

        The whole import runs in one transaction: on a file, CSV-format or other
        unexpected error nothing is committed, so existing data survives. Rows
        without an ``icao24`` value, and rows that SQLite rejects, are skipped
        individually.

        Args:
            csv_filepath: Path to a UTF-8 CSV file with a header row.
            replace_existing: If true, delete all existing rows before importing.
                The delete only happens once the file has been opened and its
                header validated.
            progress_callback: Optional callable receiving
                ``(processed_rows, imported_rows, total_rows_for_callback)`` every
                ``_PROGRESS_CALLBACK_INTERVAL`` rows and once when the import ends.
            total_rows_for_callback: Value passed through to ``progress_callback``.

        Returns:
            ``(processed_data_rows, imported_or_updated_rows)``. After an aborted
            import these reflect the rows handled before the failure, even
            though the transaction was rolled back.
        """
        conn = self._get_thread_safe_connection()
        if not conn:
            logger.error(f"Could not get DB connection for import_from_csv (File: {csv_filepath}).")
            if progress_callback:
                try:
                    progress_callback(0, 0, total_rows_for_callback)
                except Exception as e_cb_fail:
                    logger.error(f"Error in progress_callback during connection failure: {e_cb_fail}")
            return 0, 0

        current_thread_id = threading.get_ident()
        logger.info(
            f"Thread ({current_thread_id}) - Starting CSV import: {csv_filepath}. Replace mode: {replace_existing}"
        )
        processed_data_rows = 0
        imported_or_updated_rows = 0

        try:
            # Open and validate the file BEFORE touching the table, so an
            # unreadable or header-less CSV can never wipe existing data.
            with open(csv_filepath, "r", encoding="utf-8-sig", newline="") as csvfile:
                reader = csv.DictReader(csvfile)
                if not reader.fieldnames:
                    logger.error(
                        f"Thread ({current_thread_id}): CSV file {csv_filepath} has no valid header or is empty."
                    )
                    return 0, 0
                reader.fieldnames = [self._normalise_csv_header(fn) for fn in reader.fieldnames]

                with conn:
                    cursor = conn.cursor()
                    if replace_existing:
                        cursor.execute("DELETE FROM aircraft_details")
                        logger.info(f"Thread ({current_thread_id}): Existing data deleted from aircraft_details.")

                    for raw_row_data in reader:
                        processed_data_rows += 1

                        # The cleaner yields None for missing/empty values, so
                        # test it before lower-casing.
                        icao24_val = self._clean_csv_value(raw_row_data.get("icao24"))
                        if not icao24_val:
                            logger.warning(
                                f"Thread ({current_thread_id}) CSV Row {processed_data_rows}: "
                                f"Missing or empty icao24. Skipping."
                            )
                            continue
                        icao24_val = icao24_val.lower()

                        data_for_db = self._build_db_row(raw_row_data, icao24_val)
                        cols_for_sql = [col for col in self._DB_COLUMNS if col in data_for_db]
                        vals_for_sql = [data_for_db[col] for col in cols_for_sql]
                        placeholders = ", ".join(["?"] * len(cols_for_sql))
                        # Column names come from the fixed _DB_COLUMNS tuple,
                        # never from the CSV, so interpolating them is safe.
                        sql = (
                            f"INSERT OR REPLACE INTO aircraft_details ({', '.join(cols_for_sql)}) "
                            f"VALUES ({placeholders})"
                        )
                        try:
                            cursor.execute(sql, vals_for_sql)
                            if cursor.rowcount > 0:
                                imported_or_updated_rows += 1
                        except (sqlite3.Error, OverflowError) as e_sql:
                            # OverflowError: integer too large for SQLite INTEGER.
                            logger.error(
                                f"Thread ({current_thread_id}) CSV Row {processed_data_rows}: "
                                f"SQL error for ICAO '{icao24_val}': {e_sql}"
                            )

                        if progress_callback and processed_data_rows % self._PROGRESS_CALLBACK_INTERVAL == 0:
                            progress_callback(processed_data_rows, imported_or_updated_rows, total_rows_for_callback)
        except FileNotFoundError:
            logger.error(f"Thread ({current_thread_id}): CSV file not found: {csv_filepath}")
        except csv.Error as e_csv_format:
            logger.error(
                f"Thread ({current_thread_id}): CSV format error in {csv_filepath} "
                f"(around row {processed_data_rows+1}): {e_csv_format}",
                exc_info=True,
            )
        except Exception as e_general_import:
            logger.error(
                f"Thread ({current_thread_id}): General error during CSV import {csv_filepath}: {e_general_import}",
                exc_info=True,
            )
        finally:
            try:
                if progress_callback:
                    progress_callback(processed_data_rows, imported_or_updated_rows, total_rows_for_callback)
            finally:
                # Runs even if the final callback raises, so worker
                # connections are never leaked.
                self._release_connection(conn)
        return processed_data_rows, imported_or_updated_rows

    def add_scan_history(self, params: Dict[str, Any], bbox: Dict[str, float], status: str) -> Optional[int]:
        """
        Adds a record of a completed or failed historical scan.

        Args:
            params: Scan parameters; the keys ``start_time``, ``end_time``,
                ``sampling_interval_sec`` and ``scan_rate_sec`` are read.
            bbox: Bounding box; the keys ``lat_min``, ``lon_min``, ``lat_max``
                and ``lon_max`` are read.
            status: Status string stored with the record.

        Returns:
            The new row's ``scan_id``, or ``None`` if no connection is available
            or the insert fails (for example a required value is missing).
        """
        conn = self._get_thread_safe_connection()
        if not conn:
            logger.error("Could not get DB connection for add_scan_history.")
            return None

        sql = """
            INSERT INTO scan_history (
                start_timestamp, end_timestamp, sampling_interval_sec, scan_rate_sec,
                lat_min, lon_min, lat_max, lon_max, status, completed_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        db_params = (
            params.get('start_time'),
            params.get('end_time'),
            params.get('sampling_interval_sec'),
            params.get('scan_rate_sec'),
            bbox.get('lat_min'),
            bbox.get('lon_min'),
            bbox.get('lat_max'),
            bbox.get('lon_max'),
            status,
            time.time(),  # completed_at: when this record was written
        )

        scan_id = None
        try:
            with conn:
                cursor = conn.cursor()
                cursor.execute(sql, db_params)
                scan_id = cursor.lastrowid
            logger.info(f"Scan history record added with ID {scan_id} and status '{status}'.")
        except sqlite3.Error as e:
            logger.error(f"DB error adding scan history record: {e}", exc_info=True)
        finally:
            self._release_connection(conn)
        return scan_id

    def find_overlapping_scans(self, start_time: float, end_time: float, bbox: Dict[str, float]) -> List[Dict[str, Any]]:
        """
        Finds completed scans that overlap with the given time range and a similar area.

        A "similar area" is defined as a BBox whose center is very close and
        size is comparable: both centre coordinates differ by less than
        ``_BBOX_CENTER_TOLERANCE`` and width/height differ by less than
        ``_BBOX_SIZE_TOLERANCE_RATIO`` relative to the requested box.

        Args:
            start_time: Start of the requested time range.
            end_time: End of the requested time range.
            bbox: Requested bounding box with keys ``lat_min``, ``lon_min``,
                ``lat_max`` and ``lon_max``.

        Returns:
            Matching ``scan_history`` rows as dicts; an empty list when nothing
            matches or any error occurs (errors are logged, not raised).
        """
        conn = self._get_thread_safe_connection()
        if not conn:
            logger.error("Could not get DB connection for find_overlapping_scans.")
            return []

        # Two ranges overlap when each one starts before the other ends.
        sql = """
            SELECT * FROM scan_history
            WHERE status = 'completed'
            AND start_timestamp < ? AND end_timestamp > ?
        """
        overlapping_scans: List[Dict[str, Any]] = []
        try:
            with conn:
                cursor = conn.cursor()
                cursor.execute(sql, (end_time, start_time))
                potential_matches = [dict(row) for row in cursor.fetchall()]
            if not potential_matches:
                return []

            # Imported lazily, only when there is something to compare.
            from ..map.map_utils import calculate_geographic_bbox_size_km

            req_center_lat = (bbox['lat_min'] + bbox['lat_max']) / 2
            req_center_lon = (bbox['lon_min'] + bbox['lon_max']) / 2
            req_size = calculate_geographic_bbox_size_km(
                (bbox['lon_min'], bbox['lat_min'], bbox['lon_max'], bbox['lat_max'])
            )
            if req_size is None:
                # Without a size for the requested box nothing can be compared.
                return []

            for scan in potential_matches:
                scan_size = calculate_geographic_bbox_size_km(
                    (scan['lon_min'], scan['lat_min'], scan['lon_max'], scan['lat_max'])
                )
                if scan_size is None:
                    continue

                scan_center_lat = (scan['lat_min'] + scan['lat_max']) / 2
                scan_center_lon = (scan['lon_min'] + scan['lon_max']) / 2
                center_dist_lat = abs(req_center_lat - scan_center_lat)
                center_dist_lon = abs(req_center_lon - scan_center_lon)

                # Relative size difference; a degenerate requested box never matches.
                size_diff_w = abs(req_size[0] - scan_size[0]) / req_size[0] if req_size[0] > 0 else 1
                size_diff_h = abs(req_size[1] - scan_size[1]) / req_size[1] if req_size[1] > 0 else 1

                if (
                    center_dist_lat < self._BBOX_CENTER_TOLERANCE
                    and center_dist_lon < self._BBOX_CENTER_TOLERANCE
                    and size_diff_w < self._BBOX_SIZE_TOLERANCE_RATIO
                    and size_diff_h < self._BBOX_SIZE_TOLERANCE_RATIO
                ):
                    overlapping_scans.append(scan)
        except Exception as e:
            # Deliberately broad: covers DB errors, a missing map_utils module
            # and malformed bbox dicts alike; callers always get a list back.
            logger.error(f"Error in find_overlapping_scans: {e}", exc_info=True)
        finally:
            self._release_connection(conn)
        return overlapping_scans

    def close_connection(self):
        """Close the main-thread connection; calls from other threads are ignored with a warning."""
        if not self.conn:
            return
        if threading.get_ident() != self._main_thread_id:
            logger.warning(
                f"Attempt to close main connection from non-main thread ({threading.get_ident()}). Ignored."
            )
            return
        try:
            # Always close, even if the handle no longer answers queries;
            # closing an already-closed sqlite3 connection is harmless.
            self.conn.close()
            logger.info(f"Main aircraft database connection closed: {self.db_path}")
        except sqlite3.Error as e:
            logger.error(f"Error closing main aircraft DB connection: {e}", exc_info=True)
        finally:
            self.conn = None