279 lines
12 KiB
Python
279 lines
12 KiB
Python
# FlightMonitor/controller/live_data_processor.py
|
|
"""
|
|
Manages the processing of live flight data from the adapter's output queue.
|
|
It pulls data, saves it to storage, updates GUI elements, and handles adapter status messages.
|
|
"""
|
|
import tkinter as tk
|
|
from queue import Queue, Empty as QueueEmpty
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional, Dict, Any, TYPE_CHECKING
|
|
import time
|
|
|
|
from flightmonitor.utils.logger import get_logger
|
|
from flightmonitor.data.common_models import CanonicalFlightState
|
|
from flightmonitor.data.opensky_live_adapter import (
|
|
AdapterMessage,
|
|
MSG_TYPE_FLIGHT_DATA,
|
|
MSG_TYPE_ADAPTER_STATUS,
|
|
STATUS_STARTING,
|
|
STATUS_FETCHING,
|
|
STATUS_RECOVERED,
|
|
STATUS_RATE_LIMITED,
|
|
STATUS_API_ERROR_TEMPORARY,
|
|
STATUS_PERMANENT_FAILURE,
|
|
STATUS_STOPPED,
|
|
)
|
|
from flightmonitor.utils.gui_utils import (
|
|
GUI_STATUS_OK,
|
|
GUI_STATUS_WARNING,
|
|
GUI_STATUS_ERROR,
|
|
GUI_STATUS_FETCHING,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from flightmonitor.controller.app_controller import AppController
|
|
from flightmonitor.gui.main_window import MainWindow
|
|
|
|
module_logger = get_logger(__name__)
|
|
|
|
GUI_QUEUE_CHECK_INTERVAL_MS = 150
|
|
|
|
|
|
class LiveDataProcessor:
|
|
"""
|
|
Processes live flight data from a queue, dispatching updates to various
|
|
parts of the application (storage, GUI) and managing adapter status.
|
|
"""
|
|
|
|
def __init__(
|
|
self, app_controller: "AppController", flight_data_queue: Queue[AdapterMessage]
|
|
):
|
|
self.app_controller = app_controller
|
|
self.flight_data_queue = flight_data_queue
|
|
self._gui_after_id: Optional[str] = None
|
|
module_logger.debug("LiveDataProcessor initialized.")
|
|
|
|
def start_processing_queue(self):
|
|
main_window = self.app_controller.main_window
|
|
if not (main_window and main_window.root and main_window.root.winfo_exists()):
|
|
module_logger.error(
|
|
"LiveDataProcessor: Cannot start queue processing. MainWindow or root is missing."
|
|
)
|
|
return
|
|
|
|
if self._gui_after_id:
|
|
try:
|
|
main_window.root.after_cancel(self._gui_after_id)
|
|
except Exception:
|
|
pass
|
|
|
|
module_logger.info(
|
|
"LiveDataProcessor: Starting live data queue processing loop."
|
|
)
|
|
self._gui_after_id = main_window.root.after(
|
|
GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle
|
|
)
|
|
|
|
def stop_processing_queue(self):
|
|
main_window = self.app_controller.main_window
|
|
if (
|
|
self._gui_after_id
|
|
and main_window
|
|
and main_window.root
|
|
and main_window.root.winfo_exists()
|
|
):
|
|
try:
|
|
main_window.root.after_cancel(self._gui_after_id)
|
|
module_logger.info(
|
|
"LiveDataProcessor: Stopped live data queue processing loop."
|
|
)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
self._gui_after_id = None
|
|
|
|
self._process_queue_cycle(is_final_flush=True)
|
|
|
|
def _process_queue_cycle(self, is_final_flush: bool = False):
|
|
main_window: Optional["MainWindow"] = self.app_controller.main_window
|
|
data_storage = self.app_controller.data_storage
|
|
aircraft_db_manager = self.app_controller.aircraft_db_manager
|
|
is_live_monitoring_active = self.app_controller.is_live_monitoring_active
|
|
active_detail_window_ref = self.app_controller.active_detail_window_ref
|
|
active_detail_window_icao = self.app_controller.active_detail_window_icao
|
|
|
|
if not (main_window and main_window.root and main_window.root.winfo_exists()):
|
|
self._gui_after_id = None
|
|
return
|
|
|
|
flight_payloads_this_cycle: List[CanonicalFlightState] = []
|
|
try:
|
|
while not self.flight_data_queue.empty():
|
|
message = self.flight_data_queue.get(block=False)
|
|
|
|
try:
|
|
message_type = message.get("type")
|
|
if message_type == MSG_TYPE_FLIGHT_DATA:
|
|
# MODIFICATO: Estrae il payload strutturato (canonico + grezzo)
|
|
payload_dict = message.get("payload", {})
|
|
flight_states_payload: Optional[List[CanonicalFlightState]] = (
|
|
payload_dict.get("canonical")
|
|
)
|
|
raw_json_payload: str = payload_dict.get("raw_json", "{}")
|
|
|
|
if flight_states_payload is not None:
|
|
# MODIFICATO: Notifica il controller per il salvataggio dei dati grezzi
|
|
if hasattr(self.app_controller, "process_raw_data_logging"):
|
|
self.app_controller.process_raw_data_logging(
|
|
raw_json_payload, flight_states_payload
|
|
)
|
|
|
|
# MODIFICATO: Notifica il controller per aggiornare la tabella di riepilogo
|
|
if hasattr(
|
|
self.app_controller, "update_live_summary_table"
|
|
):
|
|
self.app_controller.update_live_summary_table(
|
|
time.time(), len(flight_states_payload)
|
|
)
|
|
|
|
flight_payloads_this_cycle.extend(flight_states_payload)
|
|
if data_storage:
|
|
for state in flight_states_payload:
|
|
if not isinstance(state, CanonicalFlightState):
|
|
continue
|
|
try:
|
|
flight_id = (
|
|
data_storage.add_or_update_flight_daily(
|
|
icao24=state.icao24,
|
|
callsign=state.callsign,
|
|
origin_country=state.origin_country,
|
|
detection_timestamp=state.timestamp,
|
|
)
|
|
)
|
|
if flight_id:
|
|
data_storage.add_position_daily(
|
|
flight_id, state
|
|
)
|
|
except Exception as e_db_add:
|
|
module_logger.error(
|
|
f"LiveDataProcessor: Error saving flight/position to DB for ICAO {state.icao24}: {e_db_add}",
|
|
exc_info=True,
|
|
)
|
|
|
|
if (
|
|
main_window.map_manager_instance
|
|
and self.app_controller.is_live_monitoring_active
|
|
):
|
|
main_window.map_manager_instance.update_flights_on_map(
|
|
flight_states_payload
|
|
)
|
|
|
|
gui_message = (
|
|
f"Live data: {len(flight_states_payload)} aircraft in area."
|
|
if flight_states_payload
|
|
else "Live data: No aircraft in area."
|
|
)
|
|
main_window.update_semaphore_and_status(
|
|
GUI_STATUS_OK, gui_message
|
|
)
|
|
else:
|
|
main_window.update_semaphore_and_status(
|
|
GUI_STATUS_WARNING,
|
|
"Received empty data payload from adapter.",
|
|
)
|
|
|
|
elif message_type == MSG_TYPE_ADAPTER_STATUS:
|
|
status_code = message.get("status_code")
|
|
gui_message_from_adapter = message.get(
|
|
"message", f"Adapter status: {status_code}"
|
|
)
|
|
|
|
gui_status_level = GUI_STATUS_OK
|
|
if status_code == STATUS_PERMANENT_FAILURE:
|
|
gui_status_level = GUI_STATUS_ERROR
|
|
self.app_controller.stop_live_monitoring(from_error=True)
|
|
elif status_code == STATUS_API_ERROR_TEMPORARY:
|
|
gui_status_level = GUI_STATUS_ERROR
|
|
elif status_code == STATUS_RATE_LIMITED:
|
|
gui_status_level = GUI_STATUS_WARNING
|
|
elif status_code == STATUS_FETCHING:
|
|
gui_status_level = GUI_STATUS_FETCHING
|
|
|
|
main_window.update_semaphore_and_status(
|
|
gui_status_level, gui_message_from_adapter
|
|
)
|
|
|
|
except Exception as e_msg_proc:
|
|
module_logger.error(
|
|
f"LiveDataProcessor: Error processing adapter message: {e_msg_proc}",
|
|
exc_info=True,
|
|
)
|
|
finally:
|
|
self.flight_data_queue.task_done()
|
|
|
|
if (
|
|
active_detail_window_ref
|
|
and active_detail_window_icao
|
|
and active_detail_window_ref.winfo_exists()
|
|
):
|
|
latest_live_data_for_detail_icao: Optional[Dict[str, Any]] = None
|
|
for state_obj in flight_payloads_this_cycle:
|
|
if state_obj.icao24 == active_detail_window_icao:
|
|
latest_live_data_for_detail_icao = state_obj.to_dict()
|
|
|
|
if latest_live_data_for_detail_icao:
|
|
static_data_upd = (
|
|
aircraft_db_manager.get_aircraft_details(
|
|
active_detail_window_icao
|
|
)
|
|
if aircraft_db_manager
|
|
else None
|
|
)
|
|
full_track_data_list_upd: List[Dict[str, Any]] = []
|
|
if data_storage:
|
|
try:
|
|
track_states_upd = (
|
|
data_storage.get_flight_track_for_icao_on_date(
|
|
active_detail_window_icao,
|
|
datetime.now(timezone.utc),
|
|
)
|
|
)
|
|
if track_states_upd:
|
|
full_track_data_list_upd = [
|
|
s.to_dict() for s in track_states_upd
|
|
]
|
|
except Exception as e_track_upd:
|
|
module_logger.error(
|
|
f"LiveDataProcessor: Error retrieving updated track for detail view {active_detail_window_icao}: {e_track_upd}"
|
|
)
|
|
try:
|
|
active_detail_window_ref.update_details(
|
|
static_data_upd,
|
|
latest_live_data_for_detail_icao,
|
|
full_track_data_list_upd,
|
|
)
|
|
except Exception as e_upd_detail_win:
|
|
module_logger.error(
|
|
f"LiveDataProcessor: Error updating detail window for {active_detail_window_icao}: {e_upd_detail_win}",
|
|
exc_info=True,
|
|
)
|
|
|
|
except QueueEmpty:
|
|
pass # No messages to process, which is normal
|
|
except Exception as e_outer:
|
|
module_logger.error(
|
|
f"LiveDataProcessor: Outer error in _process_queue_cycle: {e_outer}",
|
|
exc_info=True,
|
|
)
|
|
finally:
|
|
if (
|
|
not is_final_flush
|
|
and is_live_monitoring_active
|
|
and main_window.root.winfo_exists()
|
|
):
|
|
self._gui_after_id = main_window.root.after(
|
|
GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle
|
|
)
|
|
else:
|
|
self._gui_after_id = None
|