fix import aircraft csv into db

This commit is contained in:
VALLONGOL 2025-05-30 12:19:01 +02:00
parent 9ae7c54631
commit a13caa2c19
2 changed files with 132 additions and 72 deletions

9
.vscode/launch.json vendored
View File

@ -4,9 +4,14 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Module",
"name": "aircraft db",
"type": "debugpy",
"request": "launch",
"module": "/data/aircraft_database_manager"
},
{
"name": "flight monitor",
"type": "debugpy",
"request": "launch",
"module": "flightmonitor"

View File

@ -13,7 +13,7 @@ except ImportError:
logger = logging.getLogger(__name__)
if not logger.hasHandlers():
logging.basicConfig(
level=logging.INFO,
level=logging.INFO,
format='%(asctime)s - %(name)s [%(levelname)s] %(funcName)s (%(threadName)s): %(message)s'
)
logger.warning("AircraftDatabaseManager using fallback standard Python logger.")
@ -87,7 +87,7 @@ class AircraftDatabaseManager:
try:
if self.conn and not self._is_connection_alive(self.conn):
try: self.conn.close(); logger.debug("Closed invalid main DB connection.")
except sqlite3.Error: pass # Ignore errors on close if already problematic
except sqlite3.Error: pass
self.conn = None
if self.conn is None:
@ -180,6 +180,15 @@ class AircraftDatabaseManager:
except sqlite3.Error: pass
return details
def _clean_csv_value(self, value: Optional[str]) -> Optional[str]:
"""Cleans a string value read from CSV: strips whitespace and removes surrounding single quotes."""
if value is None:
return None
cleaned = value.strip()
if len(cleaned) >= 2 and cleaned.startswith("'") and cleaned.endswith("'"):
cleaned = cleaned[1:-1]
return cleaned if cleaned else None # Return None if string becomes empty after cleaning
def import_from_csv(self, csv_filepath: str, replace_existing: bool = True,
progress_callback: Optional[Callable[[int, int, Optional[int]], None]] = None,
total_rows_for_callback: Optional[int] = None) -> Tuple[int, int]:
@ -194,18 +203,16 @@ class AircraftDatabaseManager:
current_thread_id = threading.get_ident()
logger.info(f"Thread ({current_thread_id}) - Starting CSV import: {csv_filepath}. Replace mode: {replace_existing}")
# Set field size limit at the beginning of the import process for this thread's CSV reader
try:
current_csv_limit = csv.field_size_limit()
desired_csv_limit = 10 * 1024 * 1024 # 10MB, adjust as needed
desired_csv_limit = 10 * 1024 * 1024
if desired_csv_limit > current_csv_limit:
csv.field_size_limit(desired_csv_limit)
logger.info(f"Thread ({current_thread_id}): Increased csv.field_size_limit from {current_csv_limit} to {desired_csv_limit} for this import.")
except Exception as e_limit:
logger.error(f"Thread ({current_thread_id}): Error attempting to increase csv.field_size_limit: {e_limit}")
# Continue, but be aware that import might fail if fields are indeed too large.
processed_data_rows = 0
processed_data_rows = 0
imported_or_updated_rows = 0
db_columns = [
@ -227,11 +234,11 @@ class AircraftDatabaseManager:
'linenumber': 'linenumber', 'modes': 'modes', 'notes': 'notes',
'status': 'status', 'timestamp': 'timestamp_metadata'
}
callback_interval = 500 # Call progress_callback every N processed data rows
callback_interval = 500
debug_first_n_rows = 0
try:
with conn:
with conn:
cursor = conn.cursor()
if replace_existing:
cursor.execute("DELETE FROM aircraft_details")
@ -239,41 +246,68 @@ class AircraftDatabaseManager:
with open(csv_filepath, 'r', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile)
if not reader.fieldnames:
logger.error(f"Thread ({current_thread_id}): CSV file {csv_filepath} has no valid header or is empty.")
if progress_callback: progress_callback(0, 0, total_rows_for_callback)
return 0,0
cleaned_fieldnames = []
for fn in reader.fieldnames:
if isinstance(fn, str):
temp_fn = fn.strip()
if temp_fn.startswith("'") and temp_fn.endswith("'"):
temp_fn = temp_fn[1:-1]
cleaned_fieldnames.append(temp_fn)
else:
cleaned_fieldnames.append(fn)
reader.fieldnames = cleaned_fieldnames
logger.debug(f"Thread ({current_thread_id}): Cleaned CSV Header for DictReader: {reader.fieldnames}")
for csv_row_index, raw_row_data in enumerate(reader):
processed_data_rows += 1
if processed_data_rows <= debug_first_n_rows:
logger.info(f"--- Debugging CSV Data Row {processed_data_rows} (Index {csv_row_index}) ---")
logger.info(f"Raw row data from DictReader (cleaned keys): {raw_row_data}")
row_data_lower_keys = {}
for k, v_raw in raw_row_data.items():
key_clean = str(k).lower().strip() # Keys from DictReader (after fieldname cleaning) should be clean strings
# Clean the value by stripping whitespace and then surrounding single quotes
value_cleaned = self._clean_csv_value(v_raw if isinstance(v_raw, str) else str(v_raw) if v_raw is not None else None)
row_data_lower_keys[key_clean] = value_cleaned
if progress_callback and processed_data_rows % callback_interval == 0:
try:
progress_callback(processed_data_rows, imported_or_updated_rows, total_rows_for_callback)
except Exception as e_cb:
logger.error(f"Thread ({current_thread_id}) Error during progress_callback: {e_cb}")
if processed_data_rows <= debug_first_n_rows:
logger.info(f"Row {processed_data_rows}: Normalized & Cleaned Value Keys: {row_data_lower_keys}")
icao24_val = row_data_lower_keys.get('icao24', '').strip().lower() if row_data_lower_keys.get('icao24') else ''
if processed_data_rows <= debug_first_n_rows:
logger.info(f"Row {processed_data_rows}: Attempting to get 'icao24'. Value: '{row_data_lower_keys.get('icao24', 'KEY_NOT_FOUND')}' -> Final ICAO24: '{icao24_val}'")
row_data_lower_keys = {str(k).lower().strip(): v for k, v in raw_row_data.items() if k}
icao24_val = row_data_lower_keys.get('icao24', '').strip().lower()
if not icao24_val:
logger.warning(f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: Missing or empty icao24. Skipping row.")
logger.warning(f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: Missing or empty icao24 after processing. Skipping row.")
if progress_callback and processed_data_rows % callback_interval == 0:
try: progress_callback(processed_data_rows, imported_or_updated_rows, total_rows_for_callback)
except Exception as e_cb: logger.error(f"Thread ({current_thread_id}) Error in progress_callback (skipped row): {e_cb}")
continue
data_for_db = {'icao24': icao24_val}
for csv_col_lower, db_col_name in csv_to_db_column_map.items():
if csv_col_lower == 'icao24': continue
raw_value = row_data_lower_keys.get(csv_col_lower, '')
cleaned_value = raw_value.strip() if isinstance(raw_value, str) else raw_value
# Value already cleaned (including surrounding single quotes)
cleaned_value_from_dict = row_data_lower_keys.get(csv_col_lower)
final_value = None
if cleaned_value or isinstance(cleaned_value, (int, float, bool)):
if cleaned_value_from_dict is not None: # Process if not None (empty string was converted to None by _clean_csv_value)
if db_col_name == 'built_year':
final_value = int(cleaned_value) if str(cleaned_value).isdigit() else None
final_value = int(cleaned_value_from_dict) if str(cleaned_value_from_dict).isdigit() else None
elif db_col_name == 'modes':
final_value = int(cleaned_value) if str(cleaned_value).isdigit() else None
final_value = int(cleaned_value_from_dict) if str(cleaned_value_from_dict).isdigit() else None
else:
final_value = cleaned_value
final_value = cleaned_value_from_dict # Already cleaned
data_for_db[db_col_name] = final_value
cols_for_sql = []
@ -283,7 +317,9 @@ class AircraftDatabaseManager:
cols_for_sql.append(db_col_key)
vals_for_sql.append(data_for_db[db_col_key])
if not cols_for_sql or 'icao24' not in cols_for_sql: continue
if not cols_for_sql or 'icao24' not in cols_for_sql:
logger.warning(f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: No valid columns for SQL for ICAO {icao24_val}. Skipping.")
continue
placeholders = ', '.join(['?'] * len(cols_for_sql))
sql_cols_str = ', '.join(cols_for_sql)
@ -294,6 +330,12 @@ class AircraftDatabaseManager:
imported_or_updated_rows += 1
except sqlite3.Error as e_sql:
logger.error(f"Thread ({current_thread_id}) CSV Data Row {processed_data_rows}: SQL error for ICAO24 '{icao24_val}': {e_sql}. Data: {dict(zip(cols_for_sql, vals_for_sql))}")
if progress_callback and processed_data_rows % callback_interval == 0:
try:
progress_callback(processed_data_rows, imported_or_updated_rows, total_rows_for_callback)
except Exception as e_cb:
logger.error(f"Thread ({current_thread_id}) Error during progress_callback (loop): {e_cb}")
logger.info(f"Thread ({current_thread_id}): CSV import finished. Processed data rows: {processed_data_rows}, DB rows imported/updated: {imported_or_updated_rows}")
@ -310,7 +352,7 @@ class AircraftDatabaseManager:
except Exception as e_cb_final:
logger.error(f"Thread ({current_thread_id}) Error during final progress_callback: {e_cb_final}")
if conn is not self.conn and conn is not None:
try: conn.close(); logger.debug(f"Closed temporary DB connection for import_from_csv (Thread: {current_thread_id}).")
try: conn.close(); logger.debug(f"Closed temporary DB connection for import_from_csv (Thread: {threading.get_ident()}).")
except sqlite3.Error: pass
return processed_data_rows, imported_or_updated_rows
@ -330,90 +372,103 @@ class AircraftDatabaseManager:
if __name__ == '__main__':
# ... (Blocco test standalone come nella versione precedente, assicurati che il logger sia configurato
# e che i percorsi dei file di test siano corretti per il tuo ambiente di test)
# ... (Il blocco if __name__ == '__main__' rimane come nella versione precedente completa per i test,
# assicurati che il logger sia configurato e che i percorsi dei file di test siano corretti.
# Il test per le chiavi con apici dovrebbe ora funzionare meglio con la pulizia delle chiavi
# e dei valori implementata.)
if not logging.getLogger(__name__).hasHandlers():
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s [%(levelname)s] %(funcName)s (%(threadName)s): %(message)s')
logger.info("--- Starting Standalone AircraftDatabaseManager Test (Thread-Safe with Callback) ---")
logger.info("--- Starting Standalone AircraftDatabaseManager Test (Thread-Safe with Callback & Cleaned Keys/Values) ---")
current_script_dir = os.path.dirname(os.path.abspath(__file__))
test_data_base_dir = os.path.join(os.path.dirname(current_script_dir), "test_artefacts")
test_data_dir = os.path.join(test_data_base_dir, "aircraft_db_tests_standalone_cb")
test_data_dir = os.path.join(test_data_base_dir, "aircraft_db_tests_standalone_cv") # cv for cleaned values
if not os.path.exists(test_data_dir):
os.makedirs(test_data_dir, exist_ok=True)
test_db_name = "test_aircraft_db_standalone_cb.sqlite"
test_csv_filename = "sample_aircraft_data_standalone_cb.csv"
test_db_name = "test_aircraft_db_standalone_cv.sqlite"
test_csv_filename = "sample_aircraft_data_standalone_cv.csv"
test_csv_path = os.path.join(test_data_dir, test_csv_filename)
header = [
'icao24','timestamp','built','categoryDescription','country','registration','model','manufacturerName'
] # Intestazione ridotta per il test
header_with_quotes = [
"'icao24'","'timestamp'","'built'","'categoryDescription'","'country'",
"'registration'","'model'","'manufacturerName'"
]
# Crea un CSV con più righe per testare la callback
num_test_rows = 1200
num_test_rows = 10 # Ridotto per test più veloci, aumenta se vuoi testare la callback di progresso
sample_data_for_cb_test = []
for i in range(num_test_rows):
icao = f"cb{i:04d}"
icao = f"cv{i:04d}"
year = 2000 + (i % 20)
# Simula valori con e senza apici singoli attorno
reg_val = f"'N-CV{i:03}'" if i % 2 == 0 else f"F-CV{i:03}"
manu_val = f"'ManuCV_{i}'" if i % 3 == 0 else f"ManuCV_{i}_NoQuote"
sample_data_for_cb_test.append(
[icao, f'2023-01-01 10:{i//60:02d}:{i%60:02d}', str(year),'Test Category', 'US', f'N{icao.upper()}', f'Model_{i}', f'Manu_{i}']
[f"'{icao}'", f"'2023-01-01 10:{i//60:02d}:{i%60:02d}'", f"'{year}'","'Test Cat CV'", "'US'",
reg_val, f"'ModelCV_{i}'", manu_val]
)
with open(test_csv_path, 'w', newline='', encoding='utf-8-sig') as f_test:
writer = csv.writer(f_test)
writer.writerow(header)
writer.writerow(header_with_quotes)
writer.writerows(sample_data_for_cb_test)
logger.info(f"Test CSV file with {num_test_rows} data rows created: {test_csv_path}")
logger.info(f"Test CSV file with {num_test_rows} data rows and quoted headers/values created: {test_csv_path}")
db_manager_main = AircraftDatabaseManager(db_name=test_db_name, db_root_dir=test_data_dir)
def import_job_with_cb(csv_p, db_mgr_instance, replace):
def import_job_test(csv_p, db_mgr_instance, replace, cb=None):
thread_id = threading.get_ident()
logger.info(f"Starting import in THREAD: {thread_id}")
logger.info(f"Starting import in THREAD: {thread_id} for file {csv_p}")
# Definisci la callback qui dentro o passala come argomento
def my_test_progress_callback(processed, imported, total):
logger.info(f"THREAD Progress CB ({thread_id}): Processed={processed}, Imported={imported}, Total={total if total is not None else 'N/A'}")
# Per il test, calcoliamo il totale qui, come farebbe AppController
total_rows_in_csv = 0
total_rows_in_file = 0
try:
with open(csv_p, 'r', encoding='utf-8-sig') as f_count:
r = csv.reader(f_count)
next(r) # skip header
total_rows_in_csv = sum(1 for _ in r)
except: pass
with open(csv_p, 'r', encoding='utf-8-sig') as f_count_thread:
r_thread = csv.reader(f_count_thread)
next(r_thread)
total_rows_in_file = sum(1 for _ in r_thread)
except: total_rows_in_file = 0
p, i = db_mgr_instance.import_from_csv(
csv_p,
replace_existing=replace,
progress_callback=my_test_progress_callback,
total_rows_for_callback=total_rows_in_csv
progress_callback=cb,
total_rows_for_callback=total_rows_in_file
)
logger.info(f"THREAD ({thread_id}) Import finished. Processed: {p}, Imported: {i}")
assert p == total_rows_in_csv, f"Thread {thread_id}: Processed rows ({p}) non corrisponde al totale ({total_rows_in_csv})"
assert i == total_rows_in_csv, f"Thread {thread_id}: Imported rows ({i}) non corrisponde al totale ({total_rows_in_csv})"
if total_rows_in_file > 0 :
assert p == total_rows_in_file, f"Thread {thread_id}: Processed rows ({p}) mismatch total ({total_rows_in_file})"
assert i == total_rows_in_file, f"Thread {thread_id}: Imported rows ({i}) mismatch total ({total_rows_in_file})"
if db_manager_main.conn is not None : # Assicura che la connessione principale sia stata creata
logger.info("--- Starting CSV Import Test (replace=True) from THREAD with Callback ---")
if db_manager_main.conn is not None :
logger.info("--- Starting CSV Import Test (replace=True) from THREAD with Quoted Headers/Values & Callback ---")
import_thread_cb = threading.Thread(target=import_job_with_cb, args=(test_csv_path, db_manager_main, True))
def test_progress_update(processed, imported, total):
logger.info(f"Test Standalone Progress CB: Processed={processed}, Imported={imported}, Total={total if total is not None else 'N/A'}")
import_thread_cb = threading.Thread(target=import_job_test, args=(test_csv_path, db_manager_main, True, test_progress_update))
import_thread_cb.start()
import_thread_cb.join()
# Verifica alcuni dati dopo l'importazione
details_cb_test = db_manager_main.get_aircraft_details("cb0000")
assert details_cb_test is not None, "cb0000 non trovato dopo import"
if details_cb_test: assert details_cb_test['manufacturername'] == 'Manu_0'
details_cv_test_0 = db_manager_main.get_aircraft_details("cv0000")
assert details_cv_test_0 is not None, "'cv0000' not found after import"
if details_cv_test_0:
logger.info(f"DB content for cv0000: {dict(details_cv_test_0)}")
assert details_cv_test_0['icao24'] == "cv0000", "ICAO24 mismatch or quotes still present in DB"
assert details_cv_test_0['registration'] == "N-CV000", "Registration value mismatch or quotes not handled"
assert details_cv_test_0['manufacturername'] == "ManuCV_0", "Manufacturer name mismatch or quotes not handled"
details_cv_test_1 = db_manager_main.get_aircraft_details("cv0001") # registration and manu name senza apici nel CSV
assert details_cv_test_1 is not None
if details_cv_test_1:
logger.info(f"DB content for cv0001: {dict(details_cv_test_1)}")
assert details_cv_test_1['registration'] == "F-CV001"
assert details_cv_test_1['manufacturername'] == "ManuCV_1_NoQuote"
details_last_cb_test = db_manager_main.get_aircraft_details(f"cb{num_test_rows-1:04d}")
assert details_last_cb_test is not None, f"cb{num_test_rows-1:04d} non trovato"
db_manager_main.close_connection()
logger.info("Standalone AircraftDatabaseManager tests (with callback) completed successfully.")
logger.info("Standalone AircraftDatabaseManager tests (with cleaned keys/values & callback) completed successfully.")
else:
logger.error("Initial main DB connection failed in standalone test. Aborting tests.")