# FlightMonitor/data/storage.py
|
|
import sqlite3
|
|
import os
|
|
from datetime import datetime, timezone
|
|
import time
|
|
from typing import Optional, List, Dict
|
|
|
|
|
|
# Relative imports
|
|
from . import config as app_config
|
|
from ..utils.logger import get_logger
|
|
from .common_models import CanonicalFlightState # Import our canonical model
|
|
|
|
# Module-wide logger, obtained from the project's get_logger helper under this
# module's name; every log call in this file goes through it.
module_logger = get_logger(__name__)
|
|
|
|
class DataStorage:
    """
    Handles storage of flight data into daily SQLite database files.

    One database file is used per UTC day. Its name is produced by formatting
    the date with ``app_config.DATABASE_FILENAME_FORMAT`` and it is placed in
    ``app_config.DATABASE_DIRECTORY``. A single connection is cached and is
    swapped for a new one whenever a record belongs to a different daily file.
    """

    def __init__(self):
        """
        Prepare the storage: verify the database directory, open nothing yet.

        Raises:
            OSError: If the database directory is missing and cannot be
                created, is not a directory, or is not writable.
        """
        # Path of the daily DB file the cached connection points at (None when closed).
        self._current_db_path: Optional[str] = None
        # Cached connection, reused while successive records target the same file.
        self._connection: Optional[sqlite3.Connection] = None
        self._ensure_database_directory()
        module_logger.info("DataStorage initialized.")

    def _ensure_database_directory(self):
        """
        Make sure the configured database directory exists and is writable.

        Raises:
            OSError: If the directory cannot be created, the path exists but
                is not a directory, or the directory is not writable.
        """
        try:
            abs_db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
            if os.path.exists(abs_db_dir):
                if not os.path.isdir(abs_db_dir):
                    raise OSError(f"Path {abs_db_dir} exists but is not a directory.")
            else:
                # exist_ok=True tolerates the directory being created by someone
                # else between the existence check above and this call.
                os.makedirs(abs_db_dir, exist_ok=True)
                module_logger.info(f"Database directory created: {abs_db_dir}")
            if not os.access(abs_db_dir, os.W_OK):
                raise OSError(f"Directory {abs_db_dir} is not writable.")
        except OSError as e:
            module_logger.error(f"Database directory error for {app_config.DATABASE_DIRECTORY}: {e}", exc_info=True)
            raise

    def _get_db_path_for_date(self, target_date_utc: Optional[datetime] = None) -> str:
        """
        Build the absolute path of the daily database file for a given date.

        Args:
            target_date_utc (Optional[datetime]): Date whose file is wanted.
                Defaults to the current UTC time when omitted.

        Returns:
            str: Absolute path of the database file for that date.
        """
        if target_date_utc is None:
            target_date_utc = datetime.now(timezone.utc)
        db_filename = target_date_utc.strftime(app_config.DATABASE_FILENAME_FORMAT)
        return os.path.join(os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename)

    @staticmethod
    def _utc_datetime_from_timestamp(timestamp: float) -> Optional[datetime]:
        """
        Convert a Unix timestamp to an aware UTC datetime, or None if invalid.

        A missing, non-numeric, NaN or out-of-range timestamp is logged and
        reported as None so the public methods can honour their
        "None on error" contract instead of raising.
        """
        try:
            return datetime.fromtimestamp(timestamp, timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError) as e:
            module_logger.error(f"Invalid detection timestamp {timestamp!r}: {e}")
            return None

    def _get_db_connection(self, for_date_utc: Optional[datetime] = None) -> Optional[sqlite3.Connection]:
        """
        Return a connection to the daily database for the given date.

        The cached connection is reused when it already targets the right
        file. Otherwise it is closed and a new one is opened, configured
        (foreign keys on, ``sqlite3.Row`` row factory) and its schema ensured.

        Args:
            for_date_utc (Optional[datetime]): Date selecting the daily file;
                defaults to the current UTC time.

        Returns:
            Optional[sqlite3.Connection]: The ready connection, or None if the
            database could not be opened or initialised.
        """
        target_db_path = self._get_db_path_for_date(for_date_utc)
        if self._connection and self._current_db_path == target_db_path:
            return self._connection

        if self._connection:
            # The record belongs to another daily file: drop the old connection.
            try:
                self._connection.close()
                module_logger.info(f"Closed DB connection to: {self._current_db_path}")
            except sqlite3.Error as e:
                module_logger.error(f"Error closing previous DB connection to {self._current_db_path}: {e}")
            self._connection = None

        # Set before initialisation so log messages below name the right file.
        self._current_db_path = target_db_path
        try:
            self._connection = sqlite3.connect(target_db_path, timeout=10.0)
            self._connection.execute("PRAGMA foreign_keys = ON;")
            self._connection.row_factory = sqlite3.Row  # Ensures select queries return dict-like rows
            module_logger.info(f"Opened DB connection to: {target_db_path}")
            self._init_tables_if_not_exist(self._connection)
            return self._connection
        except sqlite3.Error as e:
            module_logger.error(f"Failed to connect/initialize database {target_db_path}: {e}", exc_info=True)
            if self._connection:
                try:
                    self._connection.close()
                except sqlite3.Error as close_error:
                    # Best-effort cleanup; the original failure is already logged.
                    module_logger.debug(f"Error closing DB connection after failure: {close_error}")
            self._connection = None
            self._current_db_path = None
            return None

    def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection):
        """
        Create the tables and indexes of a daily database if they are missing.

        Schema:
            flights   (flight_id, icao24, callsign, origin_country,
                       first_seen_day, last_seen_day)
            positions (position_id, flight_id, detection_timestamp, latitude,
                       longitude, baro_altitude_m, geo_altitude_m,
                       velocity_mps, true_track_deg, vertical_rate_mps,
                       on_ground, squawk, spi, position_source,
                       raw_data_provider, recorded_at)

        Args:
            db_conn (sqlite3.Connection): Connection to initialise.

        Raises:
            sqlite3.Error: If any statement fails. The error is propagated
                (after rollback) so the caller does not hand out a connection
                to a database without a schema; the caller logs it.
        """
        cursor = None
        try:
            cursor = db_conn.cursor()
            # The connection context manager commits on success and rolls
            # back if a statement raises.
            with db_conn:
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS flights (
                        flight_id INTEGER PRIMARY KEY AUTOINCREMENT,
                        icao24 TEXT NOT NULL UNIQUE,
                        callsign TEXT,
                        origin_country TEXT,
                        first_seen_day REAL NOT NULL,
                        last_seen_day REAL NOT NULL
                    )
                """)
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS positions (
                        position_id INTEGER PRIMARY KEY AUTOINCREMENT,
                        flight_id INTEGER NOT NULL,
                        detection_timestamp REAL NOT NULL,
                        latitude REAL NOT NULL,
                        longitude REAL NOT NULL,
                        baro_altitude_m REAL,
                        geo_altitude_m REAL,
                        velocity_mps REAL,
                        true_track_deg REAL,
                        vertical_rate_mps REAL,
                        on_ground BOOLEAN NOT NULL,
                        squawk TEXT,
                        spi BOOLEAN,
                        position_source TEXT,
                        raw_data_provider TEXT,
                        recorded_at REAL NOT NULL,
                        FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE
                    )
                """)
                # Update indexes if new columns are frequently queried
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp);")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24);")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp);")
            module_logger.debug(f"Database tables and indexes ensured in {self._current_db_path}.")
        finally:
            if cursor:
                cursor.close()

    def add_or_update_flight_daily(self,
                                   icao24: str,
                                   callsign: Optional[str],
                                   origin_country: Optional[str],
                                   detection_timestamp: float) -> Optional[int]:
        """
        Adds a new flight to the daily DB or updates its last_seen_day.
        Operates on the DB file for the UTC date of detection_timestamp.

        For an existing flight the stored callsign is replaced only when a
        non-empty callsign is supplied; origin_country is written on insert
        only.

        Args:
            icao24 (str): ICAO24 address.
            callsign (Optional[str]): Callsign.
            origin_country (Optional[str]): Origin country.
            detection_timestamp (float): Unix timestamp (UTC) of flight data detection.

        Returns:
            Optional[int]: The flight_id, or None on error (missing icao24,
            invalid timestamp, no connection, or a database error).
        """
        if not icao24:  # Should be caught by CanonicalFlightState constructor too
            module_logger.warning("icao24 is required to add or update a flight.")
            return None

        event_date_utc = self._utc_datetime_from_timestamp(detection_timestamp)
        if event_date_utc is None:
            return None

        conn = self._get_db_connection(for_date_utc=event_date_utc)
        if not conn:
            module_logger.error("Failed to get DB connection for add_or_update_flight_daily.")
            return None

        cursor = None
        flight_id: Optional[int] = None
        try:
            cursor = conn.cursor()
            # Commits on success, rolls back if any statement raises.
            with conn:
                cursor.execute("SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,))
                row = cursor.fetchone()

                if row:
                    flight_id = row["flight_id"]
                    # Keep the stored callsign unless a non-empty one is provided.
                    final_callsign = callsign or row["callsign"]
                    cursor.execute("""
                        UPDATE flights SET last_seen_day = ?, callsign = ?
                        WHERE flight_id = ?
                    """, (detection_timestamp, final_callsign, flight_id))
                    module_logger.debug(f"Updated flight icao24={icao24}, flight_id={flight_id}, last_seen_day={detection_timestamp:.0f} in {self._current_db_path}")
                else:
                    cursor.execute("""
                        INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day)
                        VALUES (?, ?, ?, ?, ?)
                    """, (icao24, callsign, origin_country, detection_timestamp, detection_timestamp))
                    flight_id = cursor.lastrowid
                    module_logger.info(f"Added new flight icao24={icao24}, flight_id={flight_id} to {self._current_db_path}")
        except sqlite3.Error as e:
            module_logger.error(f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}", exc_info=True)
            flight_id = None
        finally:
            if cursor:
                cursor.close()
        return flight_id

    def add_position_daily(self,
                           flight_id: int,
                           flight_state_obj: CanonicalFlightState) -> Optional[int]:
        """
        Adds a new position record using data from a CanonicalFlightState object.
        Operates on the DB file for the UTC date of flight_state_obj.timestamp.

        All input validation happens before the database is touched, so a
        rejected record never opens or switches the daily database file.

        Args:
            flight_id (int): The flight_id from the 'flights' table.
            flight_state_obj (CanonicalFlightState): The canonical flight state object.

        Returns:
            Optional[int]: The position_id of the inserted record, or None on
            error (invalid arguments, invalid timestamp, missing
            latitude/longitude, no connection, or a database error).
        """
        if not flight_id:
            module_logger.warning("flight_id is required to add a position.")
            return None
        if not isinstance(flight_state_obj, CanonicalFlightState):
            module_logger.error(f"Invalid type for flight_state_obj: expected CanonicalFlightState, got {type(flight_state_obj)}")
            return None

        # Use the timestamp from the flight state object to determine the correct daily DB
        event_date_utc = self._utc_datetime_from_timestamp(flight_state_obj.timestamp)
        if event_date_utc is None:
            return None

        # Validate essential position data
        if flight_state_obj.latitude is None or flight_state_obj.longitude is None:
            module_logger.warning(
                f"Skipping position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"due to missing latitude/longitude. Timestamp: {flight_state_obj.timestamp:.0f}"
            )
            return None

        conn = self._get_db_connection(for_date_utc=event_date_utc)
        if not conn:
            module_logger.error(f"Failed to get DB connection for add_position_daily (flight_id={flight_id}).")
            return None

        sql = """
            INSERT INTO positions (
                flight_id, detection_timestamp, latitude, longitude,
                baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg,
                vertical_rate_mps, on_ground, squawk, spi, position_source,
                raw_data_provider, recorded_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        position_source = flight_state_obj.position_source
        params = (
            flight_id,
            flight_state_obj.timestamp,  # Primary timestamp from the canonical object
            flight_state_obj.latitude,
            flight_state_obj.longitude,
            flight_state_obj.baro_altitude_m,
            flight_state_obj.geo_altitude_m,
            flight_state_obj.velocity_mps,
            flight_state_obj.true_track_deg,
            flight_state_obj.vertical_rate_mps,
            flight_state_obj.on_ground,
            flight_state_obj.squawk,
            flight_state_obj.spi,
            str(position_source) if position_source is not None else None,  # Store as TEXT
            flight_state_obj.raw_data_provider,
            time.time(),  # recorded_at: moment of the actual DB insertion
        )

        cursor = None
        position_id: Optional[int] = None
        try:
            cursor = conn.cursor()
            # Commits on success, rolls back if the insert raises.
            with conn:
                cursor.execute(sql, params)
                position_id = cursor.lastrowid
            module_logger.debug(
                f"Added position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"at {flight_state_obj.timestamp:.0f} (pos_id={position_id}) in {self._current_db_path}"
            )
        except sqlite3.Error as e:
            module_logger.error(
                f"DB error in add_position_daily for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}): {e}",
                exc_info=True
            )
            position_id = None
        finally:
            if cursor:
                cursor.close()

        return position_id

    def close_connection(self):
        """
        Close the cached database connection, if any.

        Errors raised while closing are logged, not propagated; the cached
        connection and path are cleared in every case.
        """
        if self._connection:
            try:
                self._connection.close()
                module_logger.info(f"DB connection to {self._current_db_path} closed by request.")
            except sqlite3.Error as e:
                module_logger.error(f"Error explicitly closing DB connection to {self._current_db_path}: {e}")
            finally:
                self._connection = None
                self._current_db_path = None
|
|
|
|
# Standalone test block
|
|
if __name__ == '__main__':
    # Manual smoke test: writes two flights (one dated "tomorrow") into test
    # databases under the current working directory and logs the outcome.
    # NOTE(review): this module uses relative imports, so it presumably has to
    # be started as a module of its package (python -m ...) - confirm.
    import logging  # Ensure logging is imported for basicConfig
    from datetime import timedelta  # Ensure timedelta is imported

    logging.basicConfig(level=logging.DEBUG,
                        format="%(asctime)s [%(levelname)-8s] %(name)-20s : %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S")

    module_logger.info("--- Running DataStorage Standalone Test (with CanonicalFlightState) ---")

    class MockConfig:
        """Stand-in for the real config module, pointing at a throwaway directory."""
        DATABASE_DIRECTORY = os.path.join(os.getcwd(), "test_flight_data_storage_canonical")
        DATABASE_FILENAME_FORMAT = "test_flights_canonical_%Y-%m-%d.db"
        LOG_LEVEL = "DEBUG"

    # DataStorage looks up app_config in the module globals at call time, so
    # swapping the global redirects it to the mock; restored in the finally below.
    original_app_config = app_config
    globals()['app_config'] = MockConfig()  # Temporarily override module-level app_config for test

    storage = None
    try:
        storage = DataStorage()

        current_ts_utc = time.time()

        # Flight 1
        state1_obj = CanonicalFlightState(
            icao24="CANON01", callsign="CANONFLT1", origin_country="Canonistan",
            timestamp=current_ts_utc, last_contact_timestamp=current_ts_utc,
            latitude=51.0, longitude=11.0, on_ground=False, baro_altitude_m=9000.0,
            velocity_mps=200.0, true_track_deg=90.0, vertical_rate_mps=10.0,
            squawk="1234", spi=False, position_source=0, raw_data_provider="TestAdapter"
        )
        flight_id1 = storage.add_or_update_flight_daily(
            state1_obj.icao24, state1_obj.callsign, state1_obj.origin_country, state1_obj.timestamp
        )
        if flight_id1:
            module_logger.info(f"Flight 1 (icao={state1_obj.icao24}) added/updated, flight_id: {flight_id1}")
            pos_id1 = storage.add_position_daily(flight_id1, state1_obj)
            if pos_id1:
                module_logger.info(f"Position 1 for flight_id {flight_id1} added, pos_id: {pos_id1}")

        # Simulate update for Flight 1
        time.sleep(0.01)  # Ensure different timestamp
        updated_ts_utc = time.time()
        state1_update_obj = CanonicalFlightState(
            icao24="CANON01", callsign="CANONFLT1A", origin_country="Canonistan",  # Callsign changed
            timestamp=updated_ts_utc, last_contact_timestamp=updated_ts_utc,  # New timestamp
            latitude=51.5, longitude=11.5, on_ground=False, baro_altitude_m=9050.0,
            velocity_mps=205.0, true_track_deg=95.0, vertical_rate_mps=0.0,  # Leveling off
            squawk="1234", spi=False, position_source=0, raw_data_provider="TestAdapter"
        )
        flight_id1_upd = storage.add_or_update_flight_daily(
            state1_update_obj.icao24, state1_update_obj.callsign, state1_update_obj.origin_country, state1_update_obj.timestamp
        )
        assert flight_id1_upd == flight_id1, "Flight ID should remain the same on update"
        module_logger.info(f"Flight 1 (icao={state1_update_obj.icao24}) updated, flight_id: {flight_id1_upd}")
        if flight_id1_upd:
            pos_id1_upd = storage.add_position_daily(flight_id1_upd, state1_update_obj)
            if pos_id1_upd:
                module_logger.info(f"Position 2 for flight_id {flight_id1_upd} added, pos_id: {pos_id1_upd}")

        # Flight 2 - for "tomorrow", which must land in a different daily DB file
        tomorrow_dt = datetime.now(timezone.utc) + timedelta(days=1)
        tomorrow_ts_utc = tomorrow_dt.timestamp()
        state2_obj = CanonicalFlightState(
            icao24="CANON02", callsign="CANONFLT2", origin_country="OtherCanonPlace",
            timestamp=tomorrow_ts_utc, last_contact_timestamp=tomorrow_ts_utc,
            latitude=34.0, longitude=-118.0, on_ground=False, baro_altitude_m=12000.0,
            velocity_mps=250.0, true_track_deg=270.0, raw_data_provider="TestAdapter"
        )
        flight_id2 = storage.add_or_update_flight_daily(
            state2_obj.icao24, state2_obj.callsign, state2_obj.origin_country, state2_obj.timestamp
        )
        if flight_id2:
            module_logger.info(f"Flight 2 (icao={state2_obj.icao24}) for tomorrow added, flight_id: {flight_id2}")
            pos_id2 = storage.add_position_daily(flight_id2, state2_obj)
            if pos_id2:
                module_logger.info(f"Position for flight_id {flight_id2} (tomorrow) added, pos_id: {pos_id2}")

    except Exception as e:
        module_logger.critical(f"Error during DataStorage standalone test (Canonical): {e}", exc_info=True)
    finally:
        if storage:
            storage.close_connection()
        globals()['app_config'] = original_app_config  # Restore original app_config

    # The test databases are left in place on purpose so they can be inspected.
    test_dir_to_check = MockConfig.DATABASE_DIRECTORY
    if os.path.exists(test_dir_to_check):
        module_logger.info(f"Test directory {test_dir_to_check} was used. Please check/clean manually.")
    module_logger.info("--- DataStorage Standalone Test (Canonical) Finished ---")