SXXXXXXX_FlightMonitor/flightmonitor/controller/live_data_processor.py
2025-06-04 12:38:41 +02:00

308 lines
17 KiB
Python

# FlightMonitor/controller/live_data_processor.py
"""
Manages the processing of live flight data from the adapter's output queue.
It pulls data, saves it to storage, updates GUI elements, and handles adapter status messages.
"""
import tkinter as tk
from queue import Queue, Empty as QueueEmpty
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from ..utils.logger import get_logger
from ..data.common_models import CanonicalFlightState
from ..data.opensky_live_adapter import (
AdapterMessage,
MSG_TYPE_FLIGHT_DATA,
MSG_TYPE_ADAPTER_STATUS,
STATUS_STARTING,
STATUS_FETCHING,
STATUS_RECOVERED,
STATUS_RATE_LIMITED,
STATUS_API_ERROR_TEMPORARY,
STATUS_PERMANENT_FAILURE,
STATUS_STOPPED,
)
from ..utils.gui_utils import (
GUI_STATUS_OK,
GUI_STATUS_WARNING,
GUI_STATUS_ERROR,
GUI_STATUS_FETCHING,
GUI_STATUS_UNKNOWN,
)
# Type checking imports to avoid circular dependencies at runtime
if TYPE_CHECKING:
from .app_controller import AppController # Import for type hinting AppController
from ..gui.main_window import MainWindow # For specific MainWindow methods/attributes
module_logger = get_logger(__name__)
# Constants for queue processing interval
GUI_QUEUE_CHECK_INTERVAL_MS = 150
class LiveDataProcessor:
"""
Processes live flight data from a queue, dispatching updates to various
parts of the application (storage, GUI) and managing adapter status.
"""
def __init__(self, app_controller: "AppController", flight_data_queue: Queue[AdapterMessage]):
"""
Initializes the LiveDataProcessor.
Args:
app_controller: The main AppController instance. This processor accesses
various components (MainWindow, DataStorage, etc.) via the controller.
flight_data_queue: The queue from which to read AdapterMessages (flight data or status).
"""
self.app_controller = app_controller
self.flight_data_queue = flight_data_queue
self._gui_after_id: Optional[str] = None # Stores the ID for the Tkinter `after` loop
module_logger.debug("LiveDataProcessor initialized.")
def start_processing_queue(self):
"""
Starts the periodic processing of the flight data queue.
Schedules the _process_queue_cycle method to run on the Tkinter main loop.
"""
main_window = self.app_controller.main_window
if not (main_window and main_window.root and main_window.root.winfo_exists()):
module_logger.error("LiveDataProcessor: Cannot start queue processing. MainWindow or root is missing or destroyed.")
return
if self._gui_after_id: # Cancel any existing scheduled processing
try:
main_window.root.after_cancel(self._gui_after_id)
module_logger.debug("LiveDataProcessor: Cancelled existing queue processing loop.")
except Exception:
pass # Ignore if ID is already invalid
module_logger.info("LiveDataProcessor: Starting live data queue processing loop.")
self._gui_after_id = main_window.root.after(GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle)
def stop_processing_queue(self):
"""
Stops the periodic processing of the flight data queue.
Cancels the scheduled _process_queue_cycle method.
"""
main_window = self.app_controller.main_window
if self._gui_after_id and main_window and main_window.root and main_window.root.winfo_exists():
try:
main_window.root.after_cancel(self._gui_after_id)
module_logger.info("LiveDataProcessor: Stopped live data queue processing loop.")
except Exception:
module_logger.debug("LiveDataProcessor: Failed to cancel queue processing loop (might be already stopped or invalid ID).")
finally:
self._gui_after_id = None
else:
module_logger.debug("LiveDataProcessor: Queue processing loop already stopped or main window not available.")
self._gui_after_id = None # Ensure it's cleared if root is gone
# Process any remaining items in the queue one last time
self._process_queue_cycle(is_final_flush=True)
def _process_queue_cycle(self, is_final_flush: bool = False):
"""
Processes a batch of messages from the flight data queue in a single Tkinter cycle.
This method is scheduled by Tkinter's `after` method.
"""
# Get necessary references from the app_controller
main_window: Optional["MainWindow"] = self.app_controller.main_window
data_storage = self.app_controller.data_storage
aircraft_db_manager = self.app_controller.aircraft_db_manager
is_live_monitoring_active = self.app_controller.is_live_monitoring_active # Check state for re-scheduling
active_bounding_box = self.app_controller._active_bounding_box # For map updates
active_detail_window_ref = self.app_controller.active_detail_window_ref
active_detail_window_icao = self.app_controller.active_detail_window_icao
if not (main_window and main_window.root and main_window.root.winfo_exists()):
module_logger.warning("LiveDataProcessor: Main window or root destroyed during queue processing. Halting loop.")
self._gui_after_id = None
return
flight_payloads_this_cycle: List[CanonicalFlightState] = []
messages_processed_this_cycle = 0
try:
while not self.flight_data_queue.empty():
message = None
try:
# Get message without blocking to keep GUI responsive
message = self.flight_data_queue.get(block=False)
messages_processed_this_cycle += 1
except QueueEmpty:
break # No more messages in queue
except Exception as e_q_get:
module_logger.warning(f"LiveDataProcessor: Error getting from flight_data_queue: {e_q_get}")
continue
if message is None:
continue
try:
message_type = message.get("type")
if message_type == MSG_TYPE_FLIGHT_DATA:
flight_states_payload_chunk: Optional[List[CanonicalFlightState]] = message.get("payload")
if flight_states_payload_chunk is not None:
flight_payloads_this_cycle.extend(flight_states_payload_chunk)
# --- Save to Data Storage ---
if data_storage:
saved_count = 0
for state in flight_states_payload_chunk:
if not isinstance(state, CanonicalFlightState):
module_logger.warning(f"LiveDataProcessor: Skipping non-CanonicalFlightState object in payload: {type(state)}")
continue
try:
flight_id = data_storage.add_or_update_flight_daily(
icao24=state.icao24,
callsign=state.callsign,
origin_country=state.origin_country,
detection_timestamp=state.timestamp,
)
if flight_id:
pos_id = data_storage.add_position_daily(
flight_id, state
)
if pos_id:
saved_count += 1
except Exception as e_db_add:
module_logger.error(f"LiveDataProcessor: Error saving flight/position to DB for ICAO {state.icao24}: {e_db_add}", exc_info=False)
if saved_count > 0:
module_logger.info(f"LiveDataProcessor: Saved {saved_count} position updates to DB from this chunk.")
# --- Update Map Display ---
if (
hasattr(main_window, "map_manager_instance")
and main_window.map_manager_instance is not None
and hasattr(main_window.map_manager_instance, "update_flights_on_map")
and self.app_controller.is_live_monitoring_active # Check AppController's live state
and active_bounding_box # Map is active only if bbox is set
):
main_window.map_manager_instance.update_flights_on_map(flight_states_payload_chunk)
# --- Update GUI Status Bar (Semaphore) ---
gui_message = f"Live data: {len(flight_states_payload_chunk)} aircraft in chunk." if flight_states_payload_chunk else "Live data: No aircraft in area (chunk)."
if hasattr(main_window, "update_semaphore_and_status"):
try:
main_window.update_semaphore_and_status(GUI_STATUS_OK, gui_message)
except tk.TclError:
module_logger.debug("LiveDataProcessor: TclError updating semaphore, MainWindow likely closing.")
# Don't reschedule if GUI is closing
self._gui_after_id = None
return
else:
if hasattr(main_window, "update_semaphore_and_status"):
try:
main_window.update_semaphore_and_status(GUI_STATUS_WARNING, "Received empty data payload from adapter.")
except tk.TclError:
module_logger.debug("LiveDataProcessor: TclError updating semaphore (empty payload), MainWindow likely closing.")
self._gui_after_id = None
return
elif message_type == MSG_TYPE_ADAPTER_STATUS:
status_code = message.get("status_code")
gui_message_from_adapter = message.get("message", f"Adapter status: {status_code}")
gui_status_level_to_set = GUI_STATUS_UNKNOWN
action_required = None
if status_code == STATUS_PERMANENT_FAILURE:
action_required = "STOP_MONITORING"
gui_status_level_to_set = GUI_STATUS_ERROR # Permanent failures are critical errors
elif status_code == STATUS_API_ERROR_TEMPORARY:
gui_status_level_to_set = GUI_STATUS_ERROR
elif status_code == STATUS_RATE_LIMITED:
gui_status_level_to_set = GUI_STATUS_WARNING
elif status_code == STATUS_FETCHING:
gui_status_level_to_set = GUI_STATUS_FETCHING
elif status_code in [STATUS_STARTING, STATUS_RECOVERED, STATUS_STOPPED]:
gui_status_level_to_set = GUI_STATUS_OK
if hasattr(main_window, "update_semaphore_and_status"):
try:
main_window.update_semaphore_and_status(gui_status_level_to_set, gui_message_from_adapter)
except tk.TclError:
module_logger.debug("LiveDataProcessor: TclError updating semaphore (adapter status), MainWindow likely closing.")
self._gui_after_id = None
return
if action_required == "STOP_MONITORING":
# Delegate stop action back to AppController
module_logger.critical(f"LiveDataProcessor: Adapter reported permanent failure. Requesting AppController to stop monitoring.")
self.app_controller.stop_live_monitoring(from_error=True)
break # Break out of inner queue processing loop to allow main loop to terminate if needed
except Exception as e_msg_proc:
module_logger.error(f"LiveDataProcessor: Error processing adapter message: {e_msg_proc}", exc_info=True)
finally:
try:
self.flight_data_queue.task_done() # Mark task as done regardless of processing success
except ValueError:
pass # Queue might be empty already if break was called
except Exception as e_task_done:
module_logger.error(f"LiveDataProcessor: Error calling task_done on flight_data_queue: {e_task_done}")
# --- Live Update for Full Flight Details Window (if open) ---
if (
active_detail_window_ref
and active_detail_window_icao
and active_detail_window_ref.winfo_exists()
):
flight_of_interest_updated_this_cycle = False
latest_live_data_for_detail_icao: Optional[Dict[str, Any]] = None
# Find the most recent live data for the actively viewed flight
for state_obj in flight_payloads_this_cycle:
if state_obj.icao24 == active_detail_window_icao:
flight_of_interest_updated_this_cycle = True
latest_live_data_for_detail_icao = state_obj.to_dict()
break # Found the latest update for this flight in the current chunk
if flight_of_interest_updated_this_cycle:
module_logger.debug(f"LiveDataProcessor: Flight {active_detail_window_icao} in detail view was in the latest data batch. Refreshing detail window.")
# Retrieve static data (if available) and full track data (if storage is active)
static_data_upd: Optional[Dict[str, Any]] = None
if aircraft_db_manager:
static_data_upd = aircraft_db_manager.get_aircraft_details(active_detail_window_icao)
full_track_data_list_upd: List[Dict[str, Any]] = []
if data_storage:
try:
current_utc_date = datetime.now(timezone.utc)
track_states_upd = data_storage.get_flight_track_for_icao_on_date(active_detail_window_icao, current_utc_date)
if track_states_upd:
full_track_data_list_upd = [s.to_dict() for s in track_states_upd]
except Exception as e_track_upd:
module_logger.error(f"LiveDataProcessor: Error retrieving updated track for detail view {active_detail_window_icao}: {e_track_upd}")
try:
active_detail_window_ref.update_details(
static_data_upd,
latest_live_data_for_detail_icao,
full_track_data_list_upd,
)
except tk.TclError:
module_logger.warning(f"LiveDataProcessor: TclError trying to update detail window for {active_detail_window_icao}, likely closed.")
self.app_controller.details_window_closed(active_detail_window_icao) # Notify controller to clear its references
except Exception as e_upd_detail_win:
module_logger.error(f"LiveDataProcessor: Error updating detail window for {active_detail_window_icao}: {e_upd_detail_win}", exc_info=True)
except Exception as e_outer:
module_logger.error(f"LiveDataProcessor: Outer error in _process_queue_cycle: {e_outer}", exc_info=True)
finally:
# Reschedule the next processing cycle ONLY if not a final flush and monitoring is still active
if not is_final_flush and is_live_monitoring_active and main_window.root.winfo_exists():
try:
self._gui_after_id = main_window.root.after(GUI_QUEUE_CHECK_INTERVAL_MS, self._process_queue_cycle)
except tk.TclError:
module_logger.debug("LiveDataProcessor: TclError rescheduling _process_queue_cycle, MainWindow likely closing.")
self._gui_after_id = None
except Exception as e_after_schedule:
module_logger.error(f"LiveDataProcessor: Error rescheduling _process_queue_cycle: {e_after_schedule}")
self._gui_after_id = None
else:
self._gui_after_id = None # Ensure ID is cleared if not rescheduling