# Source path: SXXXXXXX_FlightMonitor/flightmonitor/data/storage.py
# Listing metadata: 571 lines, 23 KiB, Python

# FlightMonitor/data/storage.py
import sqlite3
import os
from datetime import datetime, timezone
import time
from typing import Optional, List, Dict
# Relative imports
# MODIFIED: Import config as app_config
# WHY: Use the application-wide configuration module.
# HOW: Added import.
from . import config as app_config
# MODIFIED: Import get_logger from the centralized logger utility.
# WHY: Use the application's standard logger.
# HOW: Added import.
from ..utils.logger import get_logger
# MODIFIED: Import CanonicalFlightState model.
# WHY: Storage layer works with this standard format.
# HOW: Added import.
from .common_models import CanonicalFlightState # Import our canonical model
# MODIFIED: Get module-level logger using the centralized get_logger.
# WHY: Adhere to the application's logging standard.
# HOW: Changed logging.getLogger(__name__) to get_logger(__name__).
module_logger = get_logger(__name__)
class DataStorage:
    """
    Handles storage of flight data into daily SQLite database files.

    The database file for a record is chosen from the UTC date of the record's
    timestamp, using app_config.DATABASE_FILENAME_FORMAT inside
    app_config.DATABASE_DIRECTORY. A single connection is cached; it is closed
    and replaced whenever a different day's file is requested.
    """

    def __init__(self):
        """
        Prepares the storage layer without opening any database yet.

        Raises:
            OSError: If the database directory cannot be created or used.
        """
        self._current_db_path: Optional[str] = None
        self._connection: Optional[sqlite3.Connection] = None
        self._ensure_database_directory()
        module_logger.info("DataStorage initialized.")

    def _ensure_database_directory(self):
        """
        Makes sure the configured database directory exists and is writable.

        Raises:
            OSError: If the directory cannot be created, the path exists but is
                not a directory, or the directory is not writable.
        """
        try:
            abs_db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
            if not os.path.exists(abs_db_dir):
                # exist_ok=True tolerates another process creating the
                # directory between the existence check and this call.
                os.makedirs(abs_db_dir, exist_ok=True)
                module_logger.info(f"Database directory created: {abs_db_dir}")
            if not os.path.isdir(abs_db_dir):
                raise OSError(f"Path {abs_db_dir} exists but is not a directory.")
            if not os.access(abs_db_dir, os.W_OK):
                raise OSError(f"Directory {abs_db_dir} is not writable.")
        except OSError as e:
            module_logger.error(
                f"Database directory error for {app_config.DATABASE_DIRECTORY}: {e}",
                exc_info=True,
            )
            raise

    @staticmethod
    def _timestamp_to_utc_datetime(timestamp) -> Optional[datetime]:
        """
        Converts a Unix timestamp into a timezone-aware UTC datetime.

        Args:
            timestamp: Seconds since the epoch.

        Returns:
            Optional[datetime]: The UTC datetime, or None when the value is
            missing, not numeric, or outside the range the platform supports.
        """
        try:
            return datetime.fromtimestamp(timestamp, timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    def _get_db_path_for_date(self, target_date_utc: Optional[datetime] = None) -> str:
        """
        Builds the absolute path of the daily database file for a date.

        Args:
            target_date_utc (Optional[datetime]): Date selecting the file;
                the current UTC time is used when omitted.

        Returns:
            str: Absolute path of the database file.
        """
        if target_date_utc is None:
            target_date_utc = datetime.now(timezone.utc)
        db_filename = target_date_utc.strftime(app_config.DATABASE_FILENAME_FORMAT)
        return os.path.join(
            os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename
        )

    def _get_db_connection(
        self, for_date_utc: Optional[datetime] = None
    ) -> Optional[sqlite3.Connection]:
        """
        Returns a connection to the daily database for the given date.

        The cached connection is reused when it already points at the target
        file; otherwise it is closed and a new one is opened, with foreign keys
        enabled, sqlite3.Row as row factory, and the schema ensured.

        Args:
            for_date_utc (Optional[datetime]): Date selecting the database
                file; the current UTC time is used when omitted.

        Returns:
            Optional[sqlite3.Connection]: The connection, or None when the
            database could not be opened or its schema could not be created.
        """
        target_db_path = self._get_db_path_for_date(for_date_utc)
        if self._connection and self._current_db_path == target_db_path:
            return self._connection

        if self._connection:
            # A different day's file is requested: drop the old connection.
            try:
                self._connection.close()
                module_logger.info(f"Closed DB connection to: {self._current_db_path}")
            except sqlite3.Error as e:
                module_logger.error(
                    f"Error closing previous DB connection to {self._current_db_path}: {e}"
                )
            self._connection = None

        # Set before connecting so that schema-initialization logs name the file.
        self._current_db_path = target_db_path
        try:
            self._connection = sqlite3.connect(target_db_path, timeout=10.0)
            self._connection.execute("PRAGMA foreign_keys = ON;")
            # Rows are accessed by column name elsewhere in this class.
            self._connection.row_factory = sqlite3.Row
            module_logger.info(f"Opened DB connection to: {target_db_path}")
            self._init_tables_if_not_exist(self._connection)
            return self._connection
        except sqlite3.Error as e:
            module_logger.error(
                f"Failed to connect/initialize database {target_db_path}: {e}",
                exc_info=True,
            )
            if self._connection:
                try:
                    self._connection.close()
                except sqlite3.Error:
                    # Best effort: the connection is being discarded anyway.
                    pass
            self._connection = None
            self._current_db_path = None
            return None

    def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection):
        """
        Creates the 'flights' and 'positions' tables and their indexes if missing.

        Schema:
            flights (flight_id, icao24, callsign, origin_country,
                     first_seen_day, last_seen_day)
            positions (position_id, flight_id, detection_timestamp, latitude,
                       longitude, baro_altitude_m, geo_altitude_m, velocity_mps,
                       true_track_deg, vertical_rate_mps, on_ground, squawk,
                       spi, position_source, raw_data_provider, recorded_at)

        Args:
            db_conn (sqlite3.Connection): Connection to initialize.

        Raises:
            sqlite3.Error: If any statement fails. The error is re-raised after
                rollback so the caller does not hand out a connection to a
                database that lacks its schema.
        """
        cursor = None
        try:
            cursor = db_conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS flights (
                flight_id INTEGER PRIMARY KEY AUTOINCREMENT,
                icao24 TEXT NOT NULL UNIQUE,
                callsign TEXT,
                origin_country TEXT,
                first_seen_day REAL NOT NULL,
                last_seen_day REAL NOT NULL
                )
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS positions (
                position_id INTEGER PRIMARY KEY AUTOINCREMENT,
                flight_id INTEGER NOT NULL,
                detection_timestamp REAL NOT NULL,
                latitude REAL NOT NULL,
                longitude REAL NOT NULL,
                baro_altitude_m REAL, /* MODIFIED: Suffix _m for clarity */
                geo_altitude_m REAL, /* MODIFIED: Suffix _m for clarity */
                velocity_mps REAL, /* MODIFIED: Suffix _mps for clarity */
                true_track_deg REAL, /* MODIFIED: Suffix _deg for clarity */
                vertical_rate_mps REAL, /* ADDED: New field from CanonicalFlightState */
                on_ground BOOLEAN NOT NULL,
                squawk TEXT, /* ADDED: New field */
                spi BOOLEAN, /* ADDED: New field */
                position_source TEXT, /* ADDED: New field (TEXT to be flexible) */
                raw_data_provider TEXT, /* ADDED: New field */
                recorded_at REAL NOT NULL,
                FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE
                )
                """
            )
            # Update indexes if new columns are frequently queried
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp);"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24);"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp);"
            )
            db_conn.commit()
            module_logger.debug(
                f"Database tables and indexes ensured in {self._current_db_path}."
            )
        except sqlite3.Error as e:
            module_logger.error(
                f"Error initializing tables in {self._current_db_path}: {e}",
                exc_info=True,
            )
            db_conn.rollback()
            # Propagate: _get_db_connection closes the connection and returns None.
            raise
        finally:
            if cursor:
                cursor.close()

    def add_or_update_flight_daily(
        self,
        icao24: str,
        callsign: Optional[str],
        origin_country: Optional[str],
        detection_timestamp: float,
    ) -> Optional[int]:
        """
        Adds a new flight to the daily DB or updates its last_seen_day.
        Operates on the DB file for the UTC date of detection_timestamp.

        On update, the stored callsign is replaced only when a non-empty
        callsign is supplied; origin_country and first_seen_day are left as
        originally inserted.

        Args:
            icao24 (str): ICAO24 address.
            callsign (Optional[str]): Callsign.
            origin_country (Optional[str]): Origin country.
            detection_timestamp (float): Unix timestamp (UTC) of flight data detection.

        Returns:
            Optional[int]: The flight_id, or None on error (missing icao24,
            unusable timestamp, no DB connection, or a database error).
        """
        if not icao24:  # Should be caught by CanonicalFlightState constructor too
            module_logger.warning("icao24 is required to add or update a flight.")
            return None

        event_date_utc = self._timestamp_to_utc_datetime(detection_timestamp)
        if event_date_utc is None:
            module_logger.error(
                f"Invalid detection_timestamp {detection_timestamp!r} for icao24={icao24}; flight not stored."
            )
            return None

        conn = self._get_db_connection(for_date_utc=event_date_utc)
        if not conn:
            module_logger.error(
                "Failed to get DB connection for add_or_update_flight_daily."
            )
            return None

        cursor = None
        flight_id: Optional[int] = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,)
            )
            row = cursor.fetchone()
            if row:
                flight_id = row["flight_id"]
                # A non-empty new callsign wins; otherwise keep what is stored.
                final_callsign = callsign or row["callsign"]
                cursor.execute(
                    """
                    UPDATE flights SET last_seen_day = ?, callsign = ?
                    WHERE flight_id = ?
                    """,
                    (detection_timestamp, final_callsign, flight_id),
                )
                module_logger.debug(
                    f"Updated flight icao24={icao24}, flight_id={flight_id}, last_seen_day={detection_timestamp:.0f} in {self._current_db_path}"
                )
            else:
                cursor.execute(
                    """
                    INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        icao24,
                        callsign,
                        origin_country,
                        detection_timestamp,
                        detection_timestamp,
                    ),
                )
                flight_id = cursor.lastrowid
                module_logger.info(
                    f"Added new flight icao24={icao24}, flight_id={flight_id} to {self._current_db_path}"
                )
            conn.commit()
        except sqlite3.Error as e:
            module_logger.error(
                f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}",
                exc_info=True,
            )
            conn.rollback()
            flight_id = None
        finally:
            if cursor:
                cursor.close()
        return flight_id

    def add_position_daily(
        self, flight_id: int, flight_state_obj: CanonicalFlightState
    ) -> Optional[int]:
        """
        Adds a new position record using data from a CanonicalFlightState object.
        Operates on the DB file for the UTC date of flight_state_obj.timestamp.

        All input validation happens before a database is opened, so a rejected
        record never creates or switches the daily database file.

        Args:
            flight_id (int): The flight_id from the 'flights' table.
            flight_state_obj (CanonicalFlightState): The canonical flight state object.

        Returns:
            Optional[int]: The position_id of the inserted record, or None on
            error (missing flight_id, wrong object type, unusable timestamp,
            missing latitude/longitude, no DB connection, or a database error).
        """
        if not flight_id:
            module_logger.warning("flight_id is required to add a position.")
            return None
        if not isinstance(flight_state_obj, CanonicalFlightState):
            module_logger.error(
                f"Invalid type for flight_state_obj: expected CanonicalFlightState, got {type(flight_state_obj)}"
            )
            return None

        # The timestamp from the flight state object selects the daily DB.
        event_date_utc = self._timestamp_to_utc_datetime(flight_state_obj.timestamp)
        if event_date_utc is None:
            module_logger.error(
                f"Invalid timestamp {flight_state_obj.timestamp!r} for flight_id={flight_id} "
                f"(ICAO: {flight_state_obj.icao24}); position not stored."
            )
            return None

        # Validate essential position data
        if flight_state_obj.latitude is None or flight_state_obj.longitude is None:
            module_logger.warning(
                f"Skipping position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"due to missing latitude/longitude. Timestamp: {flight_state_obj.timestamp:.0f}"
            )
            return None

        conn = self._get_db_connection(for_date_utc=event_date_utc)
        if not conn:
            module_logger.error(
                f"Failed to get DB connection for add_position_daily (flight_id={flight_id})."
            )
            return None

        cursor = None
        position_id: Optional[int] = None
        recorded_at_ts = time.time()  # Timestamp of actual DB record insertion
        try:
            cursor = conn.cursor()
            sql = """
                INSERT INTO positions (
                flight_id, detection_timestamp, latitude, longitude,
                baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg,
                vertical_rate_mps, on_ground, squawk, spi, position_source,
                raw_data_provider, recorded_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
            params = (
                flight_id,
                flight_state_obj.timestamp,  # Primary timestamp from the canonical object
                flight_state_obj.latitude,
                flight_state_obj.longitude,
                flight_state_obj.baro_altitude_m,
                flight_state_obj.geo_altitude_m,
                flight_state_obj.velocity_mps,
                flight_state_obj.true_track_deg,
                flight_state_obj.vertical_rate_mps,
                flight_state_obj.on_ground,
                flight_state_obj.squawk,
                flight_state_obj.spi,
                (
                    str(flight_state_obj.position_source)
                    if flight_state_obj.position_source is not None
                    else None
                ),  # Store as TEXT
                flight_state_obj.raw_data_provider,
                recorded_at_ts,
            )
            cursor.execute(sql, params)
            position_id = cursor.lastrowid
            conn.commit()
            module_logger.debug(
                f"Added position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"at {flight_state_obj.timestamp:.0f} (pos_id={position_id}) in {self._current_db_path}"
            )
        except sqlite3.Error as e:
            module_logger.error(
                f"DB error in add_position_daily for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}): {e}",
                exc_info=True,
            )
            conn.rollback()
            position_id = None
        finally:
            if cursor:
                cursor.close()
        return position_id

    def close_connection(self):
        """
        Closes the cached database connection, if one is open.

        The cached connection and path are cleared even when closing fails;
        close errors are logged, not raised.
        """
        if self._connection:
            try:
                self._connection.close()
                module_logger.info(
                    f"DB connection to {self._current_db_path} closed by request."
                )
            except sqlite3.Error as e:
                module_logger.error(
                    f"Error explicitly closing DB connection to {self._current_db_path}: {e}"
                )
            finally:
                self._connection = None
                self._current_db_path = None
# Standalone test block: exercises DataStorage against throw-away daily
# databases created under the current working directory.
if __name__ == "__main__":
    import logging  # Ensure logging is imported for basicConfig
    from datetime import timedelta  # Ensure timedelta is imported

    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)-8s] %(name)-20s : %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    module_logger.info(
        "--- Running DataStorage Standalone Test (with CanonicalFlightState) ---"
    )

    class MockConfig:
        """Minimal stand-in for the application config used by DataStorage."""

        DATABASE_DIRECTORY = os.path.join(
            os.getcwd(), "test_flight_data_storage_canonical"
        )
        DATABASE_FILENAME_FORMAT = "test_flights_canonical_%Y-%m-%d.db"
        # Defensive: present in case another module reads it from the config.
        LOG_LEVEL = "DEBUG"

    # Keep the real config so it can be restored once the test is done.
    original_app_config = app_config
    # Temporarily override module-level app_config for the test.
    globals()["app_config"] = MockConfig()

    storage = None
    try:
        storage = DataStorage()
        current_ts_utc = time.time()

        # Flight 1
        state1_obj = CanonicalFlightState(
            icao24="CANON01",
            callsign="CANONFLT1",
            origin_country="Canonistan",
            timestamp=current_ts_utc,
            last_contact_timestamp=current_ts_utc,
            latitude=51.0,
            longitude=11.0,
            on_ground=False,
            baro_altitude_m=9000.0,
            velocity_mps=200.0,
            true_track_deg=90.0,
            vertical_rate_mps=10.0,
            squawk="1234",
            spi=False,
            position_source=0,
            raw_data_provider="TestAdapter",
        )
        flight_id1 = storage.add_or_update_flight_daily(
            state1_obj.icao24,
            state1_obj.callsign,
            state1_obj.origin_country,
            state1_obj.timestamp,
        )
        if flight_id1:
            module_logger.info(
                f"Flight 1 (icao={state1_obj.icao24}) added/updated, flight_id: {flight_id1}"
            )
            pos_id1 = storage.add_position_daily(flight_id1, state1_obj)
            if pos_id1:
                module_logger.info(
                    f"Position 1 for flight_id {flight_id1} added, pos_id: {pos_id1}"
                )

        # Simulate update for Flight 1
        time.sleep(0.01)  # Ensure different timestamp
        updated_ts_utc = time.time()
        state1_update_obj = CanonicalFlightState(
            icao24="CANON01",
            callsign="CANONFLT1A",  # Callsign changed
            origin_country="Canonistan",
            timestamp=updated_ts_utc,  # New timestamp
            last_contact_timestamp=updated_ts_utc,
            latitude=51.5,
            longitude=11.5,
            on_ground=False,
            baro_altitude_m=9050.0,
            velocity_mps=205.0,
            true_track_deg=95.0,
            vertical_rate_mps=0.0,  # Leveling off
            squawk="1234",
            spi=False,
            position_source=0,
            raw_data_provider="TestAdapter",
        )
        flight_id1_upd = storage.add_or_update_flight_daily(
            state1_update_obj.icao24,
            state1_update_obj.callsign,
            state1_update_obj.origin_country,
            state1_update_obj.timestamp,
        )
        assert (
            flight_id1_upd == flight_id1
        ), "Flight ID should remain the same on update"
        module_logger.info(
            f"Flight 1 (icao={state1_update_obj.icao24}) updated, flight_id: {flight_id1_upd}"
        )
        if flight_id1_upd:
            pos_id1_upd = storage.add_position_daily(flight_id1_upd, state1_update_obj)
            if pos_id1_upd:
                module_logger.info(
                    f"Position 2 for flight_id {flight_id1_upd} added, pos_id: {pos_id1_upd}"
                )

        # Flight 2 - for "tomorrow", which lands in a different daily DB file
        tomorrow_dt = datetime.now(timezone.utc) + timedelta(days=1)
        tomorrow_ts_utc = tomorrow_dt.timestamp()
        state2_obj = CanonicalFlightState(
            icao24="CANON02",
            callsign="CANONFLT2",
            origin_country="OtherCanonPlace",
            timestamp=tomorrow_ts_utc,
            last_contact_timestamp=tomorrow_ts_utc,
            latitude=34.0,
            longitude=-118.0,
            on_ground=False,
            baro_altitude_m=12000.0,
            velocity_mps=250.0,
            true_track_deg=270.0,
            vertical_rate_mps=-5.0,  # Descending
            squawk="7700",  # Emergency squawk
            spi=True,
            position_source=1,  # Different source
            raw_data_provider="TestAdapter",
        )
        flight_id2 = storage.add_or_update_flight_daily(
            state2_obj.icao24,
            state2_obj.callsign,
            state2_obj.origin_country,
            state2_obj.timestamp,
        )
        if flight_id2:
            module_logger.info(
                f"Flight 2 (icao={state2_obj.icao24}) for tomorrow added, flight_id: {flight_id2}"
            )
            pos_id2 = storage.add_position_daily(flight_id2, state2_obj)
            if pos_id2:
                module_logger.info(
                    f"Position for flight_id {flight_id2} (tomorrow) added, pos_id: {pos_id2}"
                )
    except Exception as e:
        # Top-level boundary of the test script: log everything, including
        # a failed assertion, and fall through to cleanup.
        module_logger.critical(
            f"Error during DataStorage standalone test (Canonical): {e}", exc_info=True
        )
    finally:
        if storage:
            storage.close_connection()
        # Restore the original app_config so nothing else sees the mock.
        globals()["app_config"] = original_app_config

        # Same directory the mock config pointed DataStorage at.
        test_dir_to_check = MockConfig.DATABASE_DIRECTORY
        if os.path.exists(test_dir_to_check):
            module_logger.info(
                f"Test directory {test_dir_to_check} was used. Please check/clean manually."
            )
        module_logger.info("--- DataStorage Standalone Test (Canonical) Finished ---")