# FlightMonitor/data/aircraft_database_manager.py
import sqlite3
import csv
import os
import threading  # Used to tell the owning (main) thread apart from worker threads
from typing import Optional, Dict, Any, Tuple, Callable
try:
from ..utils.logger import get_logger
logger = get_logger(__name__)
except ImportError:
import logging
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s [%(levelname)s] %(funcName)s (%(threadName)s): %(message)s",
)
logger.warning("AircraftDatabaseManager using fallback standard Python logger.")
DEFAULT_AIRCRAFT_DB_FILENAME = "aircraft_database.db"
try:
from . import config as app_config
AIRCRAFT_DB_ROOT_DIRECTORY = getattr(
app_config, "DATABASE_DIRECTORY", "flight_data_history"
)
logger.info(
f"AircraftDatabaseManager will use directory from app_config: {AIRCRAFT_DB_ROOT_DIRECTORY}"
)
except ImportError:
AIRCRAFT_DB_ROOT_DIRECTORY = "flight_data_history"
logger.warning(
f"AircraftDatabaseManager: app_config not found. Using fallback directory: {AIRCRAFT_DB_ROOT_DIRECTORY}"
)
class AircraftDatabaseManager:
"""
Manages the local SQLite database containing static aircraft details.
Handles thread-safe connections for operations that might be called from different threads.
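
Typical usage (illustrative sketch; the hex address below is made up):

    manager = AircraftDatabaseManager()
    details = manager.get_aircraft_details("4ca1d3")  # hypothetical ICAO24 hex
    manager.close_connection()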
"""
def __init__(
self,
db_name: str = DEFAULT_AIRCRAFT_DB_FILENAME,
db_root_dir: Optional[str] = None,
):
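"""
Resolve the database path, ensure its parent directory exists (raising
RuntimeError if it cannot be created) and, on the constructing thread,
open the persistent connection and ensure the aircraft_details table.
The constructing thread is recorded as the "main" thread and owns self.conn.

:param db_name: File name of the SQLite database.
:param db_root_dir: Directory for the database; defaults to AIRCRAFT_DB_ROOT_DIRECTORY.
"""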
actual_db_root_dir = (
db_root_dir if db_root_dir is not None else AIRCRAFT_DB_ROOT_DIRECTORY
)
self.db_path: str = os.path.join(os.path.abspath(actual_db_root_dir), db_name)
self.conn: Optional[sqlite3.Connection] = None
self._main_thread_id: int = threading.get_ident()
logger.info(f"Aircraft database path set to: {self.db_path}")
db_parent_dir = os.path.dirname(self.db_path)
if not os.path.exists(db_parent_dir):
try:
os.makedirs(db_parent_dir, exist_ok=True)
logger.info(f"Aircraft database directory created: {db_parent_dir}")
except OSError as e:
logger.error(
f"Critical error creating directory {db_parent_dir}: {e}. DB might be inaccessible.",
exc_info=True,
)
raise RuntimeError(
f"Unable to create database directory: {db_parent_dir}"
) from e
if threading.get_ident() == self._main_thread_id:
self._connect_safely()
if self.conn:
self._create_table()
else:
logger.error(
"Main database connection failed on init. Cannot ensure aircraft_details table exists."
)
logger.info(
f"AircraftDatabaseManager initialized for DB: {self.db_path}. Main thread ID: {self._main_thread_id}"
)
def _get_thread_safe_connection(self) -> Optional[sqlite3.Connection]:
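"""
Return a connection appropriate for the calling thread: the persistent
self.conn when called from the owning (main) thread, otherwise a new
temporary connection that the caller is responsible for closing.
Returns None if no connection can be established.
"""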
current_thread_id = threading.get_ident()
if current_thread_id == self._main_thread_id:
if self.conn is None or not self._is_connection_alive(self.conn):
logger.debug(
f"Main thread ({current_thread_id}): Re-establishing or initial main connection."
)
self._connect_safely()
return self.conn
else:
logger.debug(
f"Worker thread ({current_thread_id}): Creating new local, temporary DB connection to {self.db_path}"
)
try:
local_conn = sqlite3.connect(self.db_path, timeout=10.0)
local_conn.row_factory = sqlite3.Row
return local_conn
except sqlite3.Error as e:
logger.error(
f"Worker thread ({current_thread_id}): Error creating local DB connection: {e}",
exc_info=True,
)
return None
def _connect_safely(self):
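"""
(Re)open the persistent main-thread connection self.conn, discarding it
first if it is no longer alive. Must only be called from the main thread;
on failure self.conn is left as None.
"""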
if threading.get_ident() != self._main_thread_id:
logger.error(
"_connect_safely called from non-main thread. This is for self.conn only."
)
return
try:
if self.conn and not self._is_connection_alive(self.conn):
try:
self.conn.close()
logger.debug("Closed invalid main DB connection.")
except sqlite3.Error:
pass
self.conn = None
if self.conn is None:
self.conn = sqlite3.connect(self.db_path, timeout=10.0)
self.conn.row_factory = sqlite3.Row
logger.info(
f"Main thread: Connected/Reconnected to aircraft database: {self.db_path}"
)
except sqlite3.Error as e:
logger.error(
f"Main thread: Error connecting/reconnecting to aircraft database {self.db_path}: {e}",
exc_info=True,
)
self.conn = None
def _is_connection_alive(self, conn_to_check: Optional[sqlite3.Connection]) -> bool:
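"""Return True if the given connection can still execute a trivial query."""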
if conn_to_check is None:
return False
try:
conn_to_check.execute("SELECT 1").fetchone()
return True
except sqlite3.Error:
return False
def _create_table(self):
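"""
Create the aircraft_details table and its registration/typecode/operator
indexes if they do not exist yet. Runs on the main connection only.
"""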
if threading.get_ident() != self._main_thread_id:
logger.warning("Table creation attempt from a non-main thread. Aborting.")
return
if not self.conn:
logger.error(
"No main database connection to create aircraft_details table."
)
self._connect_safely()
if not self.conn:
logger.critical(
"Failed to establish main DB connection for table creation."
)
return
try:
with self.conn:
cursor = self.conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS aircraft_details (
icao24 TEXT PRIMARY KEY NOT NULL,
registration TEXT,
manufacturericao TEXT,
manufacturername TEXT,
model TEXT,
typecode TEXT,
serialnumber TEXT,
operator TEXT,
operatorcallsign TEXT,
operatoricao TEXT,
operatoriata TEXT,
owner TEXT,
country TEXT,
built_year INTEGER,
firstflightdate TEXT,
categorydescription TEXT,
engines TEXT,
icaoclass TEXT,
linenumber TEXT,
modes INTEGER,
notes TEXT,
status TEXT,
timestamp_metadata TEXT
)
"""
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_aircraft_registration ON aircraft_details(registration);"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_aircraft_typecode ON aircraft_details(typecode);"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_aircraft_operator ON aircraft_details(operator);"
)
logger.info(
"Table 'aircraft_details' and indexes ensured in DB (by main thread)."
)
except sqlite3.Error as e:
logger.error(
f"Error creating 'aircraft_details' table (by main thread): {e}",
exc_info=True,
)
def get_aircraft_details(self, icao24: str) -> Optional[Dict[str, Any]]:
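"""
Look up static details for a single aircraft by its ICAO24 hex address
(normalized to lowercase). Returns a dict of the table columns, or None
if the aircraft is unknown or the database is unavailable. Safe to call
from worker threads: a temporary connection is opened and closed here.
"""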
conn = self._get_thread_safe_connection()
if not conn:
logger.error(
f"Could not get DB connection for get_aircraft_details (ICAO: {icao24})."
)
return None
icao24_clean = icao24.lower().strip()
details = None
try:
with conn:
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM aircraft_details WHERE icao24 = ?", (icao24_clean,)
)
row = cursor.fetchone()
if row:
details = dict(row)
except sqlite3.Error as e:
logger.error(
f"DB error in get_aircraft_details for {icao24_clean}: {e}",
exc_info=True,
)
except Exception as e_generic:
logger.error(
f"Generic error in get_aircraft_details for {icao24_clean}: {e_generic}",
exc_info=True,
)
finally:
if conn is not self.conn and conn is not None:
try:
conn.close()
logger.debug(
f"Closed temporary DB connection for get_aircraft_details (Thread: {threading.get_ident()})."
)
except sqlite3.Error:
pass
return details
def _clean_csv_value(self, value: Optional[str]) -> Optional[str]:
"""Cleans a string value read from CSV: strips whitespace and removes surrounding single quotes."""
if value is None:
return None
cleaned = value.strip()
if len(cleaned) >= 2 and cleaned.startswith("'") and cleaned.endswith("'"):
cleaned = cleaned[1:-1]
return (
cleaned if cleaned else None
) # Return None if string becomes empty after cleaning
def import_from_csv(
self,
csv_filepath: str,
replace_existing: bool = True,
progress_callback: Optional[Callable[[int, int, Optional[int]], None]] = None,
total_rows_for_callback: Optional[int] = None,
) -> Tuple[int, int]:
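"""
Import (or refresh) aircraft metadata from a CSV file.

:param csv_filepath: Path of the CSV file to import.
:param replace_existing: If True, delete all existing rows before importing.
:param progress_callback: Optional callable(processed, imported, total) invoked
    every callback_interval processed rows and once at the end.
:param total_rows_for_callback: Optional precomputed data-row count passed
    through to the callback as its third argument.
:return: Tuple (processed_data_rows, imported_or_updated_rows).
"""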
conn = self._get_thread_safe_connection()
if not conn:
logger.error(
f"Could not get DB connection for import_from_csv (File: {csv_filepath})."
)
if progress_callback:
try:
progress_callback(0, 0, total_rows_for_callback)
except Exception as e_cb_fail:
logger.error(
f"Error in progress_callback during connection failure: {e_cb_fail}"
)
return 0, 0
current_thread_id = threading.get_ident()
logger.info(
f"Thread ({current_thread_id}) - Starting CSV import: {csv_filepath}. Replace mode: {replace_existing}"
)
try:
current_csv_limit = csv.field_size_limit()
desired_csv_limit = 10 * 1024 * 1024
if desired_csv_limit > current_csv_limit:
csv.field_size_limit(desired_csv_limit)
logger.info(
f"Thread ({current_thread_id}): Increased csv.field_size_limit from {current_csv_limit} to {desired_csv_limit} for this import."
)
except Exception as e_limit:
logger.error(
f"Thread ({current_thread_id}): Error attempting to increase csv.field_size_limit: {e_limit}"
)
processed_data_rows = 0
imported_or_updated_rows = 0
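# Columns of the aircraft_details table; used below to build the INSERT statement
# from whichever of these keys are present in data_for_db.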
db_columns = [
"icao24",
"registration",
"manufacturericao",
"manufacturername",
"model",
"typecode",
"serialnumber",
"operator",
"operatorcallsign",
"operatoricao",
"operatoriata",
"owner",
"country",
"built_year",
"firstflightdate",
"categorydescription",
"engines",
"icaoclass",
"linenumber",
"modes",
"notes",
"status",
"timestamp_metadata",
]
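# Maps lowercased CSV header names to DB column names. Most entries are identical;
# the renamed ones are 'built' -> 'built_year', 'icaoaircraftclass' -> 'icaoclass'
# and 'timestamp' -> 'timestamp_metadata'.
# (The header names appear to follow the OpenSky aircraft-database CSV export.)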
csv_to_db_column_map = {
"icao24": "icao24",
"registration": "registration",
"manufacturericao": "manufacturericao",
"manufacturername": "manufacturername",
"model": "model",
"typecode": "typecode",
"serialnumber": "serialnumber",
"operator": "operator",
"operatorcallsign": "operatorcallsign",
"operatoricao": "operatoricao",
"operatoriata": "operatoriata",
"owner": "owner",
"country": "country",
"built": "built_year",
"firstflightdate": "firstflightdate",
"categorydescription": "categorydescription",
"engines": "engines",
"icaoaircraftclass": "icaoclass",
"linenumber": "linenumber",
"modes": "modes",
"notes": "notes",
"status": "status",
"timestamp": "timestamp_metadata",
}
callback_interval = 500  # Invoke progress_callback every N processed CSV rows
debug_first_n_rows = 0  # Set > 0 to log detailed parsing info for the first N data rows
try:
with conn:
cursor = conn.cursor()
if replace_existing:
cursor.execute("DELETE FROM aircraft_details")
logger.info(
f"Thread ({current_thread_id}): Existing data deleted from aircraft_details."
)
with open(csv_filepath, "r", encoding="utf-8-sig") as csvfile:
reader = csv.DictReader(csvfile)
if not reader.fieldnames:
logger.error(
f"Thread ({current_thread_id}): CSV file {csv_filepath} has no valid header or is empty."
)
if progress_callback:
progress_callback(0, 0, total_rows_for_callback)
return 0, 0
cleaned_fieldnames = []
for fn in reader.fieldnames:
if isinstance(fn, str):
temp_fn = fn.strip()
if temp_fn.startswith("'") and temp_fn.endswith("'"):
temp_fn = temp_fn[1:-1]
cleaned_fieldnames.append(temp_fn)
else:
cleaned_fieldnames.append(fn)
reader.fieldnames = cleaned_fieldnames
logger.debug(
f"Thread ({current_thread_id}): Cleaned CSV Header for DictReader: {reader.fieldnames}"
)
for csv_row_index, raw_row_data in enumerate(reader):
processed_data_rows += 1
if processed_data_rows <= debug_first_n_rows:
logger.info(
f"--- Debugging CSV Data Row {processed_data_rows} (Index {csv_row_index}) ---"
)
logger.info(
f"Raw row data from DictReader (cleaned keys): {raw_row_data}"
)
row_data_lower_keys = {}
for k, v_raw in raw_row_data.items():
key_clean = (
str(k).lower().strip()
) # Keys from DictReader (after fieldname cleaning) should be clean strings
# Clean the value by stripping whitespace and then surrounding single quotes
value_cleaned = self._clean_csv_value(
v_raw
if isinstance(v_raw, str)
else str(v_raw) if v_raw is not None else None
)
row_data_lower_keys[key_clean] = value_cleaned
if processed_data_rows <= debug_first_n_rows:
logger.info(
f"Row {processed_data_rows}: Normalized & Cleaned Value Keys: {row_data_lower_keys}"
)
icao24_val = (
row_data_lower_keys.get("icao24", "").strip().lower()
if row_data_lower_keys.get("icao24")
else ""
)
if processed_data_rows <= debug_first_n_rows:
logger.info(
f"Row {processed_data_rows}: Attempting to get 'icao24'. Value: '{row_data_lower_keys.get('icao24', 'KEY_NOT_FOUND')}' -> Final ICAO24: '{icao24_val}'"
)
if not icao24_val:
logger.warning(
f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: Missing or empty icao24 after processing. Skipping row."
)
if (
progress_callback
and processed_data_rows % callback_interval == 0
):
try:
progress_callback(
processed_data_rows,
imported_or_updated_rows,
total_rows_for_callback,
)
except Exception as e_cb:
logger.error(
f"Thread ({current_thread_id}) Error in progress_callback (skipped row): {e_cb}"
)
continue
data_for_db = {"icao24": icao24_val}
for csv_col_lower, db_col_name in csv_to_db_column_map.items():
if csv_col_lower == "icao24":
continue
# Value already cleaned (including surrounding single quotes)
cleaned_value_from_dict = row_data_lower_keys.get(
csv_col_lower
)
final_value = None
if (
cleaned_value_from_dict is not None
): # Process if not None (empty string was converted to None by _clean_csv_value)
if db_col_name == "built_year":
final_value = (
int(cleaned_value_from_dict)
if str(cleaned_value_from_dict).isdigit()
else None
)
elif db_col_name == "modes":
final_value = (
int(cleaned_value_from_dict)
if str(cleaned_value_from_dict).isdigit()
else None
)
else:
final_value = (
cleaned_value_from_dict # Already cleaned
)
data_for_db[db_col_name] = final_value
cols_for_sql = []
vals_for_sql = []
for db_col_key in db_columns:
if db_col_key in data_for_db:
cols_for_sql.append(db_col_key)
vals_for_sql.append(data_for_db[db_col_key])
if not cols_for_sql or "icao24" not in cols_for_sql:
logger.warning(
f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: No valid columns for SQL for ICAO {icao24_val}. Skipping."
)
continue
placeholders = ", ".join(["?"] * len(cols_for_sql))
sql_cols_str = ", ".join(cols_for_sql)
sql = f"INSERT OR REPLACE INTO aircraft_details ({sql_cols_str}) VALUES ({placeholders})"
try:
cursor.execute(sql, vals_for_sql)
imported_or_updated_rows += 1
except sqlite3.Error as e_sql:
logger.error(
f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: SQL error for ICAO24 '{icao24_val}': {e_sql}. Data: {dict(zip(cols_for_sql, vals_for_sql))}"
)
if (
progress_callback
and processed_data_rows % callback_interval == 0
):
try:
progress_callback(
processed_data_rows,
imported_or_updated_rows,
total_rows_for_callback,
)
except Exception as e_cb:
logger.error(
f"Thread ({current_thread_id}) Error during progress_callback (loop): {e_cb}"
)
logger.info(
f"Thread ({current_thread_id}): CSV import finished. Processed data rows: {processed_data_rows}, DB rows imported/updated: {imported_or_updated_rows}"
)
except FileNotFoundError:
logger.error(
f"Thread ({current_thread_id}): CSV file not found: {csv_filepath}"
)
except csv.Error as e_csv_format:
logger.error(
f"Thread ({current_thread_id}): CSV format error in {csv_filepath} (around data row {processed_data_rows+1}): {e_csv_format}",
exc_info=True,
)
except Exception as e_general_import:
logger.error(
f"Thread ({current_thread_id}): General error during CSV import {csv_filepath}: {e_general_import}",
exc_info=True,
)
finally:
if progress_callback:
try:
progress_callback(
processed_data_rows,
imported_or_updated_rows,
total_rows_for_callback,
)
except Exception as e_cb_final:
logger.error(
f"Thread ({current_thread_id}) Error during final progress_callback: {e_cb_final}"
)
if conn is not self.conn and conn is not None:
try:
conn.close()
logger.debug(
f"Closed temporary DB connection for import_from_csv (Thread: {threading.get_ident()})."
)
except sqlite3.Error:
pass
return processed_data_rows, imported_or_updated_rows
def close_connection(self):
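"""
Close the persistent main-thread connection. Calls from worker threads are
logged and ignored; worker connections are closed where they are created.
"""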
if self.conn and threading.get_ident() == self._main_thread_id:
try:
if self._is_connection_alive(self.conn):
self.conn.close()
logger.info(
f"Main aircraft database connection closed: {self.db_path}"
)
except sqlite3.Error as e:
logger.error(
f"Error closing main aircraft DB connection: {e}", exc_info=True
)
finally:
self.conn = None
elif self.conn and threading.get_ident() != self._main_thread_id:
logger.warning(
f"Attempt to close main connection from non-main thread ({threading.get_ident()}). Ignored."
)
if __name__ == "__main__":
# ... (The if __name__ == '__main__' block is unchanged from the previous complete
# version used for testing; make sure the logger is configured and the test file
# paths are correct. The test for quoted keys should now behave better with the
# key and value cleaning in place.)
import logging  # the try/except at module level only imports logging when the package logger is unavailable
if not logging.getLogger(__name__).hasHandlers():
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s [%(levelname)s] %(funcName)s (%(threadName)s): %(message)s",
)
logger.info(
"--- Starting Standalone AircraftDatabaseManager Test (Thread-Safe with Callback & Cleaned Keys/Values) ---"
)
current_script_dir = os.path.dirname(os.path.abspath(__file__))
test_data_base_dir = os.path.join(
os.path.dirname(current_script_dir), "test_artefacts"
)
test_data_dir = os.path.join(
test_data_base_dir, "aircraft_db_tests_standalone_cv"
) # cv for cleaned values
if not os.path.exists(test_data_dir):
os.makedirs(test_data_dir, exist_ok=True)
test_db_name = "test_aircraft_db_standalone_cv.sqlite"
test_csv_filename = "sample_aircraft_data_standalone_cv.csv"
test_csv_path = os.path.join(test_data_dir, test_csv_filename)
header_with_quotes = [
"'icao24'",
"'timestamp'",
"'built'",
"'categoryDescription'",
"'country'",
"'registration'",
"'model'",
"'manufacturerName'",
]
num_test_rows = 10  # Kept small for faster tests; increase to exercise the progress callback
sample_data_for_cb_test = []
for i in range(num_test_rows):
icao = f"cv{i:04d}"
year = 2000 + (i % 20)
# Simulate values with and without surrounding single quotes
reg_val = f"'N-CV{i:03}'" if i % 2 == 0 else f"F-CV{i:03}"
manu_val = f"'ManuCV_{i}'" if i % 3 == 0 else f"ManuCV_{i}_NoQuote"
sample_data_for_cb_test.append(
[
f"'{icao}'",
f"'2023-01-01 10:{i//60:02d}:{i%60:02d}'",
f"'{year}'",
"'Test Cat CV'",
"'US'",
reg_val,
f"'ModelCV_{i}'",
manu_val,
]
)
with open(test_csv_path, "w", newline="", encoding="utf-8-sig") as f_test:
writer = csv.writer(f_test)
writer.writerow(header_with_quotes)
writer.writerows(sample_data_for_cb_test)
logger.info(
f"Test CSV file with {num_test_rows} data rows and quoted headers/values created: {test_csv_path}"
)
db_manager_main = AircraftDatabaseManager(
db_name=test_db_name, db_root_dir=test_data_dir
)
def import_job_test(csv_p, db_mgr_instance, replace, cb=None):
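"""Worker-thread job: count the CSV data rows, run the import with the given
callback, and assert that the counts match when the row count is known."""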
thread_id = threading.get_ident()
logger.info(f"Starting import in THREAD: {thread_id} for file {csv_p}")
total_rows_in_file = 0
try:
with open(csv_p, "r", encoding="utf-8-sig") as f_count_thread:
r_thread = csv.reader(f_count_thread)
next(r_thread)
total_rows_in_file = sum(1 for _ in r_thread)
except (OSError, csv.Error, StopIteration):
total_rows_in_file = 0  # Fall back to 0 if the row count cannot be determined
p, i = db_mgr_instance.import_from_csv(
csv_p,
replace_existing=replace,
progress_callback=cb,
total_rows_for_callback=total_rows_in_file,
)
logger.info(
f"THREAD ({thread_id}) Import finished. Processed: {p}, Imported: {i}"
)
if total_rows_in_file > 0:
assert (
p == total_rows_in_file
), f"Thread {thread_id}: Processed rows ({p}) mismatch total ({total_rows_in_file})"
assert (
i == total_rows_in_file
), f"Thread {thread_id}: Imported rows ({i}) mismatch total ({total_rows_in_file})"
if db_manager_main.conn is not None:
logger.info(
"--- Starting CSV Import Test (replace=True) from THREAD with Quoted Headers/Values & Callback ---"
)
def test_progress_update(processed, imported, total):
logger.info(
f"Test Standalone Progress CB: Processed={processed}, Imported={imported}, Total={total if total is not None else 'N/A'}"
)
import_thread_cb = threading.Thread(
target=import_job_test,
args=(test_csv_path, db_manager_main, True, test_progress_update),
)
import_thread_cb.start()
import_thread_cb.join()
details_cv_test_0 = db_manager_main.get_aircraft_details("cv0000")
assert details_cv_test_0 is not None, "'cv0000' not found after import"
if details_cv_test_0:
logger.info(f"DB content for cv0000: {dict(details_cv_test_0)}")
assert (
details_cv_test_0["icao24"] == "cv0000"
), "ICAO24 mismatch or quotes still present in DB"
assert (
details_cv_test_0["registration"] == "N-CV000"
), "Registration value mismatch or quotes not handled"
assert (
details_cv_test_0["manufacturername"] == "ManuCV_0"
), "Manufacturer name mismatch or quotes not handled"
details_cv_test_1 = db_manager_main.get_aircraft_details(
"cv0001"
)  # registration and manufacturer name are unquoted in the CSV for this row
assert details_cv_test_1 is not None
if details_cv_test_1:
logger.info(f"DB content for cv0001: {dict(details_cv_test_1)}")
assert details_cv_test_1["registration"] == "F-CV001"
assert details_cv_test_1["manufacturername"] == "ManuCV_1_NoQuote"
db_manager_main.close_connection()
logger.info(
"Standalone AircraftDatabaseManager tests (with cleaned keys/values & callback) completed successfully."
)
else:
logger.error(
"Initial main DB connection failed in standalone test. Aborting tests."
)