293 lines
16 KiB
Python
293 lines
16 KiB
Python
# FlightMonitor/map/map_render_manager.py
|
|
"""
|
|
Manages the background worker thread responsible for rendering map images.
|
|
Handles communication (requests, results) with the worker thread.
|
|
"""
|
|
import threading
|
|
import queue
|
|
import time
|
|
from typing import Optional, Dict, Any, List, Tuple, Callable, Union, TYPE_CHECKING
|
|
|
|
# Import PIL/ImageTk types for type hinting without direct import of modules that might be missing
|
|
try:
|
|
from PIL import Image, ImageTk
|
|
PIL_IMAGE_TYPES_AVAILABLE = True
|
|
ImageType = Image.Image # Define ImageType alias
|
|
PhotoImageType = ImageTk.PhotoImage # Define PhotoImageType alias
|
|
except ImportError:
|
|
PIL_IMAGE_TYPES_AVAILABLE = False
|
|
ImageType = Any # Fallback to Any if PIL is not installed
|
|
PhotoImageType = Any # Fallback to Any if PIL is not installed
|
|
|
|
from ..utils.logger import get_logger
|
|
|
|
# Type checking imports for circular dependencies (if needed, e.g., MapCanvasManager)
|
|
if TYPE_CHECKING:
|
|
from .map_canvas_manager import MapCanvasManager # For type hinting
|
|
|
|
module_logger = get_logger(__name__)
|
|
|
|
# Constants for map worker
|
|
MAP_WORKER_QUEUE_TIMEOUT_S = 0.1 # Timeout for getting items from worker queue
|
|
RENDER_REQUEST_TYPE_MAP = "render_map" # Type of request for map rendering
|
|
RENDER_REQUEST_TYPE_SHUTDOWN = "shutdown_worker" # Type of request to shut down worker
|
|
|
|
|
|
class MapRenderManager:
|
|
"""
|
|
Manages a dedicated worker thread for processing map rendering requests.
|
|
Handles queuing of requests, retrieving rendered results, and worker shutdown.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""
|
|
Initializes the MapRenderManager.
|
|
Sets up the queues and the worker thread.
|
|
"""
|
|
self._map_render_request_queue: queue.Queue[Dict[str, Any]] = queue.Queue(maxsize=5) # Requests from GUI thread to worker
|
|
self._map_render_result_queue: queue.Queue[Dict[str, Any]] = queue.Queue(maxsize=5) # Results from worker to GUI thread
|
|
self._map_worker_stop_event: threading.Event = threading.Event() # Event to signal worker to stop
|
|
self._map_worker_thread: Optional[threading.Thread] = None # The worker thread instance
|
|
|
|
# Used for tracking requests/results to avoid processing stale data
|
|
self._last_render_request_id: int = 0
|
|
self._expected_render_id_gui: int = 0 # Managed by GUI part of MapCanvasManager, but stored here for context
|
|
|
|
# The actual rendering pipeline function will be passed from MapCanvasManager
|
|
# to avoid circular imports and keep this manager focused on thread/queue logic.
|
|
self._render_pipeline_callable: Optional[Callable[..., Tuple[Optional[PhotoImageType], Optional[Tuple[float, ...]], Optional[str]]]] = None
|
|
|
|
module_logger.debug("MapRenderManager initialized.")
|
|
|
|
def set_render_pipeline_callable(self, render_pipeline_callable: Callable[..., Tuple[Optional[PhotoImageType], Optional[Tuple[float, ...]], Optional[str]]]):
|
|
"""
|
|
Sets the callable function that the worker thread will execute for rendering.
|
|
This function should typically be MapCanvasManager._execute_render_pipeline.
|
|
|
|
Args:
|
|
render_pipeline_callable: The function responsible for executing the map rendering pipeline.
|
|
It should return (PhotoImage, map_geo_bounds, error_message).
|
|
"""
|
|
self._render_pipeline_callable = render_pipeline_callable
|
|
module_logger.debug("MapRenderManager: Render pipeline callable set.")
|
|
|
|
def start_worker(self):
|
|
"""
|
|
Starts the background map rendering worker thread.
|
|
If a worker is already running, it logs a warning.
|
|
"""
|
|
if self._map_worker_thread is not None and self._map_worker_thread.is_alive():
|
|
module_logger.warning("MapRenderManager: Map worker thread already running.")
|
|
return
|
|
|
|
self._map_worker_stop_event.clear() # Clear stop signal for new start
|
|
self._map_worker_thread = threading.Thread(
|
|
target=self._map_render_worker_target,
|
|
name="MapRenderWorker",
|
|
daemon=True # Daemon thread will exit when main program exits
|
|
)
|
|
self._map_worker_thread.start()
|
|
module_logger.info("MapRenderManager: Map rendering worker thread started successfully.")
|
|
|
|
def shutdown_worker(self):
|
|
"""
|
|
Signals the worker thread to stop and waits for it to finish.
|
|
Also clears any pending requests/results in the queues.
|
|
"""
|
|
module_logger.info("MapRenderManager: Shutdown worker requested.")
|
|
self._map_worker_stop_event.set() # Signal the worker thread to stop
|
|
|
|
# Attempt to put a shutdown request in the queue to unblock the worker if it's waiting
|
|
try:
|
|
self._map_render_request_queue.put_nowait({"type": RENDER_REQUEST_TYPE_SHUTDOWN, "request_id": -999})
|
|
except queue.Full:
|
|
module_logger.warning("MapRenderManager: Worker request queue full, cannot send shutdown signal via queue.")
|
|
except Exception as e:
|
|
module_logger.error(f"MapRenderManager: Error sending shutdown signal to worker queue: {e}")
|
|
|
|
# Wait for the worker thread to finish
|
|
if self._map_worker_thread and self._map_worker_thread.is_alive():
|
|
module_logger.info("MapRenderManager: Waiting for map worker thread to join...")
|
|
self._map_worker_thread.join(timeout=2.0) # Wait for max 2 seconds
|
|
if self._map_worker_thread.is_alive():
|
|
module_logger.warning("MapRenderManager: Map worker thread did not join in time.")
|
|
else:
|
|
module_logger.info("MapRenderManager: Map worker thread joined successfully.")
|
|
self._map_worker_thread = None # Clear thread reference
|
|
|
|
# Clear any remaining items in queues
|
|
while not self._map_render_request_queue.empty():
|
|
try: self._map_render_request_queue.get_nowait(); self._map_render_request_queue.task_done()
|
|
except Exception: break
|
|
while not self._map_render_result_queue.empty():
|
|
try: self._map_render_result_queue.get_nowait(); self._map_render_result_queue.task_done()
|
|
except Exception: break
|
|
module_logger.info("MapRenderManager: Worker shutdown sequence complete.")
|
|
|
|
def _map_render_worker_target(self):
|
|
"""
|
|
The main loop for the background map rendering worker thread.
|
|
Pulls rendering requests from the queue and executes the rendering pipeline.
|
|
"""
|
|
thread_name = threading.current_thread().name
|
|
module_logger.debug(f"{thread_name}: Worker thread target loop started.")
|
|
|
|
if not self._render_pipeline_callable:
|
|
module_logger.critical(f"{thread_name}: Render pipeline callable not set. Worker cannot function. Exiting.")
|
|
return
|
|
|
|
while not self._map_worker_stop_event.is_set():
|
|
request_data = None
|
|
request_id = -1
|
|
try:
|
|
# Wait for a request or until stop event is set (with timeout for responsiveness)
|
|
request_data = self._map_render_request_queue.get(timeout=MAP_WORKER_QUEUE_TIMEOUT_S)
|
|
request_type = request_data.get("type")
|
|
request_id = request_data.get("request_id", -1)
|
|
# module_logger.debug(f"{thread_name}: Dequeued request. Type: '{request_type}', ID: {request_id}")
|
|
|
|
if request_type == RENDER_REQUEST_TYPE_SHUTDOWN:
|
|
module_logger.info(f"{thread_name}: Received shutdown request (ID: {request_id}). Terminating.")
|
|
self._map_render_request_queue.task_done()
|
|
break # Exit the worker loop
|
|
|
|
if request_type == RENDER_REQUEST_TYPE_MAP:
|
|
# module_logger.debug(f"{thread_name}: Processing RENDER_REQUEST_TYPE_MAP for ID: {request_id}")
|
|
|
|
# Extract all arguments needed for the rendering pipeline
|
|
center_lat = request_data.get("center_lat")
|
|
center_lon = request_data.get("center_lon")
|
|
zoom = request_data.get("zoom")
|
|
canvas_w = request_data.get("canvas_width")
|
|
canvas_h = request_data.get("canvas_height")
|
|
target_bbox_to_center_and_draw = request_data.get("target_bbox")
|
|
draw_target_bbox_overlay_flag = request_data.get("draw_target_bbox_overlay", False)
|
|
flights_to_draw = request_data.get("flights", [])
|
|
tracks_to_draw = request_data.get("tracks", {})
|
|
max_track_pts_from_req = request_data.get("max_track_points", 20) # Default to 20 if not in request
|
|
|
|
if None in [center_lat, center_lon, zoom, canvas_w, canvas_h]:
|
|
error_message = f"Missing critical rendering parameters for ID {request_id}."
|
|
module_logger.error(f"{thread_name}: {error_message}")
|
|
result_payload_err = {"request_id": request_id, "photo_image": None, "map_geo_bounds": None, "error": error_message}
|
|
try: self._map_render_result_queue.put_nowait(result_payload_err)
|
|
except queue.Full: module_logger.error(f"{thread_name}: Result queue full, dropped error for ID {request_id}.")
|
|
self._map_render_request_queue.task_done()
|
|
continue # Skip to next request
|
|
|
|
# Execute the actual rendering pipeline (provided by MapCanvasManager)
|
|
photo_image_result, actual_map_bounds, error_message = self._render_pipeline_callable(
|
|
center_lat, center_lon, zoom, canvas_w, canvas_h,
|
|
target_bbox_to_center_and_draw,
|
|
draw_target_bbox_overlay_flag,
|
|
flights_to_draw, tracks_to_draw, max_track_pts_from_req
|
|
)
|
|
|
|
# Check for stop signal again after a potentially long rendering operation
|
|
if self._map_worker_stop_event.is_set():
|
|
module_logger.info(f"{thread_name}: Stop event set after render pipeline for ID {request_id}. Discarding result.")
|
|
break # Exit the worker loop
|
|
|
|
# Put the rendering result into the result queue for the GUI thread
|
|
result_payload = {
|
|
"request_id": request_id, "photo_image": photo_image_result,
|
|
"map_geo_bounds": actual_map_bounds, "error": error_message,
|
|
}
|
|
try:
|
|
self._map_render_result_queue.put_nowait(result_payload)
|
|
# module_logger.debug(f"{thread_name}: Successfully put result for ID {request_id} into queue.")
|
|
except queue.Full:
|
|
module_logger.error(f"{thread_name}: Result queue full for ID {request_id}. Result discarded.")
|
|
except Exception as e_put:
|
|
module_logger.error(f"{thread_name}: Error putting result for ID {request_id} into queue: {e_put}")
|
|
|
|
else:
|
|
module_logger.warning(f"{thread_name}: Received unknown request type '{request_type}' for ID {request_id}.")
|
|
|
|
self._map_render_request_queue.task_done() # Mark task as done for this request
|
|
|
|
except queue.Empty:
|
|
# No requests in queue, just continue to check stop event
|
|
continue
|
|
except Exception as e:
|
|
# Catch any unhandled exceptions during request processing
|
|
module_logger.exception(f"{thread_name}: Unhandled exception in worker loop for request ID {request_id if request_data else 'N/A'}: {e}")
|
|
# Try to report the error back to the GUI
|
|
if request_data and request_id != -1:
|
|
error_payload_exc = {"request_id": request_id, "photo_image": None, "map_geo_bounds": None, "error": f"Worker loop unhandled exception: {type(e).__name__}"}
|
|
try:
|
|
self._map_render_result_queue.put_nowait(error_payload_exc)
|
|
except queue.Full:
|
|
module_logger.error(f"{thread_name}: Result queue full while trying to report worker unhandled exception.")
|
|
except Exception as e_put_err:
|
|
module_logger.error(f"{thread_name}: Error putting unhandled exception report to result queue: {e_put_err}")
|
|
# Ensure the task is marked done if an exception occurred after getting it
|
|
if request_data:
|
|
try: self._map_render_request_queue.task_done()
|
|
except ValueError: pass # task_done() might raise if called too many times
|
|
time.sleep(0.5) # Brief pause after an error to prevent tight looping on persistent errors
|
|
|
|
module_logger.info(f"{thread_name}: Worker thread target loop finished.")
|
|
|
|
def put_render_request(self, request_payload: Dict[str, Any]) -> Optional[int]:
|
|
"""
|
|
Puts a map rendering request into the worker's request queue.
|
|
Assigns a unique request ID.
|
|
|
|
Args:
|
|
request_payload: A dictionary containing all parameters for rendering.
|
|
|
|
Returns:
|
|
Optional[int]: The ID of the request if successfully queued, None otherwise.
|
|
"""
|
|
self._last_render_request_id += 1 # Increment request ID
|
|
current_request_id = self._last_render_request_id
|
|
request_payload["request_id"] = current_request_id # Add ID to payload
|
|
|
|
try:
|
|
self._map_render_request_queue.put_nowait(request_payload)
|
|
module_logger.debug(f"MapRenderManager: Queued render request ID {current_request_id}.")
|
|
return current_request_id
|
|
except queue.Full:
|
|
module_logger.warning(f"MapRenderManager: Render request queue FULL. Request ID {current_request_id} was DROPPED.")
|
|
return None
|
|
except Exception as e:
|
|
module_logger.error(f"MapRenderManager: Error putting render request ID {current_request_id} into queue: {e}")
|
|
return None
|
|
|
|
def get_render_result(self) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Attempts to retrieve a map rendering result from the worker's result queue.
|
|
This method should be called periodically by the GUI thread.
|
|
|
|
Returns:
|
|
Optional[Dict[str, Any]]: A dictionary containing the rendering result, or None if no result is available.
|
|
"""
|
|
try:
|
|
result = self._map_render_result_queue.get_nowait()
|
|
self._map_render_result_queue.task_done()
|
|
return result
|
|
except queue.Empty:
|
|
return None
|
|
except Exception as e:
|
|
module_logger.error(f"MapRenderManager: Error getting render result from queue: {e}")
|
|
return None
|
|
|
|
def get_expected_gui_render_id(self) -> int:
|
|
"""
|
|
Returns the ID of the last request that the GUI expects to process.
|
|
This is set by MapCanvasManager before queuing a request.
|
|
"""
|
|
return self._expected_render_id_gui
|
|
|
|
def set_expected_gui_render_id(self, request_id: int):
|
|
"""
|
|
Sets the ID of the last request that the GUI expects to process.
|
|
Called by MapCanvasManager when it queues a new request.
|
|
"""
|
|
self._expected_render_id_gui = request_id
|
|
module_logger.debug(f"MapRenderManager: Expected GUI render ID set to {request_id}.")
|
|
|
|
def is_worker_alive(self) -> bool:
|
|
"""Checks if the worker thread is currently alive."""
|
|
return self._map_worker_thread is not None and self._map_worker_thread.is_alive() |