SXXXXXXX_FlightMonitor/flightmonitor/data/opensky_historical_adapter.py
2025-06-13 07:42:31 +02:00

264 lines
12 KiB
Python

# FlightMonitor/data/opensky_historical_adapter.py
"""
Adapter for fetching historical flight data from the OpenSky Network API.
This adapter iterates over a specified time range, fetching data at discrete intervals.
It requires authentication to access historical data.
"""
import time
import threading
from queue import Queue, Full as QueueFull
from typing import Dict, Any, List, Optional
import requests
from . import config as app_config
from ..utils.logger import get_logger
from .common_models import CanonicalFlightState
from .base_live_data_adapter import BaseLiveDataAdapter, AdapterMessage
from .data_constants import (
MSG_TYPE_FLIGHT_DATA,
MSG_TYPE_ADAPTER_STATUS,
STATUS_STARTING,
STATUS_FETCHING,
STATUS_STOPPED,
STATUS_PERMANENT_FAILURE,
PROVIDER_NAME_OPENSKY,
)
import warnings
from urllib3.exceptions import InsecureRequestWarning
# Suppress only the InsecureRequestWarning from urllib3.
# Requests in this module may be sent with verify=False (TLS certificate checks
# disabled), which makes urllib3 emit this warning on every such call.
# NOTE(review): this filter is installed at import time and applies to the whole
# process, so it also hides the warning for unrelated code that disables TLS
# verification. Consider scoping it (warnings.catch_warnings) around the calls.
warnings.simplefilter('ignore', InsecureRequestWarning)
# Module-level logger shared by every adapter instance defined in this file.
module_logger = get_logger(__name__)
class OpenSkyHistoricalAdapter(BaseLiveDataAdapter):
    """
    Fetches historical flight data from OpenSky for a given time range and bounding box.

    The range ``[start_timestamp, end_timestamp]`` (inclusive) is walked in steps
    of ``sampling_interval_sec``. For every sample time one GET request is sent to
    ``app_config.OPENSKY_API_URL`` with the ``time`` parameter and the bounding
    box, and the converted states are put on the output queue as a
    ``MSG_TYPE_FLIGHT_DATA`` message. Progress and failures are reported as
    ``MSG_TYPE_ADAPTER_STATUS`` messages. Between samples the thread waits
    ``self.polling_interval`` seconds (presumably the ``scan_rate_sec`` handed to
    the base class -- confirm in BaseLiveDataAdapter); a stop request interrupts
    the wait.

    Requires authentication via OAuth2 Client Credentials
    (``OPENSKY_CLIENT_ID``, ``OPENSKY_CLIENT_SECRET``, ``OPENSKY_TOKEN_URL``).
    """

    # Timeout (seconds) for each attempt to put a data message on a full output
    # queue; bounds how long a stop request can go unnoticed while waiting.
    _QUEUE_PUT_RETRY_SEC = 0.5

    def __init__(
        self,
        output_queue: Queue[AdapterMessage],
        bounding_box: Dict[str, float],
        start_timestamp: int,
        end_timestamp: int,
        sampling_interval_sec: int,
        scan_rate_sec: int,
        daemon: bool = True,
    ):
        """
        Args:
            output_queue: Queue receiving flight-data and status messages.
            bounding_box: Area to query; must provide the keys ``lat_min``,
                ``lon_min``, ``lat_max`` and ``lon_max``.
            start_timestamp: First sample time, sent to OpenSky as ``time``
                (presumably Unix epoch seconds).
            end_timestamp: Last sample time, inclusive.
            sampling_interval_sec: Step between sample times. Must be > 0 after
                ``int()`` conversion, otherwise ``run`` reports a permanent
                failure instead of downloading.
            scan_rate_sec: Forwarded as a float to BaseLiveDataAdapter
                (presumably becomes ``self.polling_interval``).
            daemon: Forwarded to BaseLiveDataAdapter.
        """
        super().__init__(
            output_queue,
            bounding_box,
            float(scan_rate_sec),
            daemon,
            name="OpenSkyHistoricalAdapter",
        )
        self.start_timestamp = int(start_timestamp)
        self.end_timestamp = int(end_timestamp)
        self.sampling_interval = int(sampling_interval_sec)
        # Authentication attributes (None when absent from the app config).
        self.client_id = getattr(app_config, "OPENSKY_CLIENT_ID", None)
        self.client_secret = getattr(app_config, "OPENSKY_CLIENT_SECRET", None)
        self.token_url = getattr(app_config, "OPENSKY_TOKEN_URL", None)
        self.access_token: Optional[str] = None
        # time.time() value after which the token is treated as expired; it
        # already includes a 60-second safety buffer (see _get_oauth_token).
        self.token_expires_at: float = 0.0
        self.api_timeout = getattr(app_config, "DEFAULT_API_TIMEOUT_SECONDS", 15)
        # TLS certificate verification for the token and data requests. The
        # default (False) preserves the previously hard-coded behaviour.
        # NOTE(review): with verification off, the client secret and bearer
        # token travel over an unauthenticated TLS channel; set
        # OPENSKY_VERIFY_SSL = True in the config wherever the network allows.
        self.verify_ssl = bool(getattr(app_config, "OPENSKY_VERIFY_SSL", False))
        if not self._has_credentials():
            module_logger.critical(f"{self.name}: OAuth2 credentials/token URL missing. Historical adapter cannot function.")
            # Reported to the consumer as a permanent failure in run().
        if self.sampling_interval <= 0:
            module_logger.critical(
                f"{self.name}: Invalid sampling interval ({self.sampling_interval}s). Historical adapter cannot function."
            )
            # Reported to the consumer as a permanent failure in run().

    def _has_credentials(self) -> bool:
        """Return True when client ID, client secret and token URL are all configured."""
        return bool(self.client_id and self.client_secret and self.token_url)

    def _get_oauth_token(self) -> bool:
        """
        Request a new OAuth2 access token from OpenSky (client-credentials grant).

        On success, stores the token in ``self.access_token`` and sets
        ``self.token_expires_at`` to now + ``expires_in`` (default 300 s) minus a
        60-second buffer.

        Returns:
            True if a token was obtained. False on any failure (network/HTTP
            error, non-JSON or malformed body, missing ``access_token``); the
            error is logged and ``self.access_token`` is reset to None.
        """
        if not self._has_credentials():
            module_logger.error(f"{self.name}: Cannot get OAuth token, client ID/secret/URL missing.")
            return False
        from urllib.parse import urlencode

        encoded_payload = urlencode({
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        })
        # The browser-like User-Agent mirrors the live adapter's token request exactly.
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0"
        }
        module_logger.info(f"{self.name}: Requesting new OAuth2 access token from {self.token_url}...")
        try:
            response = requests.post(
                self.token_url,
                data=encoded_payload,
                headers=headers,
                timeout=self.api_timeout,
                verify=self.verify_ssl,
            )
            response.raise_for_status()
            token_data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError: non-JSON body on requests versions whose JSONDecodeError
            # is not a RequestException subclass.
            module_logger.error(f"{self.name}: Error obtaining OAuth token: {e}", exc_info=True)
            self.access_token = None
            return False

        access_token = token_data.get("access_token") if isinstance(token_data, dict) else None
        if not access_token:
            module_logger.error(f"{self.name}: 'access_token' missing in response: {token_data}")
            self.access_token = None
            return False
        try:
            expires_in = float(token_data.get("expires_in", 300))
        except (TypeError, ValueError):
            # e.g. "expires_in": null -- fall back to the same default lifetime.
            expires_in = 300.0
        self.access_token = access_token
        self.token_expires_at = time.time() + expires_in - 60  # 60-second buffer
        module_logger.info(f"{self.name}: Successfully obtained OAuth2 access token.")
        return True

    def _is_token_valid(self) -> bool:
        """Return True if an access token is present and its expiry time has not passed."""
        return self.access_token is not None and time.time() < self.token_expires_at

    def _send_status_to_queue(self, status_code: str, message: str):
        """Queue a status message without blocking; drop it (with a warning) if the queue is full."""
        status_payload = {"type": MSG_TYPE_ADAPTER_STATUS, "status_code": status_code, "message": f"{self.name}: {message}"}
        try:
            self.output_queue.put_nowait(status_payload)
        except QueueFull:
            module_logger.warning(f"{self.name}: Output queue full. Could not send status: {status_code}")

    @staticmethod
    def _to_float_or_none(value: Any) -> Optional[float]:
        """Convert ``value`` to float, passing None through unchanged."""
        return float(value) if value is not None else None

    def _convert_direct_api_state_to_canonical(
        self,
        raw_state_list: List[Any],
        fallback_timestamp: Optional[float] = None,
    ) -> Optional[CanonicalFlightState]:
        """
        Convert one raw state vector (a JSON list from the API) to CanonicalFlightState.

        Args:
            raw_state_list: State vector with at least 17 positional fields.
            fallback_timestamp: Timestamp to use when both index 3 and index 4
                are None. Defaults to the current wall-clock time (the previous
                behaviour); the historical download passes the sample time
                instead so states are not stamped with "now".

        Returns:
            The converted state, or None if the list is too short, has an
            empty ICAO24, or a field fails conversion (the error is logged).
        """
        to_float = self._to_float_or_none
        try:
            if not raw_state_list or len(raw_state_list) < 17:
                return None
            icao24 = raw_state_list[0]
            if not icao24:
                return None
            # Index 3 is preferred as the state's timestamp; index 4 (also used as
            # last-contact time) is the fallback.
            primary_ts = raw_state_list[3] if raw_state_list[3] is not None else raw_state_list[4]
            last_contact_ts = raw_state_list[4]
            if primary_ts is None:
                primary_ts = fallback_timestamp if fallback_timestamp is not None else time.time()
            if last_contact_ts is None:
                last_contact_ts = primary_ts
            return CanonicalFlightState(
                icao24=str(icao24).lower().strip(),
                callsign=str(raw_state_list[1]).strip() if raw_state_list[1] else None,
                origin_country=str(raw_state_list[2]),
                timestamp=float(primary_ts),
                last_contact_timestamp=float(last_contact_ts),
                longitude=to_float(raw_state_list[5]),
                latitude=to_float(raw_state_list[6]),
                baro_altitude_m=to_float(raw_state_list[7]),
                on_ground=bool(raw_state_list[8]),
                velocity_mps=to_float(raw_state_list[9]),
                true_track_deg=to_float(raw_state_list[10]),
                vertical_rate_mps=to_float(raw_state_list[11]),
                # Index 12 is not mapped.
                geo_altitude_m=to_float(raw_state_list[13]),
                squawk=str(raw_state_list[14]) if raw_state_list[14] else None,
                spi=bool(raw_state_list[15]),
                position_source=str(raw_state_list[16]),
                raw_data_provider=PROVIDER_NAME_OPENSKY
            )
        except (IndexError, TypeError, ValueError) as e:
            module_logger.error(f"{self.name}: Error converting direct API state list: {e}. List: {raw_state_list}", exc_info=False)
            return None

    def _put_data_message(self, message: AdapterMessage) -> bool:
        """
        Put a data message on the output queue, waiting for space if needed.

        Data is never silently dropped while the adapter is running. However,
        the wait is done in ``_QUEUE_PUT_RETRY_SEC`` slices so that a stop
        request cannot leave the thread blocked forever in ``put()``.

        Returns:
            True if the message was queued, False if a stop was requested while
            the queue stayed full.
        """
        while True:
            try:
                self.output_queue.put(message, timeout=self._QUEUE_PUT_RETRY_SEC)
                return True
            except QueueFull:
                if self.is_stopped():
                    module_logger.info(f"{self.name}: Stop requested while output queue was full. Data message discarded.")
                    return False

    def _log_http_error(self, http_err: requests.exceptions.HTTPError, current_time: int) -> None:
        """
        Log the details of a failed data request.

        On 401/403, also drop the cached token so a new one is requested before
        the next sample. The failed sample itself is not retried.
        """
        response = http_err.response
        status_code = response.status_code if response is not None else None
        module_logger.error("-----------------------------------------------------")
        module_logger.error("--- API Call FAILED ---")
        module_logger.error(f"Status Code: {status_code}")
        if response is not None:
            try:
                module_logger.error(f"Error JSON Response: {response.json()}")
            except ValueError:
                # Covers requests' JSONDecodeError on every requests version.
                module_logger.error(f"Error Raw Text: {response.text}")
        module_logger.error("-----------------------------------------------------")
        if status_code in (401, 403):
            self.access_token = None
        else:
            module_logger.error(f"{self.name}: Unhandled HTTP error for timestamp {current_time}: {http_err}", exc_info=False)

    def _fetch_and_publish(self, session: "requests.Session", current_time: int) -> None:
        """
        Fetch the snapshot for ``current_time`` and queue it as a data message.

        Errors are logged and swallowed, so one bad sample does not abort the
        download.
        """
        try:
            self._send_status_to_queue(STATUS_FETCHING, f"Fetching data for timestamp {current_time}...")
            params = {
                "time": current_time,
                "lamin": self.bounding_box["lat_min"],
                "lomin": self.bounding_box["lon_min"],
                "lamax": self.bounding_box["lat_max"],
                "lomax": self.bounding_box["lon_max"],
            }
            headers = {"Authorization": f"Bearer {self.access_token}"}
            # Prepared outside the session (as before) so the exact same headers
            # are sent; the session only provides connection reuse.
            prepared_req = requests.Request('GET', app_config.OPENSKY_API_URL, params=params, headers=headers).prepare()
            # Detailed request logging. The headers are deliberately not logged
            # because they contain the bearer token.
            module_logger.info("-----------------------------------------------------")
            module_logger.info("--- Making API Call for Historical Data ---")
            module_logger.info(f"URL: {prepared_req.url}")
            module_logger.info("-----------------------------------------------------")
            response = session.send(prepared_req, timeout=self.api_timeout, verify=self.verify_ssl)
            response.raise_for_status()
            json_response = response.json()
            # The API may return "states": null when there is no traffic.
            raw_states_list = json_response.get("states") or []
            canonical_states: List[CanonicalFlightState] = []
            for raw_state in raw_states_list:
                cs = self._convert_direct_api_state_to_canonical(raw_state, fallback_timestamp=float(current_time))
                if cs:
                    canonical_states.append(cs)
            data_message: AdapterMessage = {"type": MSG_TYPE_FLIGHT_DATA, "timestamp": current_time, "payload": canonical_states}
            if self._put_data_message(data_message):
                module_logger.info(f"{self.name}: Success for timestamp {current_time}. Found {len(canonical_states)} states.")
        except requests.exceptions.HTTPError as http_err:
            self._log_http_error(http_err, current_time)
        except Exception as e:
            module_logger.error(f"{self.name}: Generic error fetching data for timestamp {current_time}: {e}", exc_info=True)

    def run(self):
        """
        Main thread loop for the historical adapter.

        Sends STATUS_PERMANENT_FAILURE and exits (without STATUS_STOPPED) when
        credentials are missing, the sampling interval is not positive, or no
        token can be obtained. Otherwise sends STATUS_STOPPED once the range is
        exhausted or a stop is requested.
        """
        module_logger.info(f"{self.name}: Thread starting historical download...")
        if not self._has_credentials():
            err_msg = "OAuth2 credentials not configured. Historical download requires authentication."
            module_logger.critical(f"{self.name}: {err_msg}")
            self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg)
            return
        if self.sampling_interval <= 0:
            # range() would raise ValueError for a zero step and yield nothing
            # for a negative one.
            err_msg = f"Invalid sampling interval ({self.sampling_interval}s). It must be a positive number of seconds."
            module_logger.critical(f"{self.name}: {err_msg}")
            self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg)
            return

        # One session for the whole download so HTTP connections are reused
        # across samples.
        with requests.Session() as session:
            for current_time in range(self.start_timestamp, self.end_timestamp + 1, self.sampling_interval):
                if self.is_stopped():
                    module_logger.info(f"{self.name}: Stop signal received. Terminating.")
                    break
                if not self._is_token_valid() and not self._get_oauth_token():
                    err_msg = "Failed to obtain a valid authentication token. Stopping download."
                    module_logger.critical(f"{self.name}: {err_msg}")
                    self._send_status_to_queue(STATUS_PERMANENT_FAILURE, err_msg)
                    # Same as the missing-credentials path: a permanent failure
                    # is terminal and is not followed by a "finished" status.
                    return
                self._fetch_and_publish(session, current_time)
                # No pause is needed after the final sample of the range.
                is_last_sample = current_time + self.sampling_interval > self.end_timestamp
                if not is_last_sample and self.polling_interval > 0:
                    if self._stop_event.wait(timeout=self.polling_interval):
                        module_logger.info(f"{self.name}: Stop signal during wait period. Exiting loop.")
                        break
        self._send_status_to_queue(STATUS_STOPPED, "Historical download finished.")
        module_logger.info(f"{self.name}: Historical download loop finished.")