# FlightMonitor/data/base_live_data_adapter.py """ Defines the abstract base class for all live data adapters. All live data adapters (e.g., OpenSky, ADS-B Exchange) should inherit from this class to provide a consistent interface for the AppController. """ import abc # Import for Abstract Base Classes import threading # All adapters will run in a separate thread from queue import Queue # For the output queue, common to all adapters from typing import Dict, Any, Optional # Type hints for clarity try: from ..utils.logger import get_logger logger = get_logger(__name__) except ImportError: import logging logger = logging.getLogger(__name__) if not logger.hasHandlers(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s [%(levelname)s] %(funcName)s (%(threadName)s): %(message)s", ) logger.warning("AircraftDatabaseManager using fallback standard Python logger.") # Define the common message format for adapter output # This ensures consistency for the LiveDataProcessor AdapterMessage = Dict[str, Any] class BaseLiveDataAdapter(abc.ABC, threading.Thread): """ Abstract Base Class for live flight data adapters. All concrete adapter implementations (e.g., OpenSkyLiveAdapter, ADSBExchangeLiveAdapter) must inherit from this class and implement its abstract methods. It combines the properties of an Abstract Base Class with those of a Thread, as all our adapters are expected to run in separate threads. """ def __init__(self, output_queue: Queue[AdapterMessage], bounding_box: Dict[str, float], polling_interval: float, daemon: bool = True, name: Optional[str] = None): """ Initializes the base live data adapter. Args: output_queue (Queue[AdapterMessage]): The queue to which processed data (or status) will be sent. bounding_box (Dict[str, float]): The geographical bounding box for data fetching. polling_interval (float): The interval in seconds between data fetches. daemon (bool): Whether the thread should be a daemon thread. name (Optional[str]): The name of the thread. """ # Call the constructor of the threading.Thread base class super().__init__(daemon=daemon, name=name) # Common attributes for all adapters self.output_queue = output_queue self.bounding_box = bounding_box self.polling_interval = polling_interval # Event to signal the thread to stop its execution loop self._stop_event = threading.Event() @abc.abstractmethod def run(self): """ The main execution method for the adapter thread. Concrete subclasses must implement this to contain the data fetching and processing loop. """ # This method is abstract and must be implemented by subclasses. # It typically contains a while loop checking self._stop_event.is_set(). pass def stop(self): """ Signals the adapter thread to stop its execution. This method is thread-safe and sets the internal stop event. """ self._stop_event.set() logger.debug(f"{self.name}: Stop signal set.") # Use module_logger if available, otherwise print def is_stopped(self) -> bool: """ Checks if the adapter thread has been signaled to stop. Returns: bool: True if the stop event has been set, False otherwise. """ return self._stop_event.is_set() # It's generally good practice to define a common _send_status_to_queue method # here if its structure is identical across all adapters, but for now, it remains # in the concrete adapters as its details might vary slightly (e.g., provider name).