SXXXXXXX_FlightMonitor/flightmonitor/data/storage.py
2025-06-03 12:10:39 +02:00

686 lines
26 KiB
Python

# FlightMonitor/data/storage.py
import sqlite3
import os
from datetime import (
datetime,
timezone,
timedelta,
) # Aggiunto timedelta per il blocco di test
import time
from typing import Optional, List, Dict, Any # Aggiunto Any per il blocco di test
# Relative imports
from . import config as app_config
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState
# Module-level logger used by DataStorage and by the standalone test block at the end of this file.
module_logger = get_logger(__name__)
class DataStorage:
    """
    Handles storage of flight data into daily SQLite database files.

    Each UTC day maps to its own database file: the file name is produced by
    applying ``app_config.DATABASE_FILENAME_FORMAT`` to the date and the file
    lives in ``app_config.DATABASE_DIRECTORY``. A single connection is cached
    and is replaced whenever a call targets a different day's file.

    ICAO24 addresses are stored and looked up in lowercase so that writes and
    reads always agree on the key.
    """

    def __init__(self):
        """
        Verify the database directory; no connection is opened until first use.

        Raises:
            OSError: if the database directory cannot be created, is not a
                directory, or is not writable.
        """
        self._current_db_path: Optional[str] = None
        self._connection: Optional[sqlite3.Connection] = None
        self._ensure_database_directory()
        module_logger.info("DataStorage initialized.")

    @staticmethod
    def _normalize_icao24(icao24: str) -> str:
        """Return the lowercase form of *icao24* used as the storage key."""
        return icao24.lower()

    @staticmethod
    def _timestamp_to_utc_datetime(timestamp: float) -> Optional[datetime]:
        """
        Convert a POSIX timestamp to an aware UTC datetime.

        Returns None instead of raising when the value is not a number or is
        outside the range the platform can represent.
        """
        try:
            return datetime.fromtimestamp(timestamp, timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            return None

    def _ensure_database_directory(self):
        """
        Create the database directory if needed and check it is usable.

        Raises:
            OSError: if the path cannot be created, exists but is not a
                directory, or is not writable. The error is logged first.
        """
        try:
            abs_db_dir = os.path.abspath(app_config.DATABASE_DIRECTORY)
            if not os.path.exists(abs_db_dir):
                # exist_ok avoids a spurious failure if something else creates
                # the directory between the check above and this call.
                os.makedirs(abs_db_dir, exist_ok=True)
                module_logger.info(f"Database directory created: {abs_db_dir}")
            if not os.path.isdir(abs_db_dir):
                raise OSError(f"Path {abs_db_dir} exists but is not a directory.")
            if not os.access(abs_db_dir, os.W_OK):
                raise OSError(f"Directory {abs_db_dir} is not writable.")
        except OSError as e:
            module_logger.error(
                f"Database directory error for {app_config.DATABASE_DIRECTORY}: {e}",
                exc_info=True,
            )
            raise

    def _get_db_path_for_date(self, target_date_utc: Optional[datetime] = None) -> str:
        """
        Return the absolute path of the database file for the given date.

        Args:
            target_date_utc: date selecting the daily file; defaults to the
                current UTC time.
        """
        if target_date_utc is None:
            target_date_utc = datetime.now(timezone.utc)
        db_filename = target_date_utc.strftime(app_config.DATABASE_FILENAME_FORMAT)
        return os.path.join(
            os.path.abspath(app_config.DATABASE_DIRECTORY), db_filename
        )

    def _get_db_connection(
        self, for_date_utc: Optional[datetime] = None
    ) -> Optional[sqlite3.Connection]:
        """
        Return a connection to the daily database for *for_date_utc*.

        The cached connection is reused when it already points at the target
        file; otherwise it is closed and a new one is opened and its schema
        ensured.

        Returns:
            The open connection, or None if connecting or initializing the
            schema failed (the failure is logged).
        """
        target_db_path = self._get_db_path_for_date(for_date_utc)
        if self._connection and self._current_db_path == target_db_path:
            return self._connection

        if self._connection:
            try:
                self._connection.close()
                module_logger.info(f"Closed DB connection to: {self._current_db_path}")
            except sqlite3.Error as e:
                module_logger.error(
                    f"Error closing previous DB connection to {self._current_db_path}: {e}"
                )
            self._connection = None

        # Set before initializing tables so their log messages name this file.
        self._current_db_path = target_db_path
        try:
            self._connection = sqlite3.connect(target_db_path, timeout=10.0)
            self._connection.execute("PRAGMA foreign_keys = ON;")
            self._connection.row_factory = sqlite3.Row
            module_logger.info(f"Opened DB connection to: {target_db_path}")
            self._init_tables_if_not_exist(self._connection)
            return self._connection
        except sqlite3.Error as e:
            module_logger.error(
                f"Failed to connect/initialize database {target_db_path}: {e}",
                exc_info=True,
            )
            if self._connection:
                try:
                    self._connection.close()
                except sqlite3.Error:
                    # Best effort: the handle is discarded either way.
                    pass
            self._connection = None
            self._current_db_path = None
            return None

    def _init_tables_if_not_exist(self, db_conn: sqlite3.Connection):
        """
        Create the ``flights`` and ``positions`` tables and their indexes.

        Raises:
            sqlite3.Error: if any statement fails. The transaction is rolled
                back and the error is re-raised so the caller does not hand
                out a connection to a database without its schema.
        """
        cursor = None
        try:
            cursor = db_conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS flights (
                    flight_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    icao24 TEXT NOT NULL UNIQUE,
                    callsign TEXT,
                    origin_country TEXT,
                    first_seen_day REAL NOT NULL,
                    last_seen_day REAL NOT NULL
                )
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS positions (
                    position_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    flight_id INTEGER NOT NULL,
                    detection_timestamp REAL NOT NULL,
                    latitude REAL NOT NULL,
                    longitude REAL NOT NULL,
                    baro_altitude_m REAL,
                    geo_altitude_m REAL,
                    velocity_mps REAL,
                    true_track_deg REAL,
                    vertical_rate_mps REAL,
                    on_ground BOOLEAN NOT NULL,
                    squawk TEXT,
                    spi BOOLEAN,
                    position_source TEXT,
                    raw_data_provider TEXT,
                    recorded_at REAL NOT NULL,
                    FOREIGN KEY (flight_id) REFERENCES flights(flight_id) ON DELETE CASCADE
                )
                """
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_positions_flight_id_timestamp ON positions (flight_id, detection_timestamp);"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_flights_icao24 ON flights (icao24);"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_positions_detection_timestamp ON positions (detection_timestamp);"
            )
            db_conn.commit()
            module_logger.debug(
                f"Database tables and indexes ensured in {self._current_db_path}."
            )
        except sqlite3.Error as e:
            module_logger.error(
                f"Error initializing tables in {self._current_db_path}: {e}",
                exc_info=True,
            )
            db_conn.rollback()
            raise
        finally:
            if cursor:
                cursor.close()

    def add_or_update_flight_daily(
        self,
        icao24: str,
        callsign: Optional[str],
        origin_country: Optional[str],
        detection_timestamp: float,
    ) -> Optional[int]:
        """
        Insert the flight into the daily database or refresh an existing row.

        The daily file is selected from *detection_timestamp* (UTC). For an
        existing row ``last_seen_day`` is set to *detection_timestamp* and the
        callsign is replaced only when a non-empty one is supplied.

        Args:
            icao24: aircraft address; stored in lowercase.
            callsign: callsign to store, if known.
            origin_country: stored only when the row is first inserted.
            detection_timestamp: POSIX timestamp of the observation.

        Returns:
            The ``flight_id`` of the row, or None on invalid input or on a
            database error (both are logged).
        """
        if not icao24:
            module_logger.warning("icao24 is required to add or update a flight.")
            return None
        icao24 = self._normalize_icao24(icao24)

        event_date_utc = self._timestamp_to_utc_datetime(detection_timestamp)
        if event_date_utc is None:
            module_logger.warning(
                f"Invalid detection_timestamp {detection_timestamp!r} for icao24={icao24}; flight not stored."
            )
            return None

        conn = self._get_db_connection(for_date_utc=event_date_utc)
        if not conn:
            module_logger.error(
                "Failed to get DB connection for add_or_update_flight_daily."
            )
            return None

        cursor = None
        flight_id: Optional[int] = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT flight_id, callsign FROM flights WHERE icao24 = ?", (icao24,)
            )
            row = cursor.fetchone()
            if row:
                flight_id = row["flight_id"]
                # Keep the stored callsign unless a non-empty one is supplied.
                final_callsign = callsign or row["callsign"]
                cursor.execute(
                    """
                    UPDATE flights SET last_seen_day = ?, callsign = ?
                    WHERE flight_id = ?
                    """,
                    (detection_timestamp, final_callsign, flight_id),
                )
                module_logger.debug(
                    f"Updated flight icao24={icao24}, flight_id={flight_id}, last_seen_day={detection_timestamp:.0f} in {self._current_db_path}"
                )
            else:
                cursor.execute(
                    """
                    INSERT INTO flights (icao24, callsign, origin_country, first_seen_day, last_seen_day)
                    VALUES (?, ?, ?, ?, ?)
                    """,
                    (
                        icao24,
                        callsign,
                        origin_country,
                        detection_timestamp,
                        detection_timestamp,
                    ),
                )
                flight_id = cursor.lastrowid
                module_logger.info(
                    f"Added new flight icao24={icao24}, flight_id={flight_id} to {self._current_db_path}"
                )
            conn.commit()
        except sqlite3.Error as e:
            module_logger.error(
                f"DB error in add_or_update_flight_daily for icao24={icao24}: {e}",
                exc_info=True,
            )
            conn.rollback()
            flight_id = None
        finally:
            if cursor:
                cursor.close()
        return flight_id

    def add_position_daily(
        self, flight_id: int, flight_state_obj: "CanonicalFlightState"
    ) -> Optional[int]:
        """
        Store one position report for *flight_id* in the daily database.

        The daily file is selected from ``flight_state_obj.timestamp`` (UTC).
        All input validation happens before a connection is requested, so a
        rejected state never opens or switches the database.

        Args:
            flight_id: ``flights.flight_id`` the position belongs to.
            flight_state_obj: state to persist; latitude and longitude are
                required.

        Returns:
            The new ``position_id``, or None on invalid input or on a
            database error (both are logged).
        """
        if not flight_id:
            module_logger.warning("flight_id is required to add a position.")
            return None
        if not isinstance(flight_state_obj, CanonicalFlightState):
            module_logger.error(
                f"Invalid type for flight_state_obj: expected CanonicalFlightState, got {type(flight_state_obj)}"
            )
            return None

        event_date_utc = self._timestamp_to_utc_datetime(flight_state_obj.timestamp)
        if event_date_utc is None:
            module_logger.warning(
                f"Skipping position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"due to invalid timestamp {flight_state_obj.timestamp!r}."
            )
            return None
        if flight_state_obj.latitude is None or flight_state_obj.longitude is None:
            module_logger.warning(
                f"Skipping position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"due to missing latitude/longitude. Timestamp: {flight_state_obj.timestamp:.0f}"
            )
            return None

        conn = self._get_db_connection(for_date_utc=event_date_utc)
        if not conn:
            module_logger.error(
                f"Failed to get DB connection for add_position_daily (flight_id={flight_id})."
            )
            return None

        cursor = None
        position_id: Optional[int] = None
        recorded_at_ts = time.time()
        try:
            cursor = conn.cursor()
            sql = """
                INSERT INTO positions (
                    flight_id, detection_timestamp, latitude, longitude,
                    baro_altitude_m, geo_altitude_m, velocity_mps, true_track_deg,
                    vertical_rate_mps, on_ground, squawk, spi, position_source,
                    raw_data_provider, recorded_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
            # position_source is stored in a TEXT column, hence the str().
            position_source = (
                str(flight_state_obj.position_source)
                if flight_state_obj.position_source is not None
                else None
            )
            params = (
                flight_id,
                flight_state_obj.timestamp,
                flight_state_obj.latitude,
                flight_state_obj.longitude,
                flight_state_obj.baro_altitude_m,
                flight_state_obj.geo_altitude_m,
                flight_state_obj.velocity_mps,
                flight_state_obj.true_track_deg,
                flight_state_obj.vertical_rate_mps,
                flight_state_obj.on_ground,
                flight_state_obj.squawk,
                flight_state_obj.spi,
                position_source,
                flight_state_obj.raw_data_provider,
                recorded_at_ts,
            )
            cursor.execute(sql, params)
            position_id = cursor.lastrowid
            conn.commit()
            module_logger.debug(
                f"Added position for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}) "
                f"at {flight_state_obj.timestamp:.0f} (pos_id={position_id}) in {self._current_db_path}"
            )
        except sqlite3.Error as e:
            module_logger.error(
                f"DB error in add_position_daily for flight_id={flight_id} (ICAO: {flight_state_obj.icao24}): {e}",
                exc_info=True,
            )
            conn.rollback()
            position_id = None
        finally:
            if cursor:
                cursor.close()
        return position_id

    @staticmethod
    def _row_to_flight_state(icao24: str, pos_row: sqlite3.Row) -> "CanonicalFlightState":
        """
        Build a CanonicalFlightState from one ``positions`` row.

        Columns other than timestamp, latitude, longitude and on_ground are
        treated as optional: a column missing from the row yields None.
        """
        available = set(pos_row.keys())

        def optional(column: str):
            return pos_row[column] if column in available else None

        spi_value = optional("spi")
        return CanonicalFlightState(
            icao24=icao24,
            timestamp=pos_row["detection_timestamp"],
            last_contact_timestamp=pos_row["detection_timestamp"],
            latitude=pos_row["latitude"],
            longitude=pos_row["longitude"],
            on_ground=bool(pos_row["on_ground"]),
            callsign=None,
            origin_country=None,
            baro_altitude_m=optional("baro_altitude_m"),
            geo_altitude_m=optional("geo_altitude_m"),
            velocity_mps=optional("velocity_mps"),
            true_track_deg=optional("true_track_deg"),
            vertical_rate_mps=optional("vertical_rate_mps"),
            squawk=optional("squawk"),
            # Keep NULL as None instead of collapsing it to False.
            spi=bool(spi_value) if spi_value is not None else None,
            position_source=optional("position_source"),
            raw_data_provider=optional("raw_data_provider"),
        )

    def get_flight_track_for_icao_on_date(
        self, icao24_to_find: str, target_date_utc: Optional[datetime] = None
    ) -> List["CanonicalFlightState"]:
        """
        Retrieves all recorded positions for a given ICAO24 on a specific UTC date.

        The lookup is done on the lowercase form of *icao24_to_find*; the
        returned states carry the value exactly as passed in. If no database
        file exists for the date, nothing is created and an empty list is
        returned.

        Args:
            icao24_to_find: aircraft address to look up.
            target_date_utc: date selecting the daily file; defaults to the
                current UTC time.

        Returns:
            A list of CanonicalFlightState objects ordered by timestamp; empty
            when nothing is found or an error occurs (errors are logged).
        """
        if not icao24_to_find:
            module_logger.warning("ICAO24 is required to get flight track.")
            return []
        if target_date_utc is None:
            target_date_utc = datetime.now(timezone.utc)
        date_label = target_date_utc.strftime("%Y-%m-%d")

        # sqlite3.connect() creates a missing file, so a read for a day without
        # data would otherwise leave an empty database behind.
        if not os.path.isfile(self._get_db_path_for_date(target_date_utc)):
            module_logger.info(
                f"No database file for {date_label}; no track available for ICAO {icao24_to_find}."
            )
            return []

        conn = self._get_db_connection(for_date_utc=target_date_utc)
        if not conn:
            module_logger.error(
                f"Failed to get DB connection for date {date_label} to retrieve track for {icao24_to_find}."
            )
            return []

        track_points: List["CanonicalFlightState"] = []
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT flight_id FROM flights WHERE icao24 = ?",
                (self._normalize_icao24(icao24_to_find),),
            )
            flight_row = cursor.fetchone()
            if not flight_row:
                module_logger.info(
                    f"No flight record found for ICAO {icao24_to_find} on {date_label}."
                )
                return []
            flight_id = flight_row["flight_id"]
            module_logger.debug(
                f"Found flight_id {flight_id} for ICAO {icao24_to_find} on {date_label}."
            )

            cursor.execute(
                """
                SELECT * FROM positions
                WHERE flight_id = ?
                ORDER BY detection_timestamp ASC
                """,
                (flight_id,),
            )
            position_rows = cursor.fetchall()
            if not position_rows:
                module_logger.info(
                    f"No position records found for flight_id {flight_id} (ICAO {icao24_to_find}) on {date_label}."
                )
                return []

            for pos_row in position_rows:
                try:
                    track_points.append(
                        self._row_to_flight_state(icao24_to_find, pos_row)
                    )
                except (KeyError, IndexError) as e:
                    # sqlite3.Row raises IndexError for an unknown column name.
                    module_logger.warning(
                        f"Missing column creating CanonicalFlightState from DB row for ICAO {icao24_to_find}: {e}. Row keys: {pos_row.keys()}"
                    )
                except Exception as e_state_create:
                    # A single bad row must not discard the rest of the track.
                    module_logger.error(
                        f"Error creating CanonicalFlightState from DB row for ICAO {icao24_to_find}: {e_state_create}",
                        exc_info=True,
                    )
            module_logger.info(
                f"Retrieved {len(track_points)} track points for ICAO {icao24_to_find} on {date_label}."
            )
        except sqlite3.Error as e:
            module_logger.error(
                f"DB error retrieving track for {icao24_to_find} on {date_label}: {e}",
                exc_info=True,
            )
        except Exception as e_gen:  # Catch-all for other potential errors during processing
            module_logger.error(
                f"Generic error retrieving track for {icao24_to_find}: {e_gen}",
                exc_info=True,
            )
        finally:
            if cursor:
                cursor.close()
        return track_points

    def close_connection(self):
        """Close the cached connection, if any, and forget the current file."""
        if self._connection:
            try:
                self._connection.close()
                module_logger.info(
                    f"DB connection to {self._current_db_path} closed by request."
                )
            except sqlite3.Error as e:
                module_logger.error(
                    f"Error explicitly closing DB connection to {self._current_db_path}: {e}"
                )
            finally:
                self._connection = None
                self._current_db_path = None
if __name__ == "__main__":
    # Standalone smoke test for DataStorage: writes two positions for one
    # flight "today" and one position for another flight "tomorrow", then reads
    # the tracks back and checks them.
    # NOTE(review): the module uses relative imports, so this block presumably
    # has to be started as a module (python -m ...) - confirm against the
    # package layout.
    import logging  # Ensure logging is imported for basicConfig
    import shutil
    import tempfile

    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)-8s] %(name)-20s : %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    module_logger.info(
        "--- Running DataStorage Standalone Test (with CanonicalFlightState) ---"
    )

    # A fresh directory per run keeps the row-count assertions below valid:
    # with a fixed directory, rows from an earlier run on the same day would
    # still be present.
    test_db_dir = tempfile.mkdtemp(prefix="test_flight_data_storage_canonical_")

    class MockConfig:
        DATABASE_DIRECTORY = test_db_dir
        DATABASE_FILENAME_FORMAT = "test_flights_canonical_%Y-%m-%d.db"
        LOG_LEVEL = "DEBUG"  # Added for completeness if accessed

    # Swap the module-level config so DataStorage reads the test settings.
    original_app_config = app_config
    globals()["app_config"] = MockConfig()

    # Lowercase, because the track lookup is done on the lowercase address.
    icao_flight_1 = "canon01"
    icao_flight_2 = "canon02"

    storage = None
    try:
        storage = DataStorage()
        current_ts_utc = time.time()

        # Flight 1
        state1_obj = CanonicalFlightState(
            icao24=icao_flight_1,
            callsign="CANONFLT1",
            origin_country="Canonistan",
            timestamp=current_ts_utc,
            last_contact_timestamp=current_ts_utc,
            latitude=51.0,
            longitude=11.0,
            on_ground=False,
            baro_altitude_m=9000.0,
            geo_altitude_m=9020.0,  # Added for test
            velocity_mps=200.0,
            true_track_deg=90.0,
            vertical_rate_mps=10.0,
            squawk="1234",
            spi=False,
            position_source=0,
            raw_data_provider="TestAdapter",
        )
        flight_id1 = storage.add_or_update_flight_daily(
            state1_obj.icao24,
            state1_obj.callsign,
            state1_obj.origin_country,
            state1_obj.timestamp,
        )
        if flight_id1:
            module_logger.info(
                f"Flight 1 (icao={state1_obj.icao24}) added/updated, flight_id: {flight_id1}"
            )
            pos_id1 = storage.add_position_daily(flight_id1, state1_obj)
            if pos_id1:
                module_logger.info(
                    f"Position 1 for flight_id {flight_id1} added, pos_id: {pos_id1}"
                )

        # Second observation of the same aircraft with a changed callsign.
        time.sleep(0.01)
        updated_ts_utc = time.time()
        state1_update_obj = CanonicalFlightState(
            icao24=icao_flight_1,
            callsign="CANONFLT1A",
            origin_country="Canonistan",
            timestamp=updated_ts_utc,
            last_contact_timestamp=updated_ts_utc,
            latitude=51.5,
            longitude=11.5,
            on_ground=False,
            baro_altitude_m=9050.0,
            geo_altitude_m=9070.0,  # Added for test
            velocity_mps=205.0,
            true_track_deg=95.0,
            vertical_rate_mps=0.0,
            squawk="1234",
            spi=False,
            position_source=0,
            raw_data_provider="TestAdapter",
        )
        flight_id1_upd = storage.add_or_update_flight_daily(
            state1_update_obj.icao24,
            state1_update_obj.callsign,
            state1_update_obj.origin_country,
            state1_update_obj.timestamp,
        )
        assert flight_id1_upd == flight_id1
        module_logger.info(
            f"Flight 1 (icao={state1_update_obj.icao24}) updated, flight_id: {flight_id1_upd}"
        )
        if flight_id1_upd:
            pos_id1_upd = storage.add_position_daily(flight_id1_upd, state1_update_obj)
            if pos_id1_upd:
                module_logger.info(
                    f"Position 2 for flight_id {flight_id1_upd} added, pos_id: {pos_id1_upd}"
                )

        # Test get_flight_track_for_icao_on_date
        module_logger.info(
            f"--- Testing get_flight_track_for_icao_on_date for {icao_flight_1} ---"
        )
        retrieved_track = storage.get_flight_track_for_icao_on_date(icao_flight_1)
        assert (
            len(retrieved_track) == 2
        ), f"Expected 2 track points for {icao_flight_1}, got {len(retrieved_track)}"
        module_logger.info(
            f"Retrieved {len(retrieved_track)} points for {icao_flight_1}. First point: {retrieved_track[0]}"
        )
        assert retrieved_track[0].latitude == 51.0
        assert retrieved_track[1].latitude == 51.5
        assert retrieved_track[0].baro_altitude_m == 9000.0
        assert (
            retrieved_track[1].geo_altitude_m == 9070.0
        )  # Check a field from the update

        # Flight 2 - for "tomorrow", which lands in a different daily file
        tomorrow_dt = datetime.now(timezone.utc) + timedelta(days=1)
        tomorrow_ts_utc = tomorrow_dt.timestamp()
        state2_obj = CanonicalFlightState(
            icao24=icao_flight_2,
            callsign="CANONFLT2",
            origin_country="OtherCanonPlace",
            timestamp=tomorrow_ts_utc,
            last_contact_timestamp=tomorrow_ts_utc,
            latitude=34.0,
            longitude=-118.0,
            on_ground=False,
            baro_altitude_m=12000.0,
            geo_altitude_m=None,  # Test with None
            velocity_mps=250.0,
            true_track_deg=270.0,
            vertical_rate_mps=-5.0,
            squawk="7700",
            spi=True,
            position_source=1,
            raw_data_provider="TestAdapter",
        )
        flight_id2 = storage.add_or_update_flight_daily(
            state2_obj.icao24,
            state2_obj.callsign,
            state2_obj.origin_country,
            state2_obj.timestamp,
        )
        if flight_id2:
            module_logger.info(
                f"Flight 2 (icao={state2_obj.icao24}) for tomorrow added, flight_id: {flight_id2}"
            )
            pos_id2 = storage.add_position_daily(flight_id2, state2_obj)
            if pos_id2:
                module_logger.info(
                    f"Position for flight_id {flight_id2} (tomorrow) added, pos_id: {pos_id2}"
                )

        module_logger.info(
            f"--- Testing get_flight_track_for_icao_on_date for {icao_flight_2} (tomorrow) ---"
        )
        retrieved_track_tmr = storage.get_flight_track_for_icao_on_date(
            icao_flight_2, target_date_utc=tomorrow_dt
        )
        assert (
            len(retrieved_track_tmr) == 1
        ), f"Expected 1 track point for {icao_flight_2}, got {len(retrieved_track_tmr)}"
        module_logger.info(
            f"Retrieved point for {icao_flight_2}: {retrieved_track_tmr[0]}"
        )
        assert retrieved_track_tmr[0].geo_altitude_m is None  # Check None value
    except Exception as e:
        module_logger.critical(
            f"Error during DataStorage standalone test (Canonical): {e}", exc_info=True
        )
    finally:
        # Close first: an open SQLite handle can prevent the files from being
        # deleted on some platforms.
        if storage:
            storage.close_connection()
        globals()["app_config"] = original_app_config
        shutil.rmtree(test_db_dir, ignore_errors=True)
        module_logger.info(f"Test directory {test_db_dir} removed.")
        module_logger.info("--- DataStorage Standalone Test (Canonical) Finished ---")