# FlightMonitor/data/storage.py import sqlite3 import os from datetime import datetime, timezone, timedelta # Aggiunto timedelta per il blocco di test import time from typing import Optional, List, Dict, Any # Aggiunto Any per il blocco di test # Relative imports from . import config as app_config from ..utils.logger import get_logger from .common_models import CanonicalFlightState module_logger = get_logger(__name__) class DataStorage: """ Handles storage of flight data into daily SQLite database files. """ def __init__(self): self._current_db_path: Optional[str] = None self._connection: Optional[sqlite3.Connection] = None self._ensure_database_directory() module_logger.info("DataStorage initialized.") def _ensure_database_directory(self): try: abs_db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY) if not os.path.exists(abs_db_dir): os.makedirs(abs_db_dir) module_logger.info(f"Database directory created: {abs_db_dir}") if not os.path.isdir(abs_db_dir): raise OSError(f"Path {abs_db_dir} exists but is not a directory.") if not os.access(abs_db_dir, os.W_OK): raise OSError(f"Directory {abs_db_dir} is not writable.") except OSError as e: module_logger.error( f"Database directory error for {app_config.DATABASE_DIRECTORY}: {e}", exc_info=True, ) raise def _get_db_path_for_date(self, target_date_utc: Optional[datetime] = None) -> str: if target_date_utc is None: target_date_utc = datetime.now(timezone.utc) db_filename = target_date_utc.strftime(app_config.DATABASE_FILENAME_FORMAT) db_path = os.path.join( os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename ) return db_path def _get_db_connection( self, for_date_utc: Optional[datetime] = None ) -> Optional[sqlite3.Connection]: target_db_path = self._get_db_path_for_date(for_date_utc) if self._connection and self._current_db_path == target_db_path: return self._connection if self._connection: try: self._connection.close() module_logger.info(f"Closed DB connection to: {self._current_db_path}") except sqlite3.Error as e: module_logger.error( f"Error closing previous DB connection to {self._current_db_path}: {e}" ) self._connection = None self._current_db_path = target_db_path try: self._connection = sqlite3.connect(target_db_path, timeout=10.0) self._connection.execute("PRAGMA foreign_keys = ON;") self._connection.row_factory = sqlite3.Row module_logger.info(f"Opened DB connection to: {target_db_path}") self._init_tables_if_not_exist(self._connection) return self._connection except sqlite3.Error as e: module_logger.error( f"Failed to connect/initialize database {target_db_path}: {e}", exc_info=True, ) if self._connection: try: self._connection.close() except sqlite3.Error: pass self._connection = None self._current_db_path = None return None def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection): cursor = None try: cursor = db_conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS flights ( flight_id INTEGER PRIMARY KEY AUTOINCREMENT, icao24 TEXT NOT NULL UNIQUE, callsign TEXT, origin_country TEXT, first_seen_day REAL NOT NULL, last_seen_day REAL NOT NULL ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS positions ( position_id INTEGER PRIMARY KEY AUTOINCREMENT, flight_id INTEGER NOT NULL, detection_timestamp REAL NOT NULL, latitude REAL NOT NULL, longitude REAL NOT NULL, baro_altitude_m REAL, geo_altitude_m REAL, velocity_mps REAL, true_track_deg REAL, vertical_rate_mps REAL, on_ground BOOLEAN NOT NULL, squawk TEXT, spi BOOLEAN, position_source TEXT, raw_data_provider TEXT, recorded_at REAL NOT NULL, FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE ) """ ) cursor.execute( "CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp);" ) cursor.execute( "CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24);" ) cursor.execute( "CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp);" ) db_conn.commit() module_logger.debug( f"Database tables and indexes ensured in {self._current_db_path}." ) except sqlite3.Error as e: module_logger.error( f"Error initializing tables in {self._current_db_path}: {e}", exc_info=True, ) if db_conn: db_conn.rollback() finally: if cursor: cursor.close() def add_or_update_flight_daily( self, icao24: str, callsign: Optional[str], origin_country: Optional[str], detection_timestamp: float, ) -> Optional[int]: if not icao24: module_logger.warning("icao24 is required to add or update a flight.") return None event_date_utc = datetime.fromtimestamp(detection_timestamp, timezone.utc) conn = self._get_db_connection(for_date_utc=event_date_utc) if not conn: module_logger.error( "Failed to get DB connection for add_or_update_flight_daily." ) return None cursor = None flight_id: Optional[int] = None try: cursor = conn.cursor() cursor.execute( "SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,) ) row = cursor.fetchone() if row: flight_id = row["flight_id"] existing_callsign = row["callsign"] final_callsign = existing_callsign if callsign and callsign != existing_callsign: final_callsign = callsign elif not existing_callsign and callsign: final_callsign = callsign cursor.execute( """ UPDATE flights SET last_seen_day = ?, callsign = ? WHERE flight_id = ? """, (detection_timestamp, final_callsign, flight_id), ) module_logger.debug( f"Updated flight icao24={icao24}, flight_id={flight_id}, last_seen_day={detection_timestamp:.0f} in {self._current_db_path}" ) else: cursor.execute( """ INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day) VALUES (?, ?, ?, ?, ?) """, ( icao24, callsign, origin_country, detection_timestamp, detection_timestamp, ), ) flight_id = cursor.lastrowid module_logger.info( f"Added new flight icao24={icao24}, flight_id={flight_id} to {self._current_db_path}" ) conn.commit() except sqlite3.Error as e: module_logger.error( f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}", exc_info=True, ) if conn: conn.rollback() flight_id = None finally: if cursor: cursor.close() return flight_id def add_position_daily( self, flight_id: int, flight_state_obj: CanonicalFlightState ) -> Optional[int]: if not flight_id: module_logger.warning("flight_id is required to add a position.") return None if not isinstance(flight_state_obj, CanonicalFlightState): module_logger.error( f"Invalid type for flight_state_obj: expected CanonicalFlightState, got {type(flight_state_obj)}" ) return None event_date_utc = datetime.fromtimestamp( flight_state_obj.timestamp, timezone.utc ) conn = self._get_db_connection(for_date_utc=event_date_utc) if not conn: module_logger.error( f"Failed to get DB connection for add_position_daily (flight_id={flight_id})." ) return None cursor = None position_id: Optional[int] = None recorded_at_ts = time.time() if flight_state_obj.latitude is None or flight_state_obj.longitude is None: module_logger.warning( f"Skipping position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) " f"due to missing latitude/longitude. Timestamp: {flight_state_obj.timestamp:.0f}" ) return None try: cursor = conn.cursor() sql = """ INSERT INTO positions ( flight_id, detection_timestamp, latitude, longitude, baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg, vertical_rate_mps, on_ground, squawk, spi, position_source, raw_data_provider, recorded_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ params = ( flight_id, flight_state_obj.timestamp, flight_state_obj.latitude, flight_state_obj.longitude, flight_state_obj.baro_altitude_m, flight_state_obj.geo_altitude_m, flight_state_obj.velocity_mps, flight_state_obj.true_track_deg, flight_state_obj.vertical_rate_mps, flight_state_obj.on_ground, flight_state_obj.squawk, flight_state_obj.spi, ( str(flight_state_obj.position_source) if flight_state_obj.position_source is not None else None ), flight_state_obj.raw_data_provider, recorded_at_ts, ) cursor.execute(sql, params) position_id = cursor.lastrowid conn.commit() module_logger.debug( f"Added position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) " f"at {flight_state_obj.timestamp:.0f} (pos_id={position_id}) in {self._current_db_path}" ) except sqlite3.Error as e: module_logger.error( f"DB error in add_position_daily for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}): {e}", exc_info=True, ) if conn: conn.rollback() position_id = None finally: if cursor: cursor.close() return position_id def get_flight_track_for_icao_on_date( self, icao24_to_find: str, target_date_utc: Optional[datetime] = None ) -> List[CanonicalFlightState]: """ Retrieves all recorded positions for a given ICAO24 on a specific UTC date. Returns a list of CanonicalFlightState objects, ordered by timestamp. """ if not icao24_to_find: module_logger.warning("ICAO24 is required to get flight track.") return [] if target_date_utc is None: target_date_utc = datetime.now(timezone.utc) conn = self._get_db_connection(for_date_utc=target_date_utc) if not conn: module_logger.error( f"Failed to get DB connection for date {target_date_utc.strftime('%Y-%m-%d')} to retrieve track for {icao24_to_find}." ) return [] track_points: List[CanonicalFlightState] = [] cursor = None try: cursor = conn.cursor() cursor.execute( "SELECT flight_id FROM flights WHERE icao24 = ?", (icao24_to_find.lower(),) ) flight_row = cursor.fetchone() if not flight_row: module_logger.info( f"No flight record found for ICAO {icao24_to_find} on {target_date_utc.strftime('%Y-%m-%d')}." ) return [] flight_id = flight_row["flight_id"] module_logger.debug( f"Found flight_id {flight_id} for ICAO {icao24_to_find} on {target_date_utc.strftime('%Y-%m-%d')}." ) cursor.execute( """ SELECT * FROM positions WHERE flight_id = ? ORDER BY detection_timestamp ASC """, (flight_id,), ) position_rows = cursor.fetchall() if not position_rows: module_logger.info( f"No position records found for flight_id {flight_id} (ICAO {icao24_to_find}) on {target_date_utc.strftime('%Y-%m-%d')}." ) return [] # Get column names from cursor.description to robustly check for key existence # This is an alternative to pos_row.keys() if issues persist or for older sqlite3 versions # col_names = [description[0] for description in cursor.description] for pos_row in position_rows: try: state = CanonicalFlightState( icao24=icao24_to_find, timestamp=pos_row["detection_timestamp"], last_contact_timestamp=pos_row["detection_timestamp"], latitude=pos_row["latitude"], longitude=pos_row["longitude"], on_ground=bool(pos_row["on_ground"]), callsign=None, origin_country=None, baro_altitude_m=pos_row["baro_altitude_m"] if "baro_altitude_m" in pos_row.keys() and pos_row["baro_altitude_m"] is not None else None, geo_altitude_m=pos_row["geo_altitude_m"] if "geo_altitude_m" in pos_row.keys() and pos_row["geo_altitude_m"] is not None else None, velocity_mps=pos_row["velocity_mps"] if "velocity_mps" in pos_row.keys() and pos_row["velocity_mps"] is not None else None, true_track_deg=pos_row["true_track_deg"] if "true_track_deg" in pos_row.keys() and pos_row["true_track_deg"] is not None else None, vertical_rate_mps=pos_row["vertical_rate_mps"] if "vertical_rate_mps" in pos_row.keys() and pos_row["vertical_rate_mps"] is not None else None, squawk=pos_row["squawk"] if "squawk" in pos_row.keys() and pos_row["squawk"] is not None else None, spi=bool(pos_row["spi"]) if "spi" in pos_row.keys() and pos_row["spi"] is not None else None, position_source=pos_row["position_source"] if "position_source" in pos_row.keys() and pos_row["position_source"] is not None else None, raw_data_provider=pos_row["raw_data_provider"] if "raw_data_provider" in pos_row.keys() and pos_row["raw_data_provider"] is not None else None, ) track_points.append(state) except KeyError as e: module_logger.warning(f"KeyError creating CanonicalFlightState from DB row for ICAO {icao24_to_find}: {e}. Row keys: {pos_row.keys()}") except Exception as e_state_create: module_logger.error(f"Error creating CanonicalFlightState from DB row for ICAO {icao24_to_find}: {e_state_create}", exc_info=True) continue module_logger.info( f"Retrieved {len(track_points)} track points for ICAO {icao24_to_find} on {target_date_utc.strftime('%Y-%m-%d')}." ) except sqlite3.Error as e: module_logger.error( f"DB error retrieving track for {icao24_to_find} on {target_date_utc.strftime('%Y-%m-%d')}: {e}", exc_info=True, ) except Exception as e_gen: # Catch-all for other potential errors during processing module_logger.error( f"Generic error retrieving track for {icao24_to_find}: {e_gen}", exc_info=True, ) finally: if cursor: cursor.close() return track_points def close_connection(self): if self._connection: try: self._connection.close() module_logger.info( f"DB connection to {self._current_db_path} closed by request." ) except sqlite3.Error as e: module_logger.error( f"Error explicitly closing DB connection to {self._current_db_path}: {e}" ) finally: self._connection = None self._current_db_path = None if __name__ == "__main__": import logging # Ensure logging is imported for basicConfig logging.basicConfig( level=logging.DEBUG, format="%(asctime)s [%(levelname)-8s] %(name)-20s : %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) module_logger.info( "--- Running DataStorage Standalone Test (with CanonicalFlightState) ---" ) class MockConfig: DATABASE_DIRECTORY = os.path.join( os.getcwd(), "test_flight_data_storage_canonical" ) DATABASE_FILENAME_FORMAT = "test_flights_canonical_%Y-%m-%d.db" LOG_LEVEL = "DEBUG" # Added for completeness if accessed original_app_config = app_config globals()["app_config"] = MockConfig() storage = None try: storage = DataStorage() current_ts_utc = time.time() # Flight 1 state1_obj = CanonicalFlightState( icao24="CANON01", callsign="CANONFLT1", origin_country="Canonistan", timestamp=current_ts_utc, last_contact_timestamp=current_ts_utc, latitude=51.0, longitude=11.0, on_ground=False, baro_altitude_m=9000.0, geo_altitude_m=9020.0, # Added for test velocity_mps=200.0, true_track_deg=90.0, vertical_rate_mps=10.0, squawk="1234", spi=False, position_source=0, raw_data_provider="TestAdapter", ) flight_id1 = storage.add_or_update_flight_daily( state1_obj.icao24, state1_obj.callsign, state1_obj.origin_country, state1_obj.timestamp, ) if flight_id1: module_logger.info( f"Flight 1 (icao={state1_obj.icao24}) added/updated, flight_id: {flight_id1}" ) pos_id1 = storage.add_position_daily(flight_id1, state1_obj) if pos_id1: module_logger.info( f"Position 1 for flight_id {flight_id1} added, pos_id: {pos_id1}" ) time.sleep(0.01) updated_ts_utc = time.time() state1_update_obj = CanonicalFlightState( icao24="CANON01", callsign="CANONFLT1A", origin_country="Canonistan", timestamp=updated_ts_utc, last_contact_timestamp=updated_ts_utc, latitude=51.5, longitude=11.5, on_ground=False, baro_altitude_m=9050.0, geo_altitude_m=9070.0, # Added for test velocity_mps=205.0, true_track_deg=95.0, vertical_rate_mps=0.0, squawk="1234", spi=False, position_source=0, raw_data_provider="TestAdapter", ) flight_id1_upd = storage.add_or_update_flight_daily( state1_update_obj.icao24, state1_update_obj.callsign, state1_update_obj.origin_country, state1_update_obj.timestamp, ) assert flight_id1_upd == flight_id1 module_logger.info( f"Flight 1 (icao={state1_update_obj.icao24}) updated, flight_id: {flight_id1_upd}" ) if flight_id1_upd: pos_id1_upd = storage.add_position_daily(flight_id1_upd, state1_update_obj) if pos_id1_upd: module_logger.info( f"Position 2 for flight_id {flight_id1_upd} added, pos_id: {pos_id1_upd}" ) # Test get_flight_track_for_icao_on_date module_logger.info(f"--- Testing get_flight_track_for_icao_on_date for CANON01 ---") retrieved_track = storage.get_flight_track_for_icao_on_date("CANON01") assert len(retrieved_track) == 2, f"Expected 2 track points for CANON01, got {len(retrieved_track)}" if len(retrieved_track) == 2: module_logger.info(f"Retrieved {len(retrieved_track)} points for CANON01. First point: {retrieved_track[0]}") assert retrieved_track[0].latitude == 51.0 assert retrieved_track[1].latitude == 51.5 assert retrieved_track[0].baro_altitude_m == 9000.0 assert retrieved_track[1].geo_altitude_m == 9070.0 # Check a field from the update # Flight 2 - for "tomorrow" tomorrow_dt = datetime.now(timezone.utc) + timedelta(days=1) tomorrow_ts_utc = tomorrow_dt.timestamp() state2_obj = CanonicalFlightState( icao24="CANON02", callsign="CANONFLT2", origin_country="OtherCanonPlace", timestamp=tomorrow_ts_utc, last_contact_timestamp=tomorrow_ts_utc, latitude=34.0, longitude=-118.0, on_ground=False, baro_altitude_m=12000.0, geo_altitude_m=None, # Test with None velocity_mps=250.0, true_track_deg=270.0, vertical_rate_mps=-5.0, squawk="7700", spi=True, position_source=1, raw_data_provider="TestAdapter", ) flight_id2 = storage.add_or_update_flight_daily( state2_obj.icao24, state2_obj.callsign, state2_obj.origin_country, state2_obj.timestamp, ) if flight_id2: module_logger.info( f"Flight 2 (icao={state2_obj.icao24}) for tomorrow added, flight_id: {flight_id2}" ) pos_id2 = storage.add_position_daily(flight_id2, state2_obj) if pos_id2: module_logger.info( f"Position for flight_id {flight_id2} (tomorrow) added, pos_id: {pos_id2}" ) module_logger.info(f"--- Testing get_flight_track_for_icao_on_date for CANON02 (tomorrow) ---") retrieved_track_tmr = storage.get_flight_track_for_icao_on_date("CANON02", target_date_utc=tomorrow_dt) assert len(retrieved_track_tmr) == 1, f"Expected 1 track point for CANON02, got {len(retrieved_track_tmr)}" if retrieved_track_tmr: module_logger.info(f"Retrieved point for CANON02: {retrieved_track_tmr[0]}") assert retrieved_track_tmr[0].geo_altitude_m is None # Check None value except Exception as e: module_logger.critical( f"Error during DataStorage standalone test (Canonical): {e}", exc_info=True ) finally: if storage: storage.close_connection() globals()["app_config"] = original_app_config test_dir_to_check = os.path.join( os.getcwd(), "test_flight_data_storage_canonical" ) if os.path.exists(test_dir_to_check): module_logger.info( f"Test directory {test_dir_to_check} was used. Please check/clean manually." ) module_logger.info("--- DataStorage Standalone Test (Canonical) Finished ---")