SXXXXXXX_FlightMonitor/flightmonitor/data/aircraft_database_manager.py
2025-05-30 09:42:05 +02:00

378 lines
20 KiB
Python

# FlightMonitor/data/aircraft_database_manager.py
import sqlite3
import csv
import os
from typing import Optional, Dict, Any, List
try:
from ..utils.logger import get_logger # Tentativo di import relativo
logger = get_logger(__name__)
except ImportError:
import logging # Fallback se l'import relativo fallisce (es. test standalone)
logger = logging.getLogger(__name__)
# Potresti voler configurare un handler di base qui se stai testando standalone
# logging.basicConfig(level=logging.INFO)
DEFAULT_AIRCRAFT_DB_FILENAME = "aircraft_database.db"
# Assumiamo che il DB vada nella stessa directory degli altri dati storici
# o in una directory definita in app_config
try:
from . import config as app_config
AIRCRAFT_DB_DIRECTORY = getattr(app_config, "DATABASE_DIRECTORY", "flight_data_history")
except ImportError:
AIRCRAFT_DB_DIRECTORY = "flight_data_history" # Fallback
class AircraftDatabaseManager:
def __init__(self, db_name: str = DEFAULT_AIRCRAFT_DB_FILENAME):
# Costruisci il percorso completo del database
# Usiamo os.path.abspath per essere sicuri del percorso
# e os.path.join per la compatibilità cross-platform.
db_dir_abs = os.path.abspath(AIRCRAFT_DB_DIRECTORY)
if not os.path.exists(db_dir_abs):
try:
os.makedirs(db_dir_abs)
logger.info(f"Directory per database aeromobili creata: {db_dir_abs}")
except OSError as e:
logger.error(f"Errore creazione directory {db_dir_abs}: {e}")
# Potresti voler sollevare un'eccezione qui o gestire diversamente
self.db_path: str = os.path.join(db_dir_abs, db_name)
self.conn: Optional[sqlite3.Connection] = None
self._connect()
self._create_table()
logger.info(f"AircraftDatabaseManager inizializzato per DB: {self.db_path}")
def _connect(self):
try:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row # Per accedere ai risultati come dizionari
logger.info(f"Connesso al database aeromobili: {self.db_path}")
except sqlite3.Error as e:
logger.error(f"Errore connessione al database aeromobili {self.db_path}: {e}", exc_info=True)
self.conn = None # Assicura che conn sia None se la connessione fallisce
def _create_table(self):
if not self.conn:
logger.error("Nessuna connessione al database per creare la tabella.")
return
try:
cursor = self.conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS aircraft_details (
icao24 TEXT PRIMARY KEY NOT NULL,
registration TEXT,
manufacturericao TEXT,
manufacturername TEXT,
model TEXT,
typecode TEXT,
serialnumber TEXT,
operator TEXT,
operatorcallsign TEXT,
operatoricao TEXT,
operatoriata TEXT,
owner TEXT,
country TEXT,
built_year INTEGER,
firstflightdate TEXT,
categorydescription TEXT,
engines TEXT,
icaoclass TEXT,
linenumber TEXT,
modes INTEGER,
notes TEXT,
status TEXT,
timestamp_metadata TEXT
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_aircraft_registration ON aircraft_details(registration);")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_aircraft_typecode ON aircraft_details(typecode);")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_aircraft_operator ON aircraft_details(operator);")
self.conn.commit()
logger.info("Tabella 'aircraft_details' e indici assicurati.")
except sqlite3.Error as e:
logger.error(f"Errore creazione tabella 'aircraft_details': {e}", exc_info=True)
def get_aircraft_details(self, icao24: str) -> Optional[Dict[str, Any]]:
if not self.conn:
logger.error("Nessuna connessione al database per get_aircraft_details.")
return None
if not icao24 or not isinstance(icao24, str):
logger.warning(f"Tentativo di get_aircraft_details con ICAO24 non valido: {icao24}")
return None
try:
cursor = self.conn.cursor()
cursor.execute("SELECT * FROM aircraft_details WHERE icao24 = ?", (icao24.lower(),)) # ICAO24 è case-insensitive nel CSV ma meglio normalizzare
row = cursor.fetchone()
if row:
# Converti sqlite3.Row in un dizionario standard
return dict(row)
return None
except sqlite3.Error as e:
logger.error(f"Errore DB in get_aircraft_details per {icao24}: {e}", exc_info=True)
return None
except Exception as e_generic:
logger.error(f"Errore generico in get_aircraft_details per {icao24}: {e_generic}", exc_info=True)
return None
def import_from_csv(self, csv_filepath: str, replace_existing: bool = True) -> Tuple[int, int]:
"""
Importa dati da un file CSV nella tabella aircraft_details.
Restituisce (righe processate, righe importate/aggiornate).
"""
if not self.conn:
logger.error("Nessuna connessione al database per import_from_csv.")
return 0, 0
if not os.path.exists(csv_filepath):
logger.error(f"File CSV non trovato: {csv_filepath}")
return 0, 0
logger.info(f"Inizio importazione da CSV: {csv_filepath}. Sostituzione: {replace_existing}")
processed_rows = 0
imported_rows = 0
try:
cursor = self.conn.cursor()
if replace_existing:
cursor.execute("DELETE FROM aircraft_details")
logger.info("Dati esistenti cancellati da aircraft_details per la sostituzione.")
with open(csv_filepath, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
# Mappa i nomi delle colonne del CSV ai nomi delle colonne del DB
# Questo è cruciale se i nomi non corrispondono esattamente
# o se vuoi escludere/rinominare alcune colonne.
# Per ora, assumiamo che i nomi delle colonne del CSV (dopo .lower())
# corrispondano ai nomi delle colonne del DB.
db_columns = [
'icao24', 'registration', 'manufacturericao', 'manufacturername',
'model', 'typecode', 'serialnumber', 'operator', 'operatorcallsign',
'operatoricao', 'operatoriata', 'owner', 'country', 'built_year',
'firstflightdate', 'categorydescription', 'engines', 'icaoclass',
'linenumber', 'modes', 'notes', 'status', 'timestamp_metadata'
]
# Nomi colonne CSV (basati sul tuo estratto)
# 'icao24','timestamp','acars','adsb','built','categoryDescription','country','engines',
# 'firstFlightDate','firstSeen','icaoAircraftClass','lineNumber','manufacturerIcao',
# 'manufacturerName','model','modes','nextReg','notes','operator','operatorCallsign',
# 'operatorIata','operatorIcao','owner','prevReg','regUntil','registered',
# 'registration','selCal','serialNumber','status','typecode','vdl'
for row_idx, row_data in enumerate(reader):
processed_rows += 1
try:
# Normalizza i dati e prepara i valori per l'inserimento
# Assicurati che i nomi delle chiavi in row_data corrispondano (case-insensitive)
# e gestisci i valori mancanti o vuoti.
icao24_val = row_data.get('icao24', '').strip().lower()
if not icao24_val:
logger.warning(f"Riga {row_idx+2} CSV: icao24 mancante o vuoto. Riga saltata.")
continue
# Costruisci il dizionario dei valori per l'SQL
# Converti built_year a INTEGER, gestendo stringhe vuote
built_year_str = row_data.get('built', '').strip()
built_year_val = int(built_year_str) if built_year_str.isdigit() else None
# Rinomina 'timestamp' del CSV in 'timestamp_metadata' per il DB
# e 'icaoAircraftClass' in 'icaoclass'
data_to_insert = {
'icao24': icao24_val,
'registration': row_data.get('registration', '').strip() or None,
'manufacturericao': row_data.get('manufacturerIcao', '').strip() or None,
'manufacturername': row_data.get('manufacturerName', '').strip() or None,
'model': row_data.get('model', '').strip() or None,
'typecode': row_data.get('typecode', '').strip() or None,
'serialnumber': row_data.get('serialNumber', '').strip() or None,
'operator': row_data.get('operator', '').strip() or None,
'operatorcallsign': row_data.get('operatorCallsign', '').strip() or None,
'operatoricao': row_data.get('operatorIcao', '').strip() or None,
'operatoriata': row_data.get('operatorIata', '').strip() or None,
'owner': row_data.get('owner', '').strip() or None,
'country': row_data.get('country', '').strip() or None,
'built_year': built_year_val,
'firstflightdate': row_data.get('firstFlightDate', '').strip() or None,
'categorydescription': row_data.get('categoryDescription', '').strip() or None,
'engines': row_data.get('engines', '').strip() or None,
'icaoclass': row_data.get('icaoAircraftClass', '').strip() or None,
'linenumber': row_data.get('lineNumber', '').strip() or None,
'modes': int(row_data.get('modes', '0')) if row_data.get('modes', '').isdigit() else None,
'notes': row_data.get('notes', '').strip() or None,
'status': row_data.get('status', '').strip() or None,
'timestamp_metadata': row_data.get('timestamp', '').strip() or None
}
# Crea la query SQL dinamicamente basata sulle chiavi presenti
cols_for_sql = []
vals_for_sql = []
for col_name in db_columns:
if col_name in data_to_insert: # Assicura che la colonna sia una di quelle che vogliamo inserire
cols_for_sql.append(col_name)
vals_for_sql.append(data_to_insert[col_name])
if not cols_for_sql: # Non dovrebbe accadere se icao24 è presente
logger.warning(f"Riga {row_idx+2} CSV: Nessuna colonna valida trovata per l'inserimento.")
continue
placeholders = ', '.join(['?'] * len(cols_for_sql))
sql_cols_str = ', '.join(cols_for_sql)
# Se replace_existing è True, facciamo INSERT OR REPLACE
# Altrimenti, potremmo fare INSERT OR IGNORE o gestire l'aggiornamento
# Per ora, con replace_existing=True, un semplice INSERT (dopo DELETE) va bene.
# Se replace_existing=False, dovremmo usare INSERT OR IGNORE o UPDATE
if replace_existing: # DELETE è stato fatto prima
sql = f"INSERT INTO aircraft_details ({sql_cols_str}) VALUES ({placeholders})"
else: # Logica di UPSERT (INSERT OR REPLACE o INSERT...ON CONFLICT)
# SQLite UPSERT (richiede SQLite 3.24.0+)
sql = f"""
INSERT INTO aircraft_details ({sql_cols_str}) VALUES ({placeholders})
ON CONFLICT(icao24) DO UPDATE SET
{', '.join([f'{col} = excluded.{col}' for col in cols_for_sql if col != 'icao24'])}
"""
# Se la versione di SQLite è più vecchia, dovresti fare un SELECT e poi INSERT o UPDATE.
# Per semplicità, usiamo INSERT OR REPLACE qui, che funziona come UPSERT.
sql = f"INSERT OR REPLACE INTO aircraft_details ({sql_cols_str}) VALUES ({placeholders})"
cursor.execute(sql, vals_for_sql)
imported_rows += 1
if imported_rows % 1000 == 0: # Log ogni 1000 righe
logger.info(f"Importate {imported_rows} righe...")
self.conn.commit() # Commit parziale per grandi file
except sqlite3.Error as e_sql:
logger.error(f"Riga {row_idx+2} CSV: Errore SQL durante l'inserimento di {row_data.get('icao24', 'N/A')}: {e_sql}")
except ValueError as e_val:
logger.error(f"Riga {row_idx+2} CSV: Errore di valore durante la preparazione dei dati per {row_data.get('icao24', 'N/A')}: {e_val}")
except Exception as e_row:
logger.error(f"Riga {row_idx+2} CSV: Errore generico durante l'elaborazione della riga {row_data.get('icao24', 'N/A')}: {e_row}", exc_info=False)
self.conn.commit() # Commit finale
logger.info(f"Importazione CSV completata. Righe processate: {processed_rows}, Righe importate/aggiornate: {imported_rows}")
except FileNotFoundError:
logger.error(f"File CSV non trovato durante l'apertura: {csv_filepath}")
except Exception as e_csv:
logger.error(f"Errore durante l'importazione del CSV {csv_filepath}: {e_csv}", exc_info=True)
if self.conn: self.conn.rollback() # Rollback in caso di errore grave
return processed_rows, imported_rows
def close_connection(self):
if self.conn:
try:
self.conn.close()
logger.info(f"Connessione al database aeromobili chiusa: {self.db_path}")
except sqlite3.Error as e:
logger.error(f"Errore chiusura connessione DB aeromobili: {e}")
finally:
self.conn = None
# Esempio di utilizzo standalone per testare l'importazione:
if __name__ == '__main__':
# Crea un logger di base per i test standalone
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Crea un finto file CSV per il test
test_csv_path = "test_aircraft_data.csv"
with open(test_csv_path, 'w', newline='', encoding='utf-8') as f_test:
writer = csv.writer(f_test)
writer.writerow([
'icao24','timestamp','built','categoryDescription','country','engines',
'firstFlightDate','icaoAircraftClass','lineNumber','manufacturerIcao',
'manufacturerName','model','modes','notes','operator','operatorCallsign',
'operatorIata','operatorIcao','owner','registration','serialNumber','status','typecode'
])
writer.writerow([
'a1b2c3','2023-01-01 10:00:00','2005','Light (< 15500 lbs)','USA','2',
'2005-05-10','L1P','123','BOEING',
'Boeing','737-800','1','Test notes','TestAir','TESTER',
'T1','TST','Test Owner','N123TA','SN12345','Active','B738'
])
writer.writerow([
'd4e5f6','2023-01-02 11:00:00','','No ADS-B Emitter Category Information','CAN','1',
'','L2J','456','AIRBUS',
'Airbus','A320','0','','CoolAir','COOLER',
'C2','CLR','Cool Owner','C-COOL','SN67890','','A320'
])
writer.writerow([ # Riga con ICAO24 duplicato per testare replace
'a1b2c3','2023-02-01 12:00:00','2006','Heavy (> 300000 lbs)','DE','4',
'2006-06-15','H1P','123A','BOEING_NEW',
'Boeing Corp','747-400','1','Updated notes','TestAirways','TESTER_UPD',
'T2','TSU','New Owner','N123TB','SN12345A','Inactive','B744'
])
db_manager = AircraftDatabaseManager(db_name="test_aircraft_db.sqlite")
if db_manager.conn: # Procede solo se la connessione è riuscita
logger.info("--- Inizio Test Importazione CSV ---")
# Test con replace_existing = True
p_rows, i_rows = db_manager.import_from_csv(test_csv_path, replace_existing=True)
logger.info(f"Importazione (replace=True) completata. Processate: {p_rows}, Importate: {i_rows}")
details1 = db_manager.get_aircraft_details("a1b2c3")
logger.info(f"Dettagli per a1b2c3 (dopo replace=True): {details1}")
assert details1 is not None
assert details1['built_year'] == 2006 # Dovrebbe avere i dati dell'ultima riga 'a1b2c3'
details2 = db_manager.get_aircraft_details("d4e5f6")
logger.info(f"Dettagli per d4e5f6: {details2}")
assert details2 is not None
assert details2['manufacturername'] == 'Airbus'
# Test con replace_existing = False (dovrebbe aggiornare a1b2c3 e inserire/ignorare d4e5f6)
# Modifichiamo il file CSV per il secondo test
with open(test_csv_path, 'w', newline='', encoding='utf-8') as f_test_update:
writer = csv.writer(f_test_update)
writer.writerow([
'icao24','timestamp','built','categoryDescription','country','registration','model'
]) # Meno colonne per testare
writer.writerow([
'a1b2c3','2023-03-01 10:00:00','2007','Super Heavy','UK','G-ABCD','747-8F' # Dati aggiornati per a1b2c3
])
writer.writerow([
'g7h8i9','2023-03-02 11:00:00','2010','Rotorcraft','FR','F-GHIJ','EC135' # Nuovo aereo
])
logger.info("\n--- Inizio Test Importazione CSV (replace=False) ---")
p_rows_upd, i_rows_upd = db_manager.import_from_csv(test_csv_path, replace_existing=False)
logger.info(f"Importazione (replace=False) completata. Processate: {p_rows_upd}, Importate/Aggiornate: {i_rows_upd}")
details1_upd = db_manager.get_aircraft_details("a1b2c3")
logger.info(f"Dettagli per a1b2c3 (dopo replace=False): {details1_upd}")
assert details1_upd is not None
assert details1_upd['built_year'] == 2007 # Aggiornato
assert details1_upd['registration'] == 'G-ABCD'
assert details1_upd['model'] == '747-8F'
assert details1_upd['manufacturername'] is None # Questo campo non era nel CSV di aggiornamento, quindi se l'UPSERT ha funzionato dovrebbe averlo mantenuto (o reso NULL se non c'era prima)
details_new = db_manager.get_aircraft_details("g7h8i9")
logger.info(f"Dettagli per g7h8i9 (nuovo): {details_new}")
assert details_new is not None
assert details_new['model'] == 'EC135'
details_d4 = db_manager.get_aircraft_details("d4e5f6")
logger.info(f"Dettagli per d4e5f6 (dovrebbe essere rimasto invariato): {details_d4}")
assert details_d4 is not None # Non dovrebbe essere stato toccato dall'import con replace=False
db_manager.close_connection()
logger.info("Test completati.")
# Rimuovi i file di test
#os.remove(test_csv_path)
#db_test_file_path = os.path.join(os.path.abspath(AIRCRAFT_DB_DIRECTORY), "test_aircraft_db.sqlite")
#if os.path.exists(db_test_file_path):
# os.remove(db_test_file_path)
#logger.info("File di test rimossi.")
else:
logger.error("Connessione al DB di test fallita. Test non eseguito.")