SXXXXXXX_FlightMonitor/flightmonitor/data/opensky_live_adapter.py

524 lines
22 KiB
Python

# FlightMonitor/data/opensky_live_adapter.py
import requests
import json
import time
import threading
import queue
from typing import List, Optional, Dict, Any, Union
import random
from . import config as app_config
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState
module_logger = get_logger(__name__)
PROVIDER_NAME = "OpenSkyNetwork"
INITIAL_BACKOFF_DELAY_SECONDS = 20.0
MAX_BACKOFF_DELAY_SECONDS = 300.0
BACKOFF_FACTOR = 1.8
MAX_CONSECUTIVE_ERRORS_THRESHOLD = 5
AdapterMessage = Dict[str, Any]
FlightDataPayload = List[CanonicalFlightState]
MSG_TYPE_FLIGHT_DATA = "flight_data"
MSG_TYPE_ADAPTER_STATUS = "adapter_status"
STATUS_STARTING = "STARTING"
STATUS_FETCHING = "FETCHING"
STATUS_RECOVERED = "RECOVERED"
STATUS_RATE_LIMITED = "RATE_LIMITED"
STATUS_API_ERROR_TEMPORARY = "API_ERROR_TEMPORARY"
STATUS_PERMANENT_FAILURE = "PERMANENT_FAILURE"
STATUS_STOPPING = "STOPPING"
STATUS_STOPPED = "STOPPED"
class OpenSkyLiveAdapter(threading.Thread):
def __init__(
self,
output_queue: queue.Queue[AdapterMessage],
bounding_box: Dict[str, float],
polling_interval: int = app_config.LIVE_POLLING_INTERVAL_SECONDS,
api_timeout: int = app_config.DEFAULT_API_TIMEOUT_SECONDS,
daemon: bool = True,
):
thread_name = f"OpenSkyLiveAdapter-bbox-{bounding_box.get('lat_min',0):.1f}"
super().__init__(daemon=daemon, name=thread_name)
if output_queue is None:
module_logger.critical(
f"{self.name}: Output queue cannot be None during initialization."
)
raise ValueError("Output queue must be provided to OpenSkyLiveAdapter.")
self.output_queue = output_queue
self.bounding_box = bounding_box
self.base_polling_interval = float(
polling_interval
)
self.api_timeout = float(api_timeout)
self._stop_event = threading.Event()
self._consecutive_api_errors: int = 0
self._current_backoff_delay: float = 0.0
self._in_backoff_mode: bool = False
module_logger.debug(
f"{self.name} initialized. BBox: {self.bounding_box}, "
f"Base Interval: {self.base_polling_interval}s, API Timeout: {self.api_timeout}s"
)
def _generate_mock_flight_state(self, icao_suffix: int) -> CanonicalFlightState:
now = time.time()
lat_center = (self.bounding_box["lat_min"] + self.bounding_box["lat_max"]) / 2
lon_center = (self.bounding_box["lon_min"] + self.bounding_box["lon_max"]) / 2
lat_span = self.bounding_box["lat_max"] - self.bounding_box["lat_min"]
lon_span = self.bounding_box["lon_max"] - self.bounding_box["lon_min"]
return CanonicalFlightState(
icao24=f"mock{icao_suffix:02x}",
callsign=f"MOCK{icao_suffix:02X}",
origin_country="Mockland",
timestamp=now,
last_contact_timestamp=now,
latitude=round(
lat_center + random.uniform(-lat_span / 2.1, lat_span / 2.1), 4
),
longitude=round(
lon_center + random.uniform(-lon_span / 2.1, lon_span / 2.1), 4
),
baro_altitude_m=random.uniform(1000, 12000),
on_ground=random.choice([True, False]),
velocity_mps=random.uniform(50, 250),
true_track_deg=random.uniform(0, 360),
vertical_rate_mps=random.uniform(-10, 10),
squawk=str(random.randint(1000, 7777)),
spi=random.choice([True, False]),
position_source="MOCK_GENERATOR",
raw_data_provider=f"{PROVIDER_NAME}-Mock",
)
def _send_status_to_queue(
self, status_code: str, message: str, details: Optional[Dict] = None
):
status_payload = {
"type": MSG_TYPE_ADAPTER_STATUS,
"status_code": status_code,
"message": f"{self.name}: {message}",
}
if details:
status_payload["details"] = details
try:
self.output_queue.put_nowait(status_payload)
module_logger.debug(
f"{self.name}: Sent status '{status_code}' to queue. Msg: {message}"
)
except queue.Full:
module_logger.warning(
f"{self.name}: Output queue full. Could not send status: {status_code}"
)
except Exception as e:
module_logger.error(
f"{self.name}: Error sending status '{status_code}' to queue: {e}",
exc_info=True,
)
def stop(self):
self._stop_event.set()
module_logger.info(
f"Stop signal received for {self.name}. Signaling stop event."
)
def _parse_state_vector(self, raw_sv: list) -> Optional[CanonicalFlightState]:
try:
if not raw_sv or len(raw_sv) < 17:
module_logger.warning(
f"Skipping incomplete state vector (length {len(raw_sv)}): {raw_sv}"
)
return None
icao24 = raw_sv[0]
if not isinstance(icao24, str) or not icao24.strip():
module_logger.warning(
f"Skipping state vector due to invalid/missing ICAO24: '{icao24}' in {raw_sv}"
)
return None
api_time_position = float(raw_sv[3]) if raw_sv[3] is not None else None
api_last_contact = float(raw_sv[4]) if raw_sv[4] is not None else None
primary_timestamp: float
if api_last_contact is not None and api_time_position is not None:
primary_timestamp = max(api_last_contact, api_time_position)
elif api_last_contact is not None:
primary_timestamp = api_last_contact
elif api_time_position is not None:
primary_timestamp = api_time_position
else:
primary_timestamp = time.time()
module_logger.warning(
f"ICAO {icao24}: Using current time as primary_timestamp due to missing API timestamps (last_contact={api_last_contact}, time_position={api_time_position})."
)
last_contact_for_model = (
api_last_contact if api_last_contact is not None else primary_timestamp
)
state = CanonicalFlightState(
icao24=icao24,
callsign=raw_sv[1],
origin_country=raw_sv[2],
timestamp=primary_timestamp,
last_contact_timestamp=last_contact_for_model,
latitude=float(raw_sv[5]) if raw_sv[5] is not None else None,
longitude=float(raw_sv[6]) if raw_sv[6] is not None else None,
baro_altitude_m=float(raw_sv[7]) if raw_sv[7] is not None else None,
on_ground=bool(raw_sv[8]),
velocity_mps=float(raw_sv[9]) if raw_sv[9] is not None else None,
true_track_deg=float(raw_sv[10]) if raw_sv[10] is not None else None,
vertical_rate_mps=float(raw_sv[11]) if raw_sv[11] is not None else None,
geo_altitude_m=float(raw_sv[13]) if raw_sv[13] is not None else None,
squawk=raw_sv[14],
spi=bool(raw_sv[15]),
position_source=int(raw_sv[16]) if raw_sv[16] is not None else None,
raw_data_provider=PROVIDER_NAME,
)
return state
except (TypeError, ValueError, IndexError) as e:
module_logger.error(
f"Error parsing state vector for ICAO '{raw_sv[0] if raw_sv else 'UNKNOWN'}': {e}. Vector: {raw_sv}",
exc_info=False,
)
return None
def _perform_api_request(self) -> Dict[str, Any]:
if app_config.USE_MOCK_OPENSKY_API:
module_logger.info(
f"{self.name}: Using MOCK API data as per configuration."
)
self._send_status_to_queue(
STATUS_FETCHING, "Generating mock flight data..."
)
if app_config.MOCK_API_ERROR_SIMULATION == "RATE_LIMITED":
self._consecutive_api_errors += 1
self._in_backoff_mode = True
mock_retry_after = str(getattr(app_config, 'MOCK_RETRY_AFTER_SECONDS', 60))
delay = self._calculate_next_backoff_delay(
provided_retry_after=mock_retry_after
)
err_msg = f"MOCK: Rate limited. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.warning(f"{self.name}: {err_msg}")
return {
"error_type": STATUS_RATE_LIMITED,
"delay": delay,
"message": err_msg,
"consecutive_errors": self._consecutive_api_errors,
"status_code": 429,
}
if app_config.MOCK_API_ERROR_SIMULATION == "HTTP_ERROR":
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
mock_status_code = getattr(app_config, 'MOCK_HTTP_ERROR_STATUS', 500)
err_msg = f"MOCK: HTTP error {mock_status_code}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
return {
"error_type": STATUS_API_ERROR_TEMPORARY,
"status_code": mock_status_code,
"message": err_msg,
"delay": delay,
"consecutive_errors": self._consecutive_api_errors,
}
mock_states: List[CanonicalFlightState] = []
mock_count = getattr(app_config, 'MOCK_API_FLIGHT_COUNT', 5)
for i in range(mock_count):
mock_states.append(self._generate_mock_flight_state(i + 1))
module_logger.info(
f"{self.name}: Generated {len(mock_states)} mock flight states."
)
self._reset_error_state()
return {"data": mock_states}
if not self.bounding_box:
err_msg = "Bounding box not set for REAL API request."
module_logger.error(f"{self.name}: {err_msg}")
return {
"error_type": STATUS_PERMANENT_FAILURE,
"message": err_msg,
"status_code": "NO_BBOX",
"delay": 0.0,
"consecutive_errors": self._consecutive_api_errors,
}
params = {
"lamin": self.bounding_box["lat_min"],
"lomin": self.bounding_box["lon_min"],
"lamax": self.bounding_box["lat_max"],
"lomax": self.bounding_box["lon_max"],
}
self._send_status_to_queue(
STATUS_FETCHING, f"Requesting REAL data for bbox: {self.bounding_box}"
)
response: Optional[requests.Response] = None
try:
response = requests.get(
app_config.OPENSKY_API_URL, params=params, timeout=self.api_timeout
)
module_logger.debug(
f"{self.name}: API Response Status: {response.status_code} {response.reason}"
)
if response.status_code == 429:
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay(
response.headers.get("Retry-After")
)
err_msg = f"Rate limited (429). Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.warning(f"{self.name}: {err_msg}")
return {
"error_type": STATUS_RATE_LIMITED,
"delay": delay,
"message": err_msg,
"consecutive_errors": self._consecutive_api_errors,
"status_code": 429,
}
response.raise_for_status()
self._reset_error_state()
response_data = response.json()
raw_states_list = response_data.get("states")
canonical_states: List[CanonicalFlightState] = []
if raw_states_list is not None:
for raw_sv in raw_states_list:
parsed_state = self._parse_state_vector(raw_sv)
if parsed_state:
canonical_states.append(parsed_state)
module_logger.info(
f"{self.name}: Fetched and parsed {len(canonical_states)} flight states."
)
else:
module_logger.info(
f"{self.name}: API returned no flight states ('states' is null or empty)."
)
return {"data": canonical_states}
except requests.exceptions.HTTPError as http_err:
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
status_code = http_err.response.status_code if http_err.response else "N/A"
err_msg = f"HTTP error {status_code}: {http_err}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
return {
"error_type": STATUS_API_ERROR_TEMPORARY,
"status_code": status_code,
"message": err_msg,
"delay": delay,
"consecutive_errors": self._consecutive_api_errors,
}
except requests.exceptions.Timeout:
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"API request timed out. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=False)
return {
"error_type": STATUS_API_ERROR_TEMPORARY,
"status_code": "TIMEOUT",
"message": err_msg,
"delay": delay,
"consecutive_errors": self._consecutive_api_errors,
}
except (requests.exceptions.RequestException, json.JSONDecodeError) as e:
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"Request/JSON error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.error(f"{self.name}: {err_msg}", exc_info=True)
return {
"error_type": STATUS_API_ERROR_TEMPORARY,
"status_code": "REQUEST_JSON_ERROR",
"message": err_msg,
"delay": delay,
"consecutive_errors": self._consecutive_api_errors,
}
except Exception as e:
self._consecutive_api_errors += 1
self._in_backoff_mode = True
delay = self._calculate_next_backoff_delay()
err_msg = f"Unexpected critical error: {e}. Errors: {self._consecutive_api_errors}. Retrying in {delay:.1f}s."
module_logger.critical(f"{self.name}: {err_msg}", exc_info=True)
return {
"error_type": STATUS_API_ERROR_TEMPORARY,
"status_code": "UNEXPECTED_ADAPTER_ERROR",
"message": err_msg,
"delay": delay,
"consecutive_errors": self._consecutive_api_errors,
}
def _calculate_next_backoff_delay(
self, provided_retry_after: Optional[str] = None
) -> float:
calculated_delay = INITIAL_BACKOFF_DELAY_SECONDS * (
BACKOFF_FACTOR ** (self._consecutive_api_errors - 1)
)
api_delay_from_header = 0.0
if provided_retry_after is not None and provided_retry_after.isdigit():
try:
api_delay_from_header = float(provided_retry_after)
module_logger.debug(f"{self.name}: Found Retry-After header: {provided_retry_after}s")
except ValueError:
module_logger.warning(f"{self.name}: Could not parse Retry-After header value: '{provided_retry_after}'")
api_delay_from_header = 0.0
except Exception as e:
module_logger.warning(f"{self.name}: Unexpected error parsing Retry-After header: {e}", exc_info=False)
api_delay_from_header = 0.0
self._current_backoff_delay = max(
api_delay_from_header, calculated_delay
)
self._current_backoff_delay = min(
self._current_backoff_delay, MAX_BACKOFF_DELAY_SECONDS
)
self._current_backoff_delay = max(
self._current_backoff_delay, 1.0
)
module_logger.debug(f"{self.name}: Calculated next backoff delay: {self._current_backoff_delay:.1f}s")
return self._current_backoff_delay
def _reset_error_state(self):
if self._in_backoff_mode or self._consecutive_api_errors > 0:
module_logger.info(
f"{self.name}: API connection successful after {self._consecutive_api_errors} error(s). Resetting backoff."
)
self._send_status_to_queue(
STATUS_RECOVERED, "API connection recovered."
)
self._consecutive_api_errors = 0
self._current_backoff_delay = 0.0
self._in_backoff_mode = False
def run(self):
initial_settle_delay_seconds = 0.1
module_logger.debug(f"{self.name} thread starting initial settle delay ({initial_settle_delay_seconds}s)...")
try:
if self._stop_event.wait(timeout=initial_settle_delay_seconds):
module_logger.info(f"{self.name} thread received stop signal during initial settle delay. Terminating early.")
return
except Exception as e:
module_logger.error(f"{self.name} unexpected error during initial settle delay: {e}", exc_info=True)
return
module_logger.info(f"{self.name} thread is fully operational. Base polling interval: {self.base_polling_interval:.1f}s.")
self._send_status_to_queue(
STATUS_STARTING, "Adapter thread started, preparing initial fetch."
)
while not self._stop_event.is_set():
if self._consecutive_api_errors >= MAX_CONSECUTIVE_ERRORS_THRESHOLD:
perm_fail_msg = f"Reached max ({self._consecutive_api_errors}) consecutive API errors. Stopping live updates."
module_logger.critical(f"{self.name}: {perm_fail_msg}")
self._send_status_to_queue(
STATUS_PERMANENT_FAILURE, perm_fail_msg
)
break
api_result = self._perform_api_request()
if self._stop_event.is_set():
module_logger.info(f"{self.name}: Stop event detected after API request. Exiting loop.")
break
if "data" in api_result:
flight_data_payload: List[CanonicalFlightState] = api_result["data"]
try:
self.output_queue.put_nowait(
{
"type": MSG_TYPE_FLIGHT_DATA,
"payload": flight_data_payload,
}
)
module_logger.debug(f"{self.name}: Sent {len(flight_data_payload)} flight states to queue.")
except queue.Full:
module_logger.warning(
f"{self.name}: Output queue full. Discarding {len(flight_data_payload)} flight states."
)
except Exception as e:
module_logger.error(
f"{self.name}: Error putting flight data into queue: {e}",
exc_info=True,
)
elif "error_type" in api_result:
error_details_for_controller = api_result.copy()
error_details_for_controller["type"] = MSG_TYPE_ADAPTER_STATUS
error_details_for_controller["status_code"] = api_result["error_type"]
else:
module_logger.error(
f"{self.name}: Unknown result structure from _perform_api_request: {api_result}"
)
time_to_wait_seconds: float
if self._in_backoff_mode:
time_to_wait_seconds = self._current_backoff_delay
module_logger.debug(f"{self.name}: In backoff, next attempt in {time_to_wait_seconds:.1f}s.")
else:
time_to_wait_seconds = self.base_polling_interval
module_logger.debug(f"{self.name}: Next fetch cycle in {time_to_wait_seconds:.1f}s.")
if time_to_wait_seconds > 0:
if self._stop_event.wait(timeout=time_to_wait_seconds):
print(f"{self.name}: Stop event received during wait period. Exiting loop.")
break
else:
if self._stop_event.is_set():
print(f"{self.name}: Stop event detected before waiting (wait time <= 0). Exiting loop.")
break
print(f"{self.name}: Exited main loop. Attempting final actions.")
try:
final_status_payload = {
"type": MSG_TYPE_ADAPTER_STATUS,
"status_code": STATUS_STOPPED,
"message": f"{self.name}: Adapter thread stopped."
}
try:
self.output_queue.put_nowait(final_status_payload)
print(f"{self.name}: Successfully sent final STATUS_STOPPED to queue.")
except queue.Full:
print(f"{self.name}: Output queue full. Could not send final STATUS_STOPPED message.")
except Exception as e_outer_put:
print(f"{self.name}: Unexpected error during final status message attempt: {e_outer_put}", exc_info=True)
print(f"{self.name}: RUN method is terminating now.")