308 lines
17 KiB
Python
308 lines
17 KiB
Python
# FlightMonitor/controller/live_data_processor.py
|
|
"""
|
|
Manages the processing of live flight data from the adapter's output queue.
|
|
It pulls data, saves it to storage, updates GUI elements, and handles adapter status messages.
|
|
"""
|
|
import tkinter as tk
|
|
from queue import Queue, Empty as QueueEmpty
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional, Dict, Any, TYPE_CHECKING
|
|
|
|
from ..utils.logger import get_logger
|
|
from ..data.common_models import CanonicalFlightState
|
|
from ..data.opensky_live_adapter import (
|
|
AdapterMessage,
|
|
MSG_TYPE_FLIGHT_DATA,
|
|
MSG_TYPE_ADAPTER_STATUS,
|
|
STATUS_STARTING,
|
|
STATUS_FETCHING,
|
|
STATUS_RECOVERED,
|
|
STATUS_RATE_LIMITED,
|
|
STATUS_API_ERROR_TEMPORARY,
|
|
STATUS_PERMANENT_FAILURE,
|
|
STATUS_STOPPED,
|
|
)
|
|
from ..utils.gui_utils import (
|
|
GUI_STATUS_OK,
|
|
GUI_STATUS_WARNING,
|
|
GUI_STATUS_ERROR,
|
|
GUI_STATUS_FETCHING,
|
|
GUI_STATUS_UNKNOWN,
|
|
)
|
|
|
|
# Type checking imports to avoid circular dependencies at runtime
|
|
if TYPE_CHECKING:
|
|
from .app_controller import AppController # Import for type hinting AppController
|
|
from ..gui.main_window import MainWindow # For specific MainWindow methods/attributes
|
|
|
|
|
|
# Logger shared by everything defined in this module.
module_logger = get_logger(__name__)

# Delay, in milliseconds, handed to Tkinter's `after` between two consecutive
# polls of the adapter output queue.
GUI_QUEUE_CHECK_INTERVAL_MS = 150
|
|
|
|
|
|
class LiveDataProcessor:
    """
    Processes live flight data from a queue, dispatching updates to various
    parts of the application (storage, GUI) and managing adapter status.

    The processor polls the queue from the Tkinter main loop: each cycle is
    scheduled with ``root.after`` and, while live monitoring stays active,
    re-arms itself at the end of the cycle.
    """

    # Outcomes reported by the per-message handlers to the drain loop.
    _KEEP_DRAINING = "keep_draining"  # carry on with the next queued message
    _STOP_DRAINING = "stop_draining"  # end this cycle's drain (monitoring was stopped)
    _GUI_CLOSED = "gui_closed"  # a Tk call failed: the GUI is going away

    def __init__(self, app_controller: "AppController", flight_data_queue: "Queue[AdapterMessage]"):
        """
        Initializes the LiveDataProcessor.

        Args:
            app_controller: The main AppController instance. This processor accesses
                various components (MainWindow, DataStorage, etc.) via the controller.
            flight_data_queue: The queue from which to read AdapterMessages (flight data or status).
        """
        self.app_controller = app_controller
        self.flight_data_queue = flight_data_queue
        # ID returned by Tkinter's `after` for the pending cycle, or None when nothing is scheduled.
        self._gui_after_id: Optional[str] = None
        module_logger.debug("LiveDataProcessor initialized.")

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #

    def start_processing_queue(self):
        """
        Starts the periodic processing of the flight data queue.

        Cancels a previously scheduled cycle (if any) and schedules
        ``_process_queue_cycle`` on the Tkinter main loop. Does nothing, apart
        from logging an error, when the main window or its root is unavailable.
        """
        main_window = self.app_controller.main_window
        if not self._root_is_alive(main_window):
            module_logger.error(
                "LiveDataProcessor: Cannot start queue processing. MainWindow or root is missing or destroyed."
            )
            return

        if self._gui_after_id:
            # Avoid running two polling loops in parallel.
            try:
                main_window.root.after_cancel(self._gui_after_id)
                module_logger.debug("LiveDataProcessor: Cancelled existing queue processing loop.")
            except Exception as e_cancel:
                # Best effort: the ID may simply be stale.
                module_logger.debug(
                    "LiveDataProcessor: Could not cancel previous queue processing loop: %s", e_cancel
                )

        module_logger.info("LiveDataProcessor: Starting live data queue processing loop.")
        self._gui_after_id = main_window.root.after(GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle)

    def stop_processing_queue(self):
        """
        Stops the periodic processing of the flight data queue.

        Cancels the scheduled ``_process_queue_cycle`` call, then runs one last
        cycle as a final flush so that messages still in the queue are handled.
        """
        main_window = self.app_controller.main_window
        if self._gui_after_id and self._root_is_alive(main_window):
            try:
                main_window.root.after_cancel(self._gui_after_id)
                module_logger.info("LiveDataProcessor: Stopped live data queue processing loop.")
            except Exception:
                module_logger.debug(
                    "LiveDataProcessor: Failed to cancel queue processing loop (might be already stopped or invalid ID)."
                )
        else:
            module_logger.debug(
                "LiveDataProcessor: Queue processing loop already stopped or main window not available."
            )
        # Cleared in every case, including when the root is already gone.
        self._gui_after_id = None

        # Process any remaining items in the queue one last time.
        self._process_queue_cycle(is_final_flush=True)

    # ------------------------------------------------------------------ #
    # Cycle entry point
    # ------------------------------------------------------------------ #

    def _process_queue_cycle(self, is_final_flush: bool = False):
        """
        Processes the messages currently in the flight data queue in a single Tkinter cycle.

        This method is scheduled by Tkinter's `after` method. It drains the
        queue, refreshes the open detail window (if its flight appeared in this
        cycle's data) and finally re-arms itself.

        Args:
            is_final_flush: When True the queue is drained one last time and no
                further cycle is scheduled.
        """
        controller = self.app_controller
        main_window: Optional["MainWindow"] = controller.main_window
        data_storage = controller.data_storage
        aircraft_db_manager = controller.aircraft_db_manager
        active_bounding_box = controller._active_bounding_box  # For map updates
        detail_window = controller.active_detail_window_ref
        detail_icao = controller.active_detail_window_icao

        if not self._root_is_alive(main_window):
            module_logger.warning(
                "LiveDataProcessor: Main window or root destroyed during queue processing. Halting loop."
            )
            self._gui_after_id = None
            return

        cycle_states: List[CanonicalFlightState] = []
        gui_alive = True
        try:
            gui_alive = self._drain_queue(main_window, data_storage, active_bounding_box, cycle_states)
            if gui_alive:
                self._refresh_detail_window(
                    cycle_states, detail_window, detail_icao, aircraft_db_manager, data_storage
                )
        except Exception as e_outer:
            module_logger.error(f"LiveDataProcessor: Outer error in _process_queue_cycle: {e_outer}", exc_info=True)
        finally:
            self._schedule_next_cycle(main_window, is_final_flush, gui_alive)

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _root_is_alive(main_window: "Optional[MainWindow]") -> bool:
        """Returns True when the main window has a Tk root that still exists."""
        if not main_window:
            return False
        root = getattr(main_window, "root", None)
        if not root:
            return False
        try:
            return bool(root.winfo_exists())
        except tk.TclError:
            # Tk raises instead of answering once the application is destroyed.
            return False

    def _schedule_next_cycle(self, main_window: "MainWindow", is_final_flush: bool, gui_alive: bool) -> None:
        """Re-arms the polling loop, or clears the stored `after` ID when it must end."""
        self._gui_after_id = None
        if is_final_flush or not gui_alive:
            return
        # The live flag is read *now*, not at the start of the cycle: handling a
        # permanent-failure status asks the controller to stop monitoring, and a
        # value captured earlier would re-arm the loop right after that stop.
        if not self.app_controller.is_live_monitoring_active:
            return
        if not self._root_is_alive(main_window):
            return
        try:
            self._gui_after_id = main_window.root.after(GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle)
        except tk.TclError:
            module_logger.debug(
                "LiveDataProcessor: TclError rescheduling _process_queue_cycle, MainWindow likely closing."
            )
        except Exception as e_after_schedule:
            module_logger.error(f"LiveDataProcessor: Error rescheduling _process_queue_cycle: {e_after_schedule}")

    def _drain_queue(
        self,
        main_window: "MainWindow",
        data_storage: Any,
        active_bounding_box: Any,
        cycle_states: "List[CanonicalFlightState]",
    ) -> bool:
        """
        Handles every message currently available in the queue without blocking.

        Flight states seen during the drain are appended to ``cycle_states``.

        Returns:
            False when a Tk call failed (the GUI is closing), True otherwise.
        """
        while True:
            try:
                # Non-blocking get keeps the GUI responsive.
                message = self.flight_data_queue.get_nowait()
            except QueueEmpty:
                return True
            except Exception as e_q_get:
                module_logger.warning(f"LiveDataProcessor: Error getting from flight_data_queue: {e_q_get}")
                # Give up for this cycle rather than spin on a failing queue.
                return True

            outcome = self._KEEP_DRAINING
            try:
                if message is not None:
                    outcome = self._dispatch_message(
                        message, main_window, data_storage, active_bounding_box, cycle_states
                    )
            except Exception as e_msg_proc:
                module_logger.error(
                    f"LiveDataProcessor: Error processing adapter message: {e_msg_proc}", exc_info=True
                )
            finally:
                # Every successful get() is matched by a task_done(), whatever happened.
                self._mark_task_done()

            if outcome == self._GUI_CLOSED:
                return False
            if outcome == self._STOP_DRAINING:
                return True

    def _mark_task_done(self) -> None:
        """Calls ``task_done`` on the queue, tolerating an already balanced counter."""
        try:
            self.flight_data_queue.task_done()
        except ValueError:
            # Raised when task_done() is called more often than items were fetched.
            pass
        except Exception as e_task_done:
            module_logger.error(f"LiveDataProcessor: Error calling task_done on flight_data_queue: {e_task_done}")

    def _dispatch_message(
        self,
        message: Any,
        main_window: "MainWindow",
        data_storage: Any,
        active_bounding_box: Any,
        cycle_states: "List[CanonicalFlightState]",
    ) -> str:
        """Routes one adapter message by its "type" field; unknown types are ignored."""
        message_type = message.get("type")
        if message_type == MSG_TYPE_FLIGHT_DATA:
            return self._handle_flight_data(
                message.get("payload"), main_window, data_storage, active_bounding_box, cycle_states
            )
        if message_type == MSG_TYPE_ADAPTER_STATUS:
            return self._handle_adapter_status(message, main_window)
        return self._KEEP_DRAINING

    def _handle_flight_data(
        self,
        chunk: "Optional[List[CanonicalFlightState]]",
        main_window: "MainWindow",
        data_storage: Any,
        active_bounding_box: Any,
        cycle_states: "List[CanonicalFlightState]",
    ) -> str:
        """Stores a flight-data chunk, pushes it to the map and updates the status bar."""
        if chunk is None:
            alive = self._update_status(
                main_window,
                GUI_STATUS_WARNING,
                "Received empty data payload from adapter.",
                " (empty payload)",
            )
            return self._KEEP_DRAINING if alive else self._GUI_CLOSED

        cycle_states.extend(chunk)

        # --- Save to Data Storage ---
        if data_storage:
            self._store_states(data_storage, chunk)

        # --- Update Map Display ---
        map_manager = getattr(main_window, "map_manager_instance", None)
        if (
            map_manager is not None
            and hasattr(map_manager, "update_flights_on_map")
            and self.app_controller.is_live_monitoring_active  # current state, not a snapshot
            and active_bounding_box  # the map is only fed while a bounding box is set
        ):
            map_manager.update_flights_on_map(chunk)

        # --- Update GUI Status Bar (Semaphore) ---
        if chunk:
            gui_message = f"Live data: {len(chunk)} aircraft in chunk."
        else:
            gui_message = "Live data: No aircraft in area (chunk)."
        alive = self._update_status(main_window, GUI_STATUS_OK, gui_message)
        return self._KEEP_DRAINING if alive else self._GUI_CLOSED

    @staticmethod
    def _store_states(data_storage: Any, chunk: "List[CanonicalFlightState]") -> None:
        """Persists each state of the chunk; a failure on one state does not abort the others."""
        saved_count = 0
        for state in chunk:
            if not isinstance(state, CanonicalFlightState):
                module_logger.warning(
                    f"LiveDataProcessor: Skipping non-CanonicalFlightState object in payload: {type(state)}"
                )
                continue
            try:
                flight_id = data_storage.add_or_update_flight_daily(
                    icao24=state.icao24,
                    callsign=state.callsign,
                    origin_country=state.origin_country,
                    detection_timestamp=state.timestamp,
                )
                if flight_id and data_storage.add_position_daily(flight_id, state):
                    saved_count += 1
            except Exception as e_db_add:
                module_logger.error(
                    f"LiveDataProcessor: Error saving flight/position to DB for ICAO {state.icao24}: {e_db_add}",
                    exc_info=False,
                )
        if saved_count > 0:
            module_logger.info(
                f"LiveDataProcessor: Saved {saved_count} position updates to DB from this chunk."
            )

    def _handle_adapter_status(self, message: Any, main_window: "MainWindow") -> str:
        """Shows an adapter status in the GUI and stops monitoring on a permanent failure."""
        status_code = message.get("status_code")
        gui_message_from_adapter = message.get("message", f"Adapter status: {status_code}")

        if status_code in (STATUS_PERMANENT_FAILURE, STATUS_API_ERROR_TEMPORARY):
            gui_status_level = GUI_STATUS_ERROR
        elif status_code == STATUS_RATE_LIMITED:
            gui_status_level = GUI_STATUS_WARNING
        elif status_code == STATUS_FETCHING:
            gui_status_level = GUI_STATUS_FETCHING
        elif status_code in (STATUS_STARTING, STATUS_RECOVERED, STATUS_STOPPED):
            gui_status_level = GUI_STATUS_OK
        else:
            gui_status_level = GUI_STATUS_UNKNOWN

        if not self._update_status(main_window, gui_status_level, gui_message_from_adapter, " (adapter status)"):
            return self._GUI_CLOSED

        if status_code == STATUS_PERMANENT_FAILURE:
            # Delegate the stop action back to AppController.
            # NOTE(review): the controller presumably calls stop_processing_queue(),
            # whose final flush drains whatever is left in the queue - confirm.
            module_logger.critical(
                "LiveDataProcessor: Adapter reported permanent failure. Requesting AppController to stop monitoring."
            )
            self.app_controller.stop_live_monitoring(from_error=True)
            return self._STOP_DRAINING

        return self._KEEP_DRAINING

    @staticmethod
    def _update_status(main_window: "MainWindow", level: Any, text: str, context: str = "") -> bool:
        """
        Updates the status bar/semaphore when the main window supports it.

        Returns:
            False when Tk raised a TclError (the window is being torn down), True otherwise.
        """
        if not hasattr(main_window, "update_semaphore_and_status"):
            return True
        try:
            main_window.update_semaphore_and_status(level, text)
        except tk.TclError:
            module_logger.debug(
                f"LiveDataProcessor: TclError updating semaphore{context}, MainWindow likely closing."
            )
            return False
        return True

    def _refresh_detail_window(
        self,
        cycle_states: "List[CanonicalFlightState]",
        detail_window: Any,
        detail_icao: Optional[str],
        aircraft_db_manager: Any,
        data_storage: Any,
    ) -> None:
        """Refreshes the open detail window when its flight appeared in this cycle's data."""
        if not (detail_window and detail_icao and detail_window.winfo_exists()):
            return

        # Scan newest-first: several chunks may have been drained in one cycle and
        # the window must show the most recent state. getattr() skips foreign
        # objects that slipped into a payload.
        latest_state = next(
            (state for state in reversed(cycle_states) if getattr(state, "icao24", None) == detail_icao),
            None,
        )
        if latest_state is None:
            return

        module_logger.debug(
            f"LiveDataProcessor: Flight {detail_icao} in detail view was in the latest data batch. Refreshing detail window."
        )
        latest_live_data: Dict[str, Any] = latest_state.to_dict()

        # Static aircraft data, when a database manager is available.
        static_data: Optional[Dict[str, Any]] = None
        if aircraft_db_manager:
            static_data = aircraft_db_manager.get_aircraft_details(detail_icao)

        # Track for the current UTC date, when storage is available.
        full_track: List[Dict[str, Any]] = []
        if data_storage:
            try:
                current_utc_date = datetime.now(timezone.utc)
                track_states = data_storage.get_flight_track_for_icao_on_date(detail_icao, current_utc_date)
                if track_states:
                    full_track = [track_state.to_dict() for track_state in track_states]
            except Exception as e_track_upd:
                module_logger.error(
                    f"LiveDataProcessor: Error retrieving updated track for detail view {detail_icao}: {e_track_upd}"
                )

        try:
            detail_window.update_details(static_data, latest_live_data, full_track)
        except tk.TclError:
            module_logger.warning(
                f"LiveDataProcessor: TclError trying to update detail window for {detail_icao}, likely closed."
            )
            # Notify the controller so it can clear its references.
            self.app_controller.details_window_closed(detail_icao)
        except Exception as e_upd_detail_win:
            module_logger.error(
                f"LiveDataProcessor: Error updating detail window for {detail_icao}: {e_upd_detail_win}",
                exc_info=True,
            )