764 lines
30 KiB
Python
764 lines
30 KiB
Python
# FlightMonitor/map/map_canvas_manager.py
|
|
|
|
import tkinter as tk
|
|
import math
|
|
import time
|
|
from typing import Optional, Tuple, List, Dict, Any
|
|
from collections import deque
|
|
import queue
|
|
import threading
|
|
import copy
|
|
import os
|
|
|
|
# `logging` is bound unconditionally: the fallback-logger setup further down
# this module calls logging.getLogger()/basicConfig() and must not depend on
# whether the Pillow import below succeeded.
import logging

# Optional dependency guard for Pillow. The four names are always defined so
# that later code can test PIL_IMAGE_LIB_AVAILABLE instead of hitting a
# NameError.
try:
    from PIL import Image, ImageTk, ImageDraw, ImageFont

    PIL_IMAGE_LIB_AVAILABLE = True
except ImportError:
    Image, ImageTk, ImageDraw, ImageFont = None, None, None, None
    PIL_IMAGE_LIB_AVAILABLE = False
    logging.error(
        "MapCanvasManager: Pillow (Image, ImageTk, ImageDraw, ImageFont) not found. Map disabled."
    )
|
|
|
|
from flightmonitor.map.map_utils import (
|
|
PYPROJ_MODULE_LOCALLY_AVAILABLE,
|
|
MERCANTILE_MODULE_LOCALLY_AVAILABLE,
|
|
geo_to_pixel_on_unscaled_map,
|
|
)
|
|
from flightmonitor.map import map_utils
|
|
from flightmonitor.map import map_constants
|
|
from flightmonitor.data import config as app_config
|
|
from flightmonitor.data.common_models import CanonicalFlightState
|
|
from flightmonitor.map.map_services import BaseMapService, OpenStreetMapService
|
|
from flightmonitor.map.map_tile_manager import MapTileManager
|
|
from flightmonitor.map.map_render_manager import (
|
|
MapRenderManager,
|
|
RENDER_REQUEST_TYPE_MAP,
|
|
)
|
|
from flightmonitor.map import map_drawing
|
|
|
|
# Module logger: prefer the project logger factory, fall back to the standard
# library when the project logging helper cannot be imported.
try:
    from flightmonitor.utils.logger import get_logger

    logger = get_logger(__name__)
except ImportError:
    # Imported here so the fallback works even when `logging` was not bound
    # earlier in the module (the only other import of it sits inside the
    # Pillow ImportError branch).
    import logging

    logger = logging.getLogger(__name__)
    if not logger.hasHandlers():
        logging.basicConfig(level=logging.INFO)
    logger.warning("MapCanvasManager using fallback standard Python logger.")
|
|
|
|
|
|
# --- Canvas geometry -------------------------------------------------------
# Width used when the Tk canvas reports a width of 0 at construction time.
CANVAS_SIZE_HARD_FALLBACK_PX = 800
# Fraction of the canvas size subtracted before fitting a bounding box, so the
# box does not touch the canvas edges.
CANVAS_MARGIN_FACTOR_FOR_BBOX_FIT = 0.10
# The stitched tile area is computed for a canvas enlarged by this factor
# before the final canvas-sized crop is taken.
RENDER_OVERSIZE_FACTOR = 1.5

# --- Interaction / scheduling ----------------------------------------------
# Delay passed to canvas.after() before a <Configure> resize triggers a redraw.
RESIZE_DEBOUNCE_DELAY_MS = 250
# Delay between two polls of the render-result queue on the GUI side.
GUI_RESULT_POLL_INTERVAL_MS = 50
# Fraction of the canvas width/height moved by one fixed pan step.
PAN_STEP_FRACTION = 0.25

# --- Playback --------------------------------------------------------------
# During playback an aircraft is pruned once the virtual time is more than
# this many seconds past its last contact timestamp.
AIRCRAFT_VISIBILITY_TIMEOUT_SECONDS = 20.0
|
|
|
|
|
|
class MapCanvasManager:
|
|
|
|
def __init__(
    self,
    app_controller: Any,
    tk_canvas: tk.Canvas,
    initial_bbox_dict: Optional[Dict[str, float]],
    is_detail_map: bool = False,
):
    """Wire the manager to a Tk canvas and start the render worker.

    Args:
        app_controller: Object that receives map callbacks
            (e.g. ``update_general_map_info``, ``on_map_left_click``).
        tk_canvas: Canvas the rendered map image is drawn on.
        initial_bbox_dict: Optional bounding box to show first; when it is
            missing or invalid a "Ready" placeholder is displayed instead.
        is_detail_map: Stored on the instance; other methods use it to skip
            controller notifications and the target-bbox overlay.

    Raises:
        ImportError: If Pillow or mercantile is not available.
    """
    self.is_detail_map = is_detail_map
    self.log_prefix = f"MCM (detail={self.is_detail_map})"

    # Refuse to construct a manager that could never render anything.
    dependencies_ready = (
        PIL_IMAGE_LIB_AVAILABLE
        and MERCANTILE_MODULE_LOCALLY_AVAILABLE
        and map_utils.mercantile
    )
    if not dependencies_ready:
        raise ImportError(f"{self.log_prefix}: Critical dependencies missing.")

    self.app_controller = app_controller
    self.canvas = tk_canvas
    # A canvas that reports a size of 0 gets a fallback size instead.
    measured_width = tk_canvas.winfo_width()
    measured_height = tk_canvas.winfo_height()
    self.canvas_width = measured_width or CANVAS_SIZE_HARD_FALLBACK_PX
    self.canvas_height = measured_height or getattr(
        app_config, "DEFAULT_CANVAS_HEIGHT", 600
    )

    # View state as last requested from the GUI side.
    self._current_center_lat_gui: Optional[float] = None
    self._current_center_lon_gui: Optional[float] = None
    self._current_zoom_gui: int = map_constants.DEFAULT_INITIAL_ZOOM
    self._current_map_geo_bounds_gui: Optional[
        Tuple[float, float, float, float]
    ] = None
    self._target_bbox_input_gui: Optional[Dict[str, float]] = None

    # Canvas items currently on screen.
    self._map_photo_image: Optional[ImageTk.PhotoImage] = None
    self._canvas_image_id: Optional[int] = None
    self._placeholder_text_id: Optional[int] = None

    # Tile source and tile manager used by the render pipeline.
    self.map_service: BaseMapService = OpenStreetMapService()
    self.tile_manager: MapTileManager = MapTileManager(map_service=self.map_service)

    # Flight data shown on the map and per-aircraft track history.
    self._current_flights_to_display_gui: List[CanonicalFlightState] = []
    self._active_aircraft_states: Dict[str, CanonicalFlightState] = {}
    self._last_playback_timestamp: float = 0.0
    self.flight_tracks_gui: Dict[str, deque] = {}
    self.max_track_points: int = getattr(
        app_config, "DEFAULT_TRACK_HISTORY_POINTS", 20
    )

    # Scheduling handles, render manager and the lock guarding flight data.
    self._resize_debounce_job_id: Optional[str] = None
    self.map_render_manager = MapRenderManager()
    self.map_render_manager.set_render_pipeline_callable(
        self._execute_render_pipeline
    )
    self._gui_after_id_result_processor: Optional[str] = None
    self._map_data_lock: threading.Lock = threading.Lock()

    self.map_render_manager.start_worker()

    # Show the initial area when one was supplied, otherwise a placeholder.
    has_valid_initial_bbox = initial_bbox_dict and map_utils._is_valid_bbox_dict(
        initial_bbox_dict
    )
    if has_valid_initial_bbox:
        self.set_target_bbox(initial_bbox_dict)
    else:
        self._display_placeholder_text("Ready")

    self._setup_event_bindings()
    self._start_gui_result_processing()
    logger.info(f">>> {self.log_prefix} __init__ FINISHED <<<")
|
|
|
|
def _execute_render_pipeline(
    self,
    center_lat,
    center_lon,
    zoom,
    canvas_w,
    canvas_h,
    target_bbox,
    draw_target_bbox,
    flights,
    tracks,
    max_track_points,
):
    """Render one map frame: base tiles, overlays and flights.

    This method is registered with the render manager as the render
    pipeline callable.
    NOTE(review): it builds an ``ImageTk.PhotoImage``; if the render manager
    calls it off the Tk main thread that deserves a check - confirm.

    Args:
        center_lat: Latitude of the view centre.
        center_lon: Longitude of the view centre.
        zoom: Zoom level used for tiles and pixel/geo conversions.
        canvas_w: Width in pixels of the final image.
        canvas_h: Height in pixels of the final image.
        target_bbox: Dict with ``lon_min``/``lat_min``/``lon_max``/``lat_max``
            keys, or a falsy value.
        draw_target_bbox: Whether to outline ``target_bbox`` on the map.
        flights: Iterable of flight states to draw (may be empty or None).
        tracks: Mapping icao24 -> track points (may be None).
        max_track_points: Accepted for the pipeline signature; not used here.

    Returns:
        ``(photo_image, geo_bounds, None)`` on success, or
        ``(None, None, error_message)`` when a step fails.
    """
    tile_size = self.tile_manager.tile_size

    # 1. Stitch a tile mosaic covering an area larger than the canvas.
    render_w = int(canvas_w * RENDER_OVERSIZE_FACTOR)
    render_h = int(canvas_h * RENDER_OVERSIZE_FACTOR)
    oversized_bbox = map_utils.calculate_geographic_bbox_from_pixel_size_and_zoom(
        center_lat, center_lon, render_w, render_h, zoom, tile_size
    )
    if not oversized_bbox:
        return None, None, "Failed to calculate oversized bbox for rendering"

    tile_ranges = map_utils.get_tile_ranges_for_bbox(oversized_bbox, zoom)
    if not tile_ranges:
        return None, None, "Failed to get tile ranges for bbox"

    stitched_map = self.tile_manager.stitch_map_image(
        zoom, tile_ranges[0], tile_ranges[1]
    )
    if not stitched_map:
        return None, None, "Failed to stitch map"

    stitched_bounds = self.tile_manager._get_bounds_for_tile_range(zoom, tile_ranges)
    if not stitched_bounds:
        return None, None, "Failed to get bounds for stitched map"

    # 2. Crop a canvas-sized window centred on the requested point; fall back
    #    to the middle of the mosaic when the centre cannot be projected.
    center_px = geo_to_pixel_on_unscaled_map(
        center_lat, center_lon, stitched_bounds, stitched_map.size
    )
    if not center_px:
        center_px = (stitched_map.width / 2, stitched_map.height / 2)

    crop_x0 = round(center_px[0] - canvas_w / 2)
    crop_y0 = round(center_px[1] - canvas_h / 2)
    final_image = stitched_map.crop(
        (crop_x0, crop_y0, crop_x0 + canvas_w, crop_y0 + canvas_h)
    )

    final_bounds = map_utils.calculate_geographic_bbox_from_pixel_size_and_zoom(
        center_lat, center_lon, canvas_w, canvas_h, zoom, tile_size
    )
    if not final_bounds:
        return None, None, "Failed to calculate final map bounds"

    # 3. Draw every overlay on a transparent layer of the same size.
    overlay_rgba = Image.new("RGBA", final_image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay_rgba)
    overlay_size = overlay_rgba.size

    # final_bounds is known to be truthy here, so only the feature flag matters.
    if map_constants.GRATICULE_ENABLED:
        graticule_font = map_drawing._load_label_font(0, is_graticule=True)
        map_drawing.draw_graticules(draw, final_bounds, overlay_size, graticule_font)

    if (
        draw_target_bbox
        and target_bbox
        and map_utils._is_valid_bbox_dict(target_bbox)
    ):
        bbox_wesn = (
            target_bbox["lon_min"],
            target_bbox["lat_min"],
            target_bbox["lon_max"],
            target_bbox["lat_max"],
        )
        map_drawing.draw_area_bounding_box(
            draw, bbox_wesn, final_bounds, overlay_size, "blue", 2
        )

    if flights:
        # A missing track mapping must not abort the whole frame.
        tracks_by_icao = tracks if tracks is not None else {}
        # Label size is 10 at zoom 10 and changes by one per zoom level.
        label_font_size = 10 + (zoom - 10)
        flight_label_font = map_drawing._load_label_font(label_font_size)
        for flight in flights:
            if flight.latitude is None or flight.longitude is None:
                continue
            # Use the function imported from map_utils, as the crop step above
            # does, rather than reaching for it through map_drawing.
            pixel_coords = geo_to_pixel_on_unscaled_map(
                flight.latitude, flight.longitude, final_bounds, overlay_size
            )
            if not pixel_coords:
                continue
            map_drawing._draw_single_flight(
                draw,
                pixel_coords,
                flight,
                flight_label_font,
                tracks_by_icao.get(flight.icao24),
                final_bounds,
                overlay_size,
            )

    # 4. Composite the overlay onto the base map and wrap it for Tk.
    base_map_rgba = final_image.convert("RGBA")
    composite_image = Image.alpha_composite(base_map_rgba, overlay_rgba)

    return ImageTk.PhotoImage(composite_image), final_bounds, None
|
|
|
|
def _start_gui_result_processing(self):
|
|
if self._gui_after_id_result_processor and self.canvas.winfo_exists():
|
|
self.canvas.after_cancel(self._gui_after_id_result_processor)
|
|
if self.canvas.winfo_exists():
|
|
self._gui_after_id_result_processor = self.canvas.after(
|
|
GUI_RESULT_POLL_INTERVAL_MS, self._process_map_render_results
|
|
)
|
|
|
|
def _process_map_render_results(self):
|
|
if not self.canvas.winfo_exists():
|
|
self._gui_after_id_result_processor = None
|
|
return
|
|
try:
|
|
while True:
|
|
result = self.map_render_manager.get_render_result()
|
|
if not result:
|
|
break
|
|
req_id, photo, bounds, err = (
|
|
result.get("request_id"),
|
|
result.get("photo_image"),
|
|
result.get("map_geo_bounds"),
|
|
result.get("error"),
|
|
)
|
|
if req_id < self.map_render_manager.get_expected_gui_render_id():
|
|
continue
|
|
if err:
|
|
self._display_placeholder_text(f"Map Error:\n{err[:100]}")
|
|
elif photo:
|
|
self._clear_canvas_display_elements()
|
|
self._map_photo_image = photo
|
|
self._current_map_geo_bounds_gui = bounds
|
|
self._canvas_image_id = self.canvas.create_image(
|
|
0, 0, anchor=tk.NW, image=self._map_photo_image
|
|
)
|
|
if not self.is_detail_map:
|
|
self.app_controller.update_general_map_info()
|
|
except Exception as e:
|
|
logger.exception(f"Error processing map results: {e}")
|
|
if self.canvas.winfo_exists():
|
|
self._gui_after_id_result_processor = self.canvas.after(
|
|
GUI_RESULT_POLL_INTERVAL_MS, self._process_map_render_results
|
|
)
|
|
|
|
def _request_map_render(
|
|
self, center_lat: float, center_lon: float, zoom_level: int
|
|
):
|
|
if not self.map_render_manager.is_worker_alive():
|
|
self._display_placeholder_text("Map Worker Offline")
|
|
return
|
|
|
|
with self._map_data_lock:
|
|
payload = {
|
|
"type": RENDER_REQUEST_TYPE_MAP,
|
|
"center_lat": center_lat,
|
|
"center_lon": center_lon,
|
|
"zoom": zoom_level,
|
|
"canvas_width": self.canvas_width,
|
|
"canvas_height": self.canvas_height,
|
|
"target_bbox": copy.deepcopy(self._target_bbox_input_gui),
|
|
"draw_target_bbox_overlay": not self.is_detail_map,
|
|
"flights": copy.deepcopy(self._current_flights_to_display_gui),
|
|
"tracks": copy.deepcopy(self.flight_tracks_gui),
|
|
"max_track_points": self.max_track_points,
|
|
}
|
|
req_id = self.map_render_manager.put_render_request(payload)
|
|
if req_id is not None:
|
|
self.map_render_manager.set_expected_gui_render_id(req_id)
|
|
self._display_placeholder_text(f"Loading Map... Z{zoom_level}")
|
|
(
|
|
self._current_center_lat_gui,
|
|
self._current_center_lon_gui,
|
|
self._current_zoom_gui,
|
|
) = (center_lat, center_lon, zoom_level)
|
|
else:
|
|
self._display_placeholder_text("Map Busy / Request Queue Full")
|
|
|
|
def _setup_event_bindings(self):
|
|
self.canvas.bind("<Configure>", self._on_canvas_resize)
|
|
self.canvas.bind("<ButtonPress-1>", self._on_left_button_press)
|
|
self.canvas.bind("<B1-Motion>", self._on_mouse_drag)
|
|
self.canvas.bind("<ButtonRelease-1>", self._on_left_button_release)
|
|
self.canvas.bind("<ButtonPress-3>", self._on_right_click)
|
|
self.canvas.bind("<MouseWheel>", self._on_mouse_wheel)
|
|
self.canvas.bind("<Button-4>", self._on_mouse_wheel_linux)
|
|
self.canvas.bind("<Button-5>", self._on_mouse_wheel_linux)
|
|
self._drag_start_x_canvas, self._drag_start_y_canvas = None, None
|
|
|
|
def _on_canvas_resize(self, event: tk.Event):
|
|
if (
|
|
event.width > 1
|
|
and event.height > 1
|
|
and (self.canvas_width != event.width or self.canvas_height != event.height)
|
|
):
|
|
if self._resize_debounce_job_id:
|
|
self.canvas.after_cancel(self._resize_debounce_job_id)
|
|
self._resize_debounce_job_id = self.canvas.after(
|
|
RESIZE_DEBOUNCE_DELAY_MS,
|
|
self._perform_resize_redraw,
|
|
event.width,
|
|
event.height,
|
|
)
|
|
|
|
def _perform_resize_redraw(self, width: int, height: int):
|
|
self._resize_debounce_job_id = None
|
|
if not self.canvas.winfo_exists():
|
|
return
|
|
self.canvas_width, self.canvas_height = width, height
|
|
if (
|
|
self._current_center_lat_gui is not None
|
|
and self._current_center_lon_gui is not None
|
|
and self._current_zoom_gui is not None
|
|
):
|
|
self._request_map_render(
|
|
self._current_center_lat_gui,
|
|
self._current_center_lon_gui,
|
|
self._current_zoom_gui,
|
|
)
|
|
|
|
def _on_left_button_press(self, event: tk.Event):
|
|
self._drag_start_x_canvas, self._drag_start_y_canvas = event.x, event.y
|
|
|
|
def _on_mouse_drag(self, event: tk.Event):
|
|
if self._drag_start_x_canvas is None or self._drag_start_y_canvas is None:
|
|
return
|
|
dx, dy = (
|
|
event.x - self._drag_start_x_canvas,
|
|
event.y - self._drag_start_y_canvas,
|
|
)
|
|
if abs(dx) < 5 and abs(dy) < 5:
|
|
return
|
|
self._drag_start_x_canvas, self._drag_start_y_canvas = event.x, event.y
|
|
if not all(
|
|
[
|
|
self._current_center_lat_gui,
|
|
self._current_center_lon_gui,
|
|
self._current_zoom_gui,
|
|
PYPROJ_MODULE_LOCALLY_AVAILABLE,
|
|
map_utils.pyproj,
|
|
]
|
|
):
|
|
return
|
|
res = map_utils.calculate_meters_per_pixel(
|
|
self._current_center_lat_gui,
|
|
self._current_zoom_gui,
|
|
self.tile_manager.tile_size,
|
|
)
|
|
if not res:
|
|
return
|
|
dmx, dmy = -dx * res, dy * res
|
|
geod = map_utils.pyproj.Geod(ellps="WGS84")
|
|
new_lon, new_lat, _ = geod.fwd(
|
|
self._current_center_lon_gui, self._current_center_lat_gui, 90, dmx
|
|
)
|
|
new_lon, new_lat, _ = geod.fwd(new_lon, new_lat, 0, dmy)
|
|
self._request_map_render(new_lat, new_lon, self._current_zoom_gui)
|
|
|
|
def _on_left_button_release(self, event: tk.Event):
|
|
if (
|
|
self._drag_start_x_canvas is not None
|
|
and abs(event.x - self._drag_start_x_canvas) < 5
|
|
and abs(event.y - self._drag_start_y_canvas) < 5
|
|
):
|
|
if self._current_map_geo_bounds_gui and self._map_photo_image:
|
|
map_size = (
|
|
self._map_photo_image.width(),
|
|
self._map_photo_image.height(),
|
|
)
|
|
lon, lat = map_utils.pixel_to_geo(
|
|
event.x, event.y, self._current_map_geo_bounds_gui, map_size
|
|
)
|
|
if lon is not None and lat is not None:
|
|
self.app_controller.on_map_left_click(
|
|
lat, lon, event.x, event.y, event.x_root, event.y_root
|
|
)
|
|
self._drag_start_x_canvas = None
|
|
|
|
def _on_mouse_wheel(self, event: tk.Event):
|
|
if self._current_zoom_gui is None:
|
|
return
|
|
if event.delta > 0:
|
|
self.zoom_in_at_center()
|
|
else:
|
|
self.zoom_out_at_center()
|
|
|
|
def _on_mouse_wheel_linux(self, event: tk.Event):
|
|
if self._current_zoom_gui is None:
|
|
return
|
|
if event.num == 4:
|
|
self.zoom_in_at_center()
|
|
elif event.num == 5:
|
|
self.zoom_out_at_center()
|
|
|
|
def get_clicked_flight_icao(self, canvas_x: int, canvas_y: int) -> Optional[str]:
    """Return the icao24 of the flight drawn closest to a canvas position.

    Only flights within 15 pixels of the position are considered; the
    nearest one wins, and the first one wins a tie.

    Args:
        canvas_x: X coordinate on the canvas, in pixels.
        canvas_y: Y coordinate on the canvas, in pixels.

    Returns:
        The icao24 of the nearest flight, or None when no map is displayed
        or no flight is close enough.
    """
    geo_bounds = self._current_map_geo_bounds_gui
    photo = self._map_photo_image
    if not geo_bounds or not photo:
        return None

    radius_sq = 15**2
    map_size = (photo.width(), photo.height())

    # Snapshot the flights while holding the lock. Both branches copy, so the
    # loop below never iterates a list that can be mutated under it.
    with self._map_data_lock:
        if self._active_aircraft_states:
            flights_to_check = list(self._active_aircraft_states.values())
        else:
            flights_to_check = list(self._current_flights_to_display_gui)

    min_dist_sq, clicked_icao = float("inf"), None
    for flight in flights_to_check:
        if flight.latitude is None or flight.longitude is None:
            continue
        px_coords = geo_to_pixel_on_unscaled_map(
            flight.latitude, flight.longitude, geo_bounds, map_size
        )
        if not px_coords:
            continue
        dist_sq = (canvas_x - px_coords[0]) ** 2 + (canvas_y - px_coords[1]) ** 2
        if dist_sq < radius_sq and dist_sq < min_dist_sq:
            min_dist_sq, clicked_icao = dist_sq, flight.icao24
    return clicked_icao
|
|
|
|
def _on_right_click(self, event: tk.Event):
|
|
if self._current_map_geo_bounds_gui and self._map_photo_image:
|
|
map_size = (self._map_photo_image.width(), self._map_photo_image.height())
|
|
lon, lat = map_utils.pixel_to_geo(
|
|
event.x, event.y, self._current_map_geo_bounds_gui, map_size
|
|
)
|
|
if lon is not None and lat is not None:
|
|
self.app_controller.on_map_right_click(
|
|
lat, lon, event.x_root, event.y_root
|
|
)
|
|
|
|
def set_max_track_points(self, length: int):
    """Change the track history length and re-render the current view.

    Existing tracks are rebuilt with the new capacity; when a track holds
    more points than fit, the oldest ones are dropped.

    Args:
        length: Requested number of track points; values below 2 are raised
            to 2.
    """
    self.max_track_points = max(2, length)
    # Track deques keep 5 points more than the configured length, matching
    # the capacity used where tracks are created.
    capacity = self.max_track_points + 5
    with self._map_data_lock:
        for icao, old_track in self.flight_tracks_gui.items():
            # Replacing the value of an existing key is safe while iterating.
            self.flight_tracks_gui[icao] = deque(old_track, maxlen=capacity)

    # Compare against None: a centre at latitude 0.0 is a valid view and must
    # still be re-rendered.
    if (
        self._current_center_lat_gui is not None
        and self._current_center_lon_gui is not None
    ):
        self._request_map_render(
            self._current_center_lat_gui,
            self._current_center_lon_gui,
            self._current_zoom_gui,
        )
|
|
|
|
def set_target_bbox(self, bbox_dict: Optional[Dict[str, float]]):
    """Set or clear the target bounding box and refresh the map.

    A valid box is stored (as a copy), the view is fitted to it and, unless
    this is a detail map, the controller's bbox fields are updated. Any other
    value clears the stored box, re-renders the current view if there is one
    and, unless this is a detail map, passes an empty dict to the controller.

    Args:
        bbox_dict: Bounding box dict, or None / an invalid dict to clear.
    """
    if bbox_dict and map_utils._is_valid_bbox_dict(bbox_dict):
        with self._map_data_lock:
            self._target_bbox_input_gui = bbox_dict.copy()
        # Rendering takes the data lock itself, so it must run outside it.
        self._request_map_render_for_bbox(bbox_dict)
        if not self.is_detail_map:
            self.app_controller.update_bbox_gui_fields(bbox_dict)
        return

    with self._map_data_lock:
        self._target_bbox_input_gui = None
    # Compare against None: a centre at latitude 0.0 is a valid view and must
    # still be re-rendered so the old overlay disappears.
    if (
        self._current_center_lat_gui is not None
        and self._current_center_lon_gui is not None
    ):
        self._request_map_render(
            self._current_center_lat_gui,
            self._current_center_lon_gui,
            self._current_zoom_gui,
        )
    if not self.is_detail_map:
        self.app_controller.update_bbox_gui_fields({})
|
|
|
|
def _request_map_render_for_bbox(self, bbox, preserve_zoom=False):
|
|
center_lat, center_lon = (bbox["lat_min"] + bbox["lat_max"]) / 2, (
|
|
bbox["lon_min"] + bbox["lon_max"]
|
|
) / 2
|
|
zoom = self._current_zoom_gui
|
|
if not preserve_zoom or not zoom:
|
|
size_km = map_utils.calculate_geographic_bbox_size_km(
|
|
(bbox["lon_min"], bbox["lat_min"], bbox["lon_max"], bbox["lat_max"])
|
|
)
|
|
if size_km:
|
|
w, h = self.canvas_width * (
|
|
1 - CANVAS_MARGIN_FACTOR_FOR_BBOX_FIT
|
|
), self.canvas_height * (1 - CANVAS_MARGIN_FACTOR_FOR_BBOX_FIT)
|
|
zoom_w = map_utils.calculate_zoom_level_for_geographic_size(
|
|
center_lat, size_km[0] * 1000, int(w), self.tile_manager.tile_size
|
|
)
|
|
zoom_h = map_utils.calculate_zoom_level_for_geographic_size(
|
|
center_lat, size_km[1] * 1000, int(h), self.tile_manager.tile_size
|
|
)
|
|
zoom = (
|
|
min(zoom_w, zoom_h)
|
|
if zoom_w and zoom_h
|
|
else map_constants.DEFAULT_INITIAL_ZOOM
|
|
)
|
|
self._request_map_render(center_lat, center_lon, zoom)
|
|
|
|
def _clear_canvas_display_elements(self):
|
|
if self.canvas.winfo_exists():
|
|
if self._canvas_image_id:
|
|
self.canvas.delete(self._canvas_image_id)
|
|
if self._placeholder_text_id:
|
|
self.canvas.delete(self._placeholder_text_id)
|
|
self._canvas_image_id, self._placeholder_text_id, self._map_photo_image = (
|
|
None,
|
|
None,
|
|
None,
|
|
)
|
|
|
|
def _display_placeholder_text(self, text: str):
|
|
if not self.canvas.winfo_exists():
|
|
return
|
|
self._clear_canvas_display_elements()
|
|
self.canvas.configure(
|
|
bg=getattr(map_constants, "DEFAULT_PLACEHOLDER_COLOR_RGB_TK", "gray85")
|
|
)
|
|
w, h = (
|
|
self.canvas.winfo_width() or self.canvas_width,
|
|
self.canvas.winfo_height() or self.canvas_height,
|
|
)
|
|
if w > 1 and h > 1:
|
|
self._placeholder_text_id = self.canvas.create_text(
|
|
w / 2, h / 2, text=text, fill="gray10", font=("Arial", 11)
|
|
)
|
|
|
|
def clear_map_display(self):
    """Drop all flights and tracks and re-render the bare map.

    A "Map Cleared" placeholder is shown first; when a view centre is set, a
    render of the current view is then requested.
    """
    self._display_placeholder_text("Map Cleared")
    with self._map_data_lock:
        self._current_flights_to_display_gui.clear()
        self._active_aircraft_states.clear()
        self._last_playback_timestamp = 0.0
        self.flight_tracks_gui.clear()

    # Compare against None: a centre at latitude 0.0 is a valid view and must
    # still be re-rendered.
    if (
        self._current_center_lat_gui is not None
        and self._current_center_lon_gui is not None
    ):
        self._request_map_render(
            self._current_center_lat_gui,
            self._current_center_lon_gui,
            self._current_zoom_gui,
        )
|
|
|
|
def update_flights_on_map(self, flight_states: List[CanonicalFlightState]):
    """Replace the displayed flights, update their tracks and re-render.

    Tracks of aircraft that are not in ``flight_states`` are discarded. A
    track point is appended only for states that carry latitude, longitude
    and timestamp.

    Args:
        flight_states: Flight states to display.
    """
    with self._map_data_lock:
        # Store a copy: this list is cleared in place by clear_map_display,
        # which must not empty the caller's list.
        self._current_flights_to_display_gui = list(flight_states)

        active_icao = {s.icao24 for s in flight_states}
        stale_icaos = [icao for icao in self.flight_tracks_gui if icao not in active_icao]
        for icao in stale_icaos:
            del self.flight_tracks_gui[icao]

        for state in flight_states:
            track = self.flight_tracks_gui.get(state.icao24)
            if track is None:
                track = deque(maxlen=self.max_track_points + 5)
                self.flight_tracks_gui[state.icao24] = track
            if (
                state.latitude is not None
                and state.longitude is not None
                and state.timestamp is not None
            ):
                track.append((state.latitude, state.longitude, state.timestamp))

    # Compare against None: a centre at latitude 0.0 is a valid view and must
    # still be re-rendered. Rendering takes the lock itself, so it runs here.
    if (
        self._current_center_lat_gui is not None
        and self._current_center_lon_gui is not None
    ):
        self._request_map_render(
            self._current_center_lat_gui,
            self._current_center_lon_gui,
            self._current_zoom_gui,
        )
|
|
|
|
def update_playback_frame(self, new_flight_states: List[CanonicalFlightState], virtual_timestamp: float):
    """
    Updates the map for a playback frame, maintaining state and handling timeouts.

    Aircraft whose last contact is more than
    AIRCRAFT_VISIBILITY_TIMEOUT_SECONDS before ``virtual_timestamp`` are
    removed together with their tracks; the new states are then merged into
    the active set and a render of the current view is requested.

    Args:
        new_flight_states: States reported for this frame.
        virtual_timestamp: Playback time of this frame.
    """
    with self._map_data_lock:
        self._last_playback_timestamp = virtual_timestamp

        # 1. Prune timed-out aircraft
        timed_out_icaos = {
            icao for icao, state in self._active_aircraft_states.items()
            if virtual_timestamp - state.last_contact_timestamp > AIRCRAFT_VISIBILITY_TIMEOUT_SECONDS
        }
        if timed_out_icaos:
            for icao in timed_out_icaos:
                del self._active_aircraft_states[icao]
                self.flight_tracks_gui.pop(icao, None)
            logger.debug(f"Playback: Pruned {len(timed_out_icaos)} timed-out aircraft.")

        # 2. Update with new states
        for state in new_flight_states:
            self._active_aircraft_states[state.icao24] = state

            # Update track (TODO: Add interpolation logic here in the next step)
            track = self.flight_tracks_gui.get(state.icao24)
            if track is None:
                track = deque(maxlen=self.max_track_points + 5)
                self.flight_tracks_gui[state.icao24] = track

            if state.latitude is not None and state.longitude is not None and state.timestamp is not None:
                track.append((state.latitude, state.longitude, state.timestamp))

        # Prepare a copy of the active flights for rendering
        self._current_flights_to_display_gui = list(self._active_aircraft_states.values())

    # 3. Request render (outside the lock: rendering takes the lock itself).
    # Compare against None: a centre at latitude 0.0 is a valid view and must
    # still be re-rendered.
    if (
        self._current_center_lat_gui is not None
        and self._current_center_lon_gui is not None
    ):
        self._request_map_render(
            self._current_center_lat_gui,
            self._current_center_lon_gui,
            self._current_zoom_gui
        )
|
|
|
|
def get_current_map_info(self) -> Dict[str, Any]:
    """Return a summary of the current view.

    Returns:
        Dict with the keys ``zoom``, ``map_geo_bounds``,
        ``target_bbox_input`` (a deep copy), ``flight_count``,
        ``map_size_km_w`` and ``map_size_km_h``. The two size entries are
        None when pyproj is unavailable, no map bounds are known or the size
        cannot be computed.
    """
    size_w = size_h = None
    geo_bounds = self._current_map_geo_bounds_gui
    if PYPROJ_MODULE_LOCALLY_AVAILABLE and geo_bounds and map_utils.pyproj:
        size_km = map_utils.calculate_geographic_bbox_size_km(geo_bounds)
        if size_km:
            size_w, size_h = size_km

    # Flight count and bbox are read together under the data lock.
    with self._map_data_lock:
        flight_count = len(self._current_flights_to_display_gui)
        bbox_copy = copy.deepcopy(self._target_bbox_input_gui)

    info: Dict[str, Any] = {}
    info["zoom"] = self._current_zoom_gui
    info["map_geo_bounds"] = self._current_map_geo_bounds_gui
    info["target_bbox_input"] = bbox_copy
    info["flight_count"] = flight_count
    info["map_size_km_w"] = size_w
    info["map_size_km_h"] = size_h
    return info
|
|
|
|
def shutdown_worker(self):
    """Ask the render manager to shut its worker down."""
    render_manager = self.map_render_manager
    render_manager.shutdown_worker()
|
|
|
|
def recenter_map_at_coords(self, lat: float, lon: float):
    """Re-render the map centred on the given point at the current zoom.

    Args:
        lat: Latitude of the new centre.
        lon: Longitude of the new centre.
    """
    zoom = self._current_zoom_gui
    if zoom is None:
        return
    self._request_map_render(lat, lon, zoom)
|
|
|
|
def set_bbox_around_coords(self, lat: float, lon: float, size_km: float):
    """Make a box of the given size around a point the target bounding box.

    Args:
        lat: Latitude of the box centre.
        lon: Longitude of the box centre.
        size_km: Box size passed to the bounding-box helper.
    """
    bbox_tuple = map_utils.get_bounding_box_from_center_size(lat, lon, size_km)
    if not bbox_tuple:
        return

    # The helper's tuple is read as (lon_min, lat_min, lon_max, lat_max).
    bbox_keys = ("lon_min", "lat_min", "lon_max", "lat_max")
    self.set_target_bbox(
        {key: bbox_tuple[position] for position, key in enumerate(bbox_keys)}
    )
|
|
|
|
def zoom_in_at_center(self):
    """Re-render the current view one zoom level closer.

    Does nothing until a centre and zoom are set. No upper zoom bound is
    applied here.
    """
    zoom = self._current_zoom_gui
    center_lat = self._current_center_lat_gui
    center_lon = self._current_center_lon_gui
    if zoom is None or center_lat is None or center_lon is None:
        return
    self._request_map_render(center_lat, center_lon, zoom + 1)
|
|
|
|
def zoom_out_at_center(self):
    """Re-render the current view one zoom level further out.

    Does nothing until a centre and zoom are set. The zoom never goes below
    ``map_constants.MIN_ZOOM_LEVEL``; when the view is already at that level
    no render is requested.
    """
    zoom = self._current_zoom_gui
    center_lat = self._current_center_lat_gui
    center_lon = self._current_center_lon_gui
    if zoom is None or center_lat is None or center_lon is None:
        return

    # Without this clamp repeated wheel-down events would request zoom levels
    # below the minimum and store them as the current zoom.
    new_zoom = max(map_constants.MIN_ZOOM_LEVEL, zoom - 1)
    if new_zoom == zoom:
        return
    self._request_map_render(center_lat, center_lon, new_zoom)
|
|
|
|
def pan_map_fixed_step(self, direction: str):
    """Pan the map by a fixed fraction of the canvas in one direction.

    Args:
        direction: ``"left"``, ``"right"``, ``"up"`` or ``"down"``. Any other
            value produces a zero offset, i.e. a re-render of the same view.
    """
    center_lat = self._current_center_lat_gui
    center_lon = self._current_center_lon_gui
    zoom = self._current_zoom_gui
    # Compare against None explicitly: latitude 0.0, longitude 0.0 and zoom 0
    # are valid values and must not disable panning.
    if center_lat is None or center_lon is None or zoom is None:
        return
    if not (PYPROJ_MODULE_LOCALLY_AVAILABLE and map_utils.pyproj):
        return

    res = map_utils.calculate_meters_per_pixel(
        center_lat, zoom, self.tile_manager.tile_size
    )
    if not res:
        return

    # Pixel offsets in canvas coordinates (x grows right, y grows down).
    step_x = self.canvas_width * PAN_STEP_FRACTION
    step_y = self.canvas_height * PAN_STEP_FRACTION
    pixel_offsets = {
        "left": (-step_x, 0),
        "right": (step_x, 0),
        "up": (0, -step_y),
        "down": (0, step_y),
    }
    dx, dy = pixel_offsets.get(direction, (0, 0))

    # Canvas y grows downwards while northing grows upwards, hence -dy.
    dmx, dmy = dx * res, -dy * res
    geod = map_utils.pyproj.Geod(ellps="WGS84")
    # Two forward steps: along azimuth 90 (east) then along azimuth 0 (north).
    new_lon, new_lat, _ = geod.fwd(center_lon, center_lat, 90, dmx)
    new_lon, new_lat, _ = geod.fwd(new_lon, new_lat, 0, dmy)
    self._request_map_render(new_lat, new_lon, zoom)
|
|
|
|
def center_map_and_fit_patch(self, lat: float, lon: float, size_km: float):
    """Centre the map on a point and zoom so a square patch fits the canvas.

    Args:
        lat: Latitude of the new centre.
        lon: Longitude of the new centre.
        size_km: Side length of the patch, in kilometres.
    """
    patch_size_m = size_km * 1000
    tile_size = self.tile_manager.tile_size
    zoom_w = map_utils.calculate_zoom_level_for_geographic_size(
        lat, patch_size_m, self.canvas_width, tile_size
    )
    zoom_h = map_utils.calculate_zoom_level_for_geographic_size(
        lat, patch_size_m, self.canvas_height, tile_size
    )

    # A computed zoom of 0 is a valid level, so only a missing result falls
    # back to the default zoom.
    if zoom_w is not None and zoom_h is not None:
        # One level further out than the tighter axis.
        zoom = min(zoom_w, zoom_h) - 1
    else:
        zoom = map_constants.DEFAULT_INITIAL_ZOOM

    self._request_map_render(lat, lon, max(map_constants.MIN_ZOOM_LEVEL, zoom))