SXXXXXXX_FlightMonitor/flightmonitor/controller/live_data_processor.py

186 lines
10 KiB
Python

# FlightMonitor/controller/live_data_processor.py
"""
Manages the processing of live flight data from the adapter's output queue.
It pulls data, saves it to storage, updates GUI elements, and handles adapter status messages.
"""
import tkinter as tk
from queue import Queue, Empty as QueueEmpty
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any, TYPE_CHECKING
import time
from flightmonitor.utils.logger import get_logger
from flightmonitor.data.common_models import CanonicalFlightState
from flightmonitor.data.opensky_live_adapter import (
AdapterMessage,
MSG_TYPE_FLIGHT_DATA,
MSG_TYPE_ADAPTER_STATUS,
STATUS_STARTING,
STATUS_FETCHING,
STATUS_RECOVERED,
STATUS_RATE_LIMITED,
STATUS_API_ERROR_TEMPORARY,
STATUS_PERMANENT_FAILURE,
STATUS_STOPPED,
)
from flightmonitor.utils.gui_utils import (
GUI_STATUS_OK,
GUI_STATUS_WARNING,
GUI_STATUS_ERROR,
GUI_STATUS_FETCHING,
)
if TYPE_CHECKING:
from flightmonitor.controller.app_controller import AppController
from flightmonitor.gui.main_window import MainWindow
# Logger for this module, obtained from the project's get_logger helper under this module's name.
module_logger = get_logger(__name__)
# Delay, in milliseconds, handed to the Tk root's after() between two queue polling cycles.
GUI_QUEUE_CHECK_INTERVAL_MS = 150
class LiveDataProcessor:
    """
    Drain the adapter's output queue on the GUI event loop and fan the results out.

    Each polling cycle pulls every queued adapter message, forwards flight
    data to storage, the map and the status bar, translates adapter status
    codes into GUI status levels, refreshes the open detail window (if any)
    and finally re-arms itself through the Tk root's ``after`` while live
    monitoring is active.
    """

    def __init__(self, app_controller: "AppController", flight_data_queue: "Queue[AdapterMessage]"):
        """
        Store the collaborators; no polling is scheduled until ``start_processing_queue``.

        Args:
            app_controller: Controller exposing ``main_window``, ``data_storage``,
                ``aircraft_db_manager``, the live-monitoring flag and the
                detail-window references read by this class.
            flight_data_queue: Queue the adapter fills with messages.
        """
        self.app_controller = app_controller
        self.flight_data_queue = flight_data_queue
        # Identifier returned by root.after() for the pending poll; None when nothing is scheduled.
        self._gui_after_id: Optional[str] = None
        module_logger.debug("LiveDataProcessor initialized.")

    def start_processing_queue(self) -> None:
        """Schedule the first polling cycle, replacing any poll that is already pending."""
        main_window = self._get_usable_main_window()
        if main_window is None:
            module_logger.error("LiveDataProcessor: Cannot start queue processing. MainWindow or root is missing.")
            return
        if self._gui_after_id:
            # Avoid two concurrent polling loops when start is called twice.
            self._cancel_pending_poll(main_window)
        module_logger.info("LiveDataProcessor: Starting live data queue processing loop.")
        self._gui_after_id = main_window.root.after(GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle)

    def stop_processing_queue(self) -> None:
        """Cancel the pending poll (if any) and run one last cycle to flush queued messages."""
        main_window = self._get_usable_main_window()
        if self._gui_after_id and main_window is not None:
            if self._cancel_pending_poll(main_window):
                module_logger.info("LiveDataProcessor: Stopped live data queue processing loop.")
        self._gui_after_id = None
        # The final flush never reschedules itself.
        self._process_queue_cycle(is_final_flush=True)

    def _get_usable_main_window(self) -> Optional["MainWindow"]:
        """Return the controller's main window when it and its Tk root still exist, else None."""
        main_window = self.app_controller.main_window
        if main_window and main_window.root and main_window.root.winfo_exists():
            return main_window
        return None

    def _cancel_pending_poll(self, main_window: "MainWindow") -> bool:
        """Best-effort cancel of the scheduled poll; return True when the cancel call succeeded."""
        try:
            main_window.root.after_cancel(self._gui_after_id)
        except Exception as e_cancel:
            # Cancelling is best effort (the id may be stale), but do not hide the failure entirely.
            module_logger.debug(f"LiveDataProcessor: Could not cancel pending queue poll {self._gui_after_id!r}: {e_cancel}")
            return False
        return True

    def _process_queue_cycle(self, is_final_flush: bool = False) -> None:
        """
        Run one polling cycle: drain the queue, refresh the detail window, re-arm.

        Args:
            is_final_flush: When True the cycle processes what is queued but
                does not schedule a follow-up cycle.
        """
        main_window = self._get_usable_main_window()
        if main_window is None:
            # Without a live window there is nothing to update and nothing to schedule on.
            self._gui_after_id = None
            return

        flight_payloads_this_cycle: List[CanonicalFlightState] = []
        try:
            self._drain_queue(main_window, flight_payloads_this_cycle)
            self._refresh_detail_window(flight_payloads_this_cycle)
        except Exception as e_outer:
            module_logger.error(f"LiveDataProcessor: Outer error in _process_queue_cycle: {e_outer}", exc_info=True)
        finally:
            self._schedule_next_cycle(main_window, is_final_flush)

    def _drain_queue(self, main_window: "MainWindow", collected_states: "List[CanonicalFlightState]") -> None:
        """Process every message currently queued, appending flight states to ``collected_states``."""
        while True:
            try:
                message = self.flight_data_queue.get_nowait()
            except QueueEmpty:
                # Normal exit: the queue has been emptied for this cycle.
                return
            try:
                self._dispatch_message(main_window, message, collected_states)
            except Exception as e_msg_proc:
                # One bad message must not prevent the remaining ones from being handled.
                module_logger.error(f"LiveDataProcessor: Error processing adapter message: {e_msg_proc}", exc_info=True)
            finally:
                self.flight_data_queue.task_done()

    def _dispatch_message(self, main_window: "MainWindow", message: "AdapterMessage", collected_states: "List[CanonicalFlightState]") -> None:
        """Route one adapter message to the handler matching its ``type`` field."""
        message_type = message.get("type")
        if message_type == MSG_TYPE_FLIGHT_DATA:
            self._handle_flight_data(main_window, message, collected_states)
        elif message_type == MSG_TYPE_ADAPTER_STATUS:
            self._handle_adapter_status(main_window, message)
        else:
            module_logger.debug(f"LiveDataProcessor: Ignoring adapter message of unknown type: {message_type!r}")

    def _handle_flight_data(self, main_window: "MainWindow", message: "AdapterMessage", collected_states: "List[CanonicalFlightState]") -> None:
        """Forward one flight-data message to the controller hooks, storage, map and status bar."""
        # Extract the structured payload (canonical states + raw JSON).
        # "or {}" also covers a payload that is present but None.
        payload_dict = message.get("payload") or {}
        flight_states_payload: Optional[List[CanonicalFlightState]] = payload_dict.get("canonical")
        raw_json_payload: str = payload_dict.get("raw_json", "{}")

        if flight_states_payload is None:
            main_window.update_semaphore_and_status(GUI_STATUS_WARNING, "Received empty data payload from adapter.")
            return

        # Notify the controller so it can save the raw data (hook is optional on the controller).
        if hasattr(self.app_controller, 'process_raw_data_logging'):
            self.app_controller.process_raw_data_logging(raw_json_payload, flight_states_payload)
        # Notify the controller so it can update the summary table (hook is optional on the controller).
        if hasattr(self.app_controller, 'update_live_summary_table'):
            self.app_controller.update_live_summary_table(time.time(), len(flight_states_payload))

        collected_states.extend(flight_states_payload)
        self._persist_states(flight_states_payload)

        if main_window.map_manager_instance and self.app_controller.is_live_monitoring_active:
            main_window.map_manager_instance.update_flights_on_map(flight_states_payload)

        gui_message = f"Live data: {len(flight_states_payload)} aircraft in area." if flight_states_payload else "Live data: No aircraft in area."
        main_window.update_semaphore_and_status(GUI_STATUS_OK, gui_message)

    def _persist_states(self, flight_states: "List[CanonicalFlightState]") -> None:
        """Save each canonical state to the controller's data storage, if one is configured."""
        data_storage = self.app_controller.data_storage
        if not data_storage:
            return
        for state in flight_states:
            if not isinstance(state, CanonicalFlightState):
                continue
            try:
                flight_id = data_storage.add_or_update_flight_daily(
                    icao24=state.icao24,
                    callsign=state.callsign,
                    origin_country=state.origin_country,
                    detection_timestamp=state.timestamp,
                )
                if flight_id:
                    data_storage.add_position_daily(flight_id, state)
            except Exception as e_db_add:
                # A failure on one aircraft must not block saving the others.
                module_logger.error(f"LiveDataProcessor: Error saving flight/position to DB for ICAO {state.icao24}: {e_db_add}", exc_info=True)

    def _handle_adapter_status(self, main_window: "MainWindow", message: "AdapterMessage") -> None:
        """Map an adapter status code to a GUI status level and display the adapter's message."""
        status_code = message.get("status_code")
        gui_message_from_adapter = message.get("message", f"Adapter status: {status_code}")

        gui_status_level = GUI_STATUS_OK
        if status_code == STATUS_PERMANENT_FAILURE:
            gui_status_level = GUI_STATUS_ERROR
            # Stop first so the adapter's error text is the last status written to the GUI.
            self.app_controller.stop_live_monitoring(from_error=True)
        elif status_code == STATUS_API_ERROR_TEMPORARY:
            gui_status_level = GUI_STATUS_ERROR
        elif status_code == STATUS_RATE_LIMITED:
            gui_status_level = GUI_STATUS_WARNING
        elif status_code == STATUS_FETCHING:
            gui_status_level = GUI_STATUS_FETCHING

        main_window.update_semaphore_and_status(gui_status_level, gui_message_from_adapter)

    def _refresh_detail_window(self, states_this_cycle: "List[CanonicalFlightState]") -> None:
        """Push the newest state of the tracked aircraft, plus its track, to the open detail window."""
        detail_window = self.app_controller.active_detail_window_ref
        detail_icao = self.app_controller.active_detail_window_icao
        if not (detail_window and detail_icao and detail_window.winfo_exists()):
            return

        # Keep the last match: later entries in the list come from later messages.
        latest_state = None
        for state_obj in states_this_cycle:
            # getattr tolerates payload items that are not canonical states.
            if getattr(state_obj, "icao24", None) == detail_icao:
                latest_state = state_obj
        if latest_state is None:
            return
        latest_live_data_for_detail_icao: Optional[Dict[str, Any]] = latest_state.to_dict()
        if not latest_live_data_for_detail_icao:
            return

        aircraft_db_manager = self.app_controller.aircraft_db_manager
        static_data_upd = aircraft_db_manager.get_aircraft_details(detail_icao) if aircraft_db_manager else None

        full_track_data_list_upd: List[Dict[str, Any]] = []
        data_storage = self.app_controller.data_storage
        if data_storage:
            try:
                track_states_upd = data_storage.get_flight_track_for_icao_on_date(detail_icao, datetime.now(timezone.utc))
                if track_states_upd:
                    full_track_data_list_upd = [s.to_dict() for s in track_states_upd]
            except Exception as e_track_upd:
                module_logger.error(f"LiveDataProcessor: Error retrieving updated track for detail view {detail_icao}: {e_track_upd}")

        try:
            detail_window.update_details(static_data_upd, latest_live_data_for_detail_icao, full_track_data_list_upd)
        except Exception as e_upd_detail_win:
            module_logger.error(f"LiveDataProcessor: Error updating detail window for {detail_icao}: {e_upd_detail_win}", exc_info=True)

    def _schedule_next_cycle(self, main_window: "MainWindow", is_final_flush: bool) -> None:
        """Re-arm the polling timer, or clear the pending id when polling must end."""
        # Read the flag now rather than at cycle start: a message handled during this
        # cycle may have stopped monitoring (stop_live_monitoring is called on permanent
        # failure; presumably it clears this flag -- confirm against AppController).
        keep_polling = (
            not is_final_flush
            and self.app_controller.is_live_monitoring_active
            and main_window.root.winfo_exists()
        )
        if keep_polling:
            self._gui_after_id = main_window.root.after(GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle)
        else:
            self._gui_after_id = None