primo commit del modulo map_manager
This commit is contained in:
commit
663a87a24b
151
.gitignore
vendored
Normal file
151
.gitignore
vendored
Normal file
@ -0,0 +1,151 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
pip-wheel-metadata/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a CI server in a temp folder.
|
||||
# Then everything is copied to shipping folder during release.
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# PEP 582; __pypackages__
|
||||
__pypackages__/
|
||||
|
||||
# PEP 621; pyproject.toml sections
|
||||
.pdm.toml
|
||||
.pdm.lock
|
||||
# .venv
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# static analysis tool
|
||||
.flake8
|
||||
|
||||
# VS Code
|
||||
.vscode/*
|
||||
!.vscode/settings.json
|
||||
!.vscode/tasks.json
|
||||
!.vscode/launch.json
|
||||
!.vscode/extensions.json
|
||||
.history/
|
||||
|
||||
# sublime
|
||||
*.sublime-workspace
|
||||
*.sublime-project
|
||||
|
||||
# Kate
|
||||
.kateproject
|
||||
.kateproject.lock
|
||||
.katenewfile. Neuen Filenamensvorschlag merken.
|
||||
|
||||
# Temporary files
|
||||
*.swp
|
||||
*~
|
||||
|
||||
174
INTEGRATION.md
Normal file
174
INTEGRATION.md
Normal file
@ -0,0 +1,174 @@
|
||||
# INTEGRAZIONE: usare `python-map-manager` come git submodule in GeoElevation
|
||||
|
||||
Questo documento spiega come integrare `python-map-manager` in un progetto esistente
|
||||
(ad esempio il repository `geoelevation`) come sottoprogetto (git submodule) e come
|
||||
usarlo a runtime.
|
||||
|
||||
Nota: `python-map-manager` è pensato per essere GUI-agnostico — la logica core non
|
||||
dipende da Tkinter/OpenCV; il demo `debug_tool.py` è separato e opzionale.
|
||||
|
||||
1) Aggiungere come git submodule
|
||||
|
||||
Dal repository principale (`geoelevation`) esegui (PowerShell):
|
||||
|
||||
```powershell
|
||||
# posizionati nella root del repo geoelevation
|
||||
cd C:\path\to\geoelevation
|
||||
|
||||
# aggiungi il submodule (sostituisci <REPO_URL> con l'URL del repo python-map-manager)
|
||||
git submodule add <REPO_URL> submodules/python-map-manager
|
||||
git submodule update --init --recursive
|
||||
```
|
||||
|
||||
Se preferisci tenere il codice direttamente nella tree, puoi copiare la cartella,
|
||||
ma usare un submodule mantiene la separazione e facilita aggiornamenti futuri.
|
||||
|
||||
2) Installare le dipendenze nell'ambiente del progetto
|
||||
|
||||
`python-map-manager` usa alcune librerie opzionali (Pillow, requests, mercantile, pyproj, ecc.).
|
||||
Nel progetto principale, installa i requisiti del submodule nel virtualenv del progetto:
|
||||
|
||||
```powershell
|
||||
python -m pip install -r submodules/python-map-manager/requirements.txt
|
||||
```
|
||||
|
||||
3) Rendere importabile il package nel codice di `geoelevation`
|
||||
|
||||
Opzioni possibili:
|
||||
|
||||
- Aggiungere il submodule al `PYTHONPATH` (semplice, non richiede packaging):
|
||||
|
||||
- Esempio (PowerShell) per lanciare script dalla root del progetto:
|
||||
|
||||
```powershell
|
||||
$env:PYTHONPATH = "$PWD\submodules\python-map-manager"; python -m your_app
|
||||
```
|
||||
|
||||
- O aggiungere ad `sys.path` all'avvio dell'applicazione (es. in `geoelevation/__main__.py`):
|
||||
|
||||
```python
|
||||
import os, sys
|
||||
pkg_root = os.path.join(os.path.dirname(__file__), '..', 'submodules', 'python-map-manager')
|
||||
if pkg_root not in sys.path:
|
||||
sys.path.insert(0, pkg_root)
|
||||
```
|
||||
|
||||
- (Opzionale) Se preferisci installare come pacchetto editable (richiede un `pyproject`/`setup`):
|
||||
|
||||
```powershell
|
||||
pip install -e submodules/python-map-manager
|
||||
```
|
||||
|
||||
4) Esempio rapido di utilizzo in `geoelevation`
|
||||
|
||||
```python
|
||||
from map_manager.engine import MapEngine
|
||||
|
||||
# creare engine (cache_dir può essere un percorso gestito da geoelevation)
|
||||
engine = MapEngine(service_name='osm', cache_dir='path/to/cache', enable_online=True)
|
||||
|
||||
# chiamata semplice (sincrona) per ottenere immagine area
|
||||
img = engine.get_image_for_area((w, s, e, n), zoom=12)
|
||||
|
||||
# usare progress callback (esempio):
|
||||
def progress_cb(done, total, last_tile_duration, from_cache, tile_coords):
|
||||
# attenzione: callback può essere invocato da thread di lavoro
|
||||
# se aggiornate UI dovete marshalare sull'UI thread (es. root.after in Tkinter)
|
||||
print(f"{done}/{total} tiles — last {last_tile_duration:.2f}s — from_cache={from_cache} coords={tile_coords}")
|
||||
|
||||
img = engine.get_image_for_area((w, s, e, n), zoom=None, max_size=800, progress_callback=progress_cb)
|
||||
```
|
||||
|
||||
5) Note su threading e UI
|
||||
|
||||
- `stitch_map_image` invoca il `progress_callback` dal thread dove viene eseguito
|
||||
(cioè il thread che chiama `get_image_for_area` / `stitch_map_image`).
|
||||
- Nel demo `debug_tool.py` le chiamate a `get_image_for_area` sono eseguite in un
|
||||
worker thread e il callback marshala gli aggiornamenti con `root.after(...)`.
|
||||
- Se integrate nelle GUI di `geoelevation`, assicuratevi di marshalarle similmente.
|
||||
|
||||
6) Dipendenze e packaging
|
||||
|
||||
- Se il vostro progetto usa un singolo `requirements.txt` o un `pyproject.toml`,
|
||||
considerate aggiungere le dipendenze del submodule (contenute in
|
||||
`submodules/python-map-manager/requirements.txt`) al file di dipendenze principale.
|
||||
- Raccomando di usare un virtualenv isolato per lo sviluppo e i test.
|
||||
|
||||
7) Test e CI
|
||||
|
||||
- I test presenti in `python-map-manager/tests` possono essere eseguiti dalla root
|
||||
del submodule. Quando integrate come submodule, nel CI aggiungete uno step che:
|
||||
- installa le dipendenze
|
||||
- esegue i test (es. `python -m pytest python-map-manager/tests`)
|
||||
|
||||
Domande frequenti (FAQ)
|
||||
- Q: Il demo GUI è obbligatorio? A: No, è solo un tool per sviluppatori.
|
||||
- Q: La libreria farà richieste in parallelo? A: Attualmente il comportamento di default
|
||||
è sequenziale; possiamo abilitare download concorrenti come opzione sicura.
|
||||
|
||||
Se vuoi, posso aggiungere un esempio `geoelevation` che mostra come inizializzare
|
||||
il `MapEngine` dal codice del progetto e integrare la pulizia della cache nei
|
||||
comandi di gestione di `geoelevation`.
|
||||
# Integration Guide — using `python-map-manager` from `geoelevation`
|
||||
|
||||
This document explains how to integrate the local `python-map-manager` package into the `geoelevation` application as a Git submodule (or as a local package) and shows example usage of the new public API (`MapEngine`, `MapVisualizer`).
|
||||
|
||||
1) Add as a git submodule (optional)
|
||||
|
||||
From the `geoelevation` repository root, add the external repo as a submodule:
|
||||
|
||||
```powershell
|
||||
git submodule add -b master <URL_REPO> external/python-map-manager
|
||||
git submodule update --init --recursive
|
||||
```
|
||||
|
||||
If you're working locally and prefer to keep the module inside the same repo for now, copy the `python-map-manager` directory under `external/` or keep it at repo root.
|
||||
|
||||
2) Make the package importable
|
||||
|
||||
Options:
|
||||
- Add the submodule path to `PYTHONPATH` at runtime, or
|
||||
- Install the package into your environment (editable install) during development:
|
||||
|
||||
```powershell
|
||||
python -m pip install -e external\python-map-manager
|
||||
# or, for local-only testing
|
||||
$env:PYTHONPATH+=';C:\path\to\geoelevation\external\python-map-manager'
|
||||
```
|
||||
|
||||
3) Minimal usage example
|
||||
|
||||
In code, import the engine and visualizer:
|
||||
|
||||
```python
|
||||
from map_manager.engine import MapEngine
|
||||
from map_manager.visualizer import MapVisualizer
|
||||
|
||||
engine = MapEngine(service_name='osm', cache_dir='map_tile_cache', enable_online=True)
|
||||
visual = MapVisualizer(engine)
|
||||
|
||||
# Example: get an image for a bbox (west,south,east,north)
|
||||
bbox = (7.0, 45.0, 8.0, 46.0)
|
||||
img = engine.get_image_for_area(bbox, max_size=1024) # choose zoom automatically
|
||||
if img is not None:
|
||||
visual.show_pil_image(img)
|
||||
|
||||
# For callbacks: set a click handler
|
||||
def on_map_click(lat, lon):
|
||||
elevation = query_elevation(lat, lon) # your app logic
|
||||
print('Elevation', elevation)
|
||||
|
||||
visual.set_click_callback(on_map_click)
|
||||
```
|
||||
|
||||
4) Integration points in `geoelevation`
|
||||
|
||||
- Replace internal `map_viewer` usage with the submodule API:
|
||||
- Where `map_viewer` previously called tile download/stitching, replace with `MapEngine.get_image_for_area` or `get_image_for_point`.
|
||||
- Where UI components called directly into `ElevationManager`, change to use `MapVisualizer` callbacks (subscribe your callback to `visual.set_click_callback`).
|
||||
|
||||
5) Notes and recommendations
|
||||
|
||||
- Keep `MapEngine` independent of `ElevationManager` — pass callbacks into `MapVisualizer` from `geoelevation` so the submodule does not import application business logic.
|
||||
- For headless or server usage, do not import/use `MapVisualizer`; `MapEngine` works without OpenCV.
|
||||
- Add `python-map-manager/requirements.txt` to your environment or include its dependencies in the main project's dependency list.
|
||||
40
README.md
Normal file
40
README.md
Normal file
@ -0,0 +1,40 @@
|
||||
# python-map-manager (local copy)
|
||||
|
||||
Minimal local copy of the `python-map-manager` package for development and testing.
|
||||
|
||||
Usage:
|
||||
|
||||
1. Install dependencies (recommended in a virtualenv):
|
||||
|
||||
```powershell
|
||||
python -m pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. Run the debug tool:
|
||||
|
||||
```powershell
|
||||
python debug_tool.py
|
||||
```
|
||||
|
||||
This will try to download tiles from OpenStreetMap and show a small stitched image.
|
||||
|
||||
Progress callback
|
||||
-----------------
|
||||
|
||||
The library supports an optional progress callback that consumer applications
|
||||
can provide to receive per-tile progress updates while stitching. The
|
||||
callback signature is:
|
||||
|
||||
```
|
||||
progress_callback(done: int, total: int, last_tile_duration: float, from_cache: bool, tile_coords: tuple)
|
||||
```
|
||||
|
||||
- `done`: number of tiles processed so far
|
||||
- `total`: total number of tiles to process
|
||||
- `last_tile_duration`: seconds spent fetching/reading the last tile
|
||||
- `from_cache`: True if the tile was already present in cache before retrieval
|
||||
- `tile_coords`: a (z,x,y) tuple identifying the tile
|
||||
|
||||
The callback may be invoked from the thread that called into `stitch_map_image`,
|
||||
so GUI applications should marshal updates onto their UI thread (for example
|
||||
using `root.after` in Tkinter).
|
||||
535
debug_tool.py
Normal file
535
debug_tool.py
Normal file
@ -0,0 +1,535 @@
|
||||
"""Tkinter-based interactive demo for python-map-manager.
|
||||
|
||||
Provides a small GUI to exercise MapEngine and MapVisualizer with preset
|
||||
examples and controls for point/area fetching, caching and saving images.
|
||||
"""
|
||||
import sys
|
||||
import pathlib
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
script_dir = pathlib.Path(__file__).parent.resolve()
|
||||
if str(script_dir) not in sys.path:
|
||||
sys.path.insert(0, str(script_dir))
|
||||
|
||||
try:
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, filedialog, messagebox
|
||||
except Exception:
|
||||
tk = None
|
||||
|
||||
from PIL import Image, ImageTk
|
||||
|
||||
from map_manager.engine import MapEngine
|
||||
from map_manager.visualizer import MapVisualizer
|
||||
|
||||
|
||||
class DebugGUI:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
root.title('python-map-manager Demo')
|
||||
|
||||
self.service = 'osm'
|
||||
self.cache_dir = 'debug_map_cache'
|
||||
self.online = True
|
||||
self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online)
|
||||
self.visual = MapVisualizer(self.engine)
|
||||
self.last_image = None
|
||||
self._photo = None
|
||||
|
||||
self._build_ui()
|
||||
|
||||
def _build_ui(self):
|
||||
frm = ttk.Frame(self.root, padding=8)
|
||||
frm.grid(column=0, row=0, sticky='nsew')
|
||||
self.root.columnconfigure(0, weight=1)
|
||||
self.root.rowconfigure(0, weight=1)
|
||||
|
||||
# Left: controls
|
||||
controls = ttk.Frame(frm)
|
||||
controls.grid(column=0, row=0, sticky='nw')
|
||||
|
||||
# Point inputs
|
||||
ttk.Label(controls, text='Point (lat lon zoom radius)').grid(column=0, row=0, sticky='w')
|
||||
self.point_lat = tk.StringVar(value='41.9028')
|
||||
self.point_lon = tk.StringVar(value='12.4964')
|
||||
self.point_zoom = tk.StringVar(value='12')
|
||||
self.point_radius = tk.StringVar(value='1')
|
||||
e_lat = ttk.Entry(controls, width=10, textvariable=self.point_lat)
|
||||
e_lon = ttk.Entry(controls, width=10, textvariable=self.point_lon)
|
||||
e_zoom = ttk.Entry(controls, width=6, textvariable=self.point_zoom)
|
||||
e_rad = ttk.Entry(controls, width=6, textvariable=self.point_radius)
|
||||
e_lat.grid(column=0, row=1)
|
||||
e_lon.grid(column=1, row=1)
|
||||
e_zoom.grid(column=2, row=1)
|
||||
e_rad.grid(column=3, row=1)
|
||||
self.btn_point = ttk.Button(controls, text='Fetch Point', command=self.fetch_point)
|
||||
self.btn_point.grid(column=0, row=2, columnspan=2, pady=4)
|
||||
|
||||
# Area inputs
|
||||
ttk.Label(controls, text='Area (W S E N)').grid(column=0, row=3, sticky='w', pady=(8,0))
|
||||
self.area_w = tk.StringVar(value='12.49')
|
||||
self.area_s = tk.StringVar(value='41.89')
|
||||
self.area_e = tk.StringVar(value='12.50')
|
||||
self.area_n = tk.StringVar(value='41.90')
|
||||
a_w = ttk.Entry(controls, width=10, textvariable=self.area_w)
|
||||
a_s = ttk.Entry(controls, width=10, textvariable=self.area_s)
|
||||
a_e = ttk.Entry(controls, width=10, textvariable=self.area_e)
|
||||
a_n = ttk.Entry(controls, width=10, textvariable=self.area_n)
|
||||
a_w.grid(column=0, row=4)
|
||||
a_s.grid(column=1, row=4)
|
||||
a_e.grid(column=2, row=4)
|
||||
a_n.grid(column=3, row=4)
|
||||
|
||||
ttk.Label(controls, text='Zoom / MaxSize').grid(column=0, row=5, sticky='w')
|
||||
self.area_zoom = tk.StringVar(value='')
|
||||
self.area_maxsize = tk.StringVar(value='800')
|
||||
az = ttk.Entry(controls, width=6, textvariable=self.area_zoom)
|
||||
am = ttk.Entry(controls, width=8, textvariable=self.area_maxsize)
|
||||
az.grid(column=0, row=6)
|
||||
am.grid(column=1, row=6)
|
||||
self.btn_area = ttk.Button(controls, text='Fetch Area', command=self.fetch_area)
|
||||
self.btn_area.grid(column=0, row=7, columnspan=2, pady=4)
|
||||
|
||||
# Preset examples
|
||||
ttk.Label(controls, text='Examples').grid(column=0, row=8, sticky='w', pady=(8,0))
|
||||
self.examples = ttk.Combobox(controls, values=['Rome point', 'Rome area', 'Antimeridian area (toy)'])
|
||||
self.examples.current(0)
|
||||
self.examples.grid(column=0, row=9, columnspan=2, sticky='we')
|
||||
ttk.Button(controls, text='Run Example', command=self.run_example).grid(column=2, row=9)
|
||||
|
||||
# Actions
|
||||
ttk.Button(controls, text='Toggle Online', command=self.toggle_online).grid(column=0, row=10, pady=(12,0))
|
||||
ttk.Button(controls, text='Clear Cache', command=self.clear_cache).grid(column=1, row=10, pady=(12,0))
|
||||
ttk.Button(controls, text='Save Image', command=self.save_image).grid(column=2, row=10, pady=(12,0))
|
||||
ttk.Button(controls, text='Show Image', command=self.show_image).grid(column=3, row=10, pady=(12,0))
|
||||
|
||||
# Status/log
|
||||
self.status = tk.StringVar(value='Ready')
|
||||
ttk.Label(controls, textvariable=self.status).grid(column=0, row=11, columnspan=4, sticky='we', pady=(8,0))
|
||||
|
||||
# Progress bar + ETA
|
||||
self.progress_var = tk.IntVar(value=0)
|
||||
self.progress = ttk.Progressbar(controls, orient='horizontal', length=200, mode='determinate', variable=self.progress_var)
|
||||
self.progress.grid(column=0, row=12, columnspan=3, sticky='we', pady=(6,0))
|
||||
self.eta_var = tk.StringVar(value='')
|
||||
ttk.Label(controls, textvariable=self.eta_var).grid(column=3, row=12, sticky='w', pady=(6,0))
|
||||
|
||||
# Right: image preview
|
||||
preview = ttk.Frame(frm)
|
||||
preview.grid(column=1, row=0, sticky='nsew', padx=(12,0))
|
||||
frm.columnconfigure(1, weight=1)
|
||||
frm.rowconfigure(0, weight=1)
|
||||
|
||||
self.canvas = tk.Label(preview, text='No image', background='black', width=60, height=30)
|
||||
self.canvas.pack(fill='both', expand=True)
|
||||
|
||||
def log(self, msg: str):
|
||||
print(msg)
|
||||
self.status.set(msg)
|
||||
|
||||
def fetch_point(self):
|
||||
try:
|
||||
lat = float(self.point_lat.get())
|
||||
lon = float(self.point_lon.get())
|
||||
zoom = int(self.point_zoom.get())
|
||||
radius = int(self.point_radius.get())
|
||||
except Exception:
|
||||
messagebox.showerror('Input error', 'Invalid numeric input for point')
|
||||
return
|
||||
self.log(f'Fetching point {lat},{lon} @ z={zoom} r={radius}...')
|
||||
# run in background thread and show ETA based on per-tile durations
|
||||
self._set_busy(True)
|
||||
def worker():
|
||||
durations = []
|
||||
cached = 0
|
||||
downloaded = 0
|
||||
def progress_cb(done, total, last_dur, from_cache, tile_coords):
|
||||
nonlocal cached, downloaded
|
||||
durations.append(last_dur)
|
||||
if from_cache:
|
||||
cached += 1
|
||||
else:
|
||||
downloaded += 1
|
||||
avg = sum(durations) / len(durations) if durations else 0.0
|
||||
remaining = max(0, total - done)
|
||||
eta = remaining * avg
|
||||
# schedule UI update on main thread
|
||||
try:
|
||||
self.root.after(0, lambda: self._update_progress(done, total, eta, cached, downloaded))
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=radius, progress_callback=progress_cb)
|
||||
if img is None:
|
||||
self.root.after(0, lambda: self.log('No image returned (check dependencies and online state)'))
|
||||
else:
|
||||
self.last_image = img
|
||||
self.root.after(0, lambda: self._update_preview(img))
|
||||
self.root.after(0, lambda: self.log('Point image loaded'))
|
||||
finally:
|
||||
self.root.after(0, lambda: self._set_busy(False))
|
||||
threading.Thread(target=worker, daemon=True).start()
|
||||
|
||||
def fetch_area(self):
|
||||
try:
|
||||
w = float(self.area_w.get())
|
||||
s = float(self.area_s.get())
|
||||
e = float(self.area_e.get())
|
||||
n = float(self.area_n.get())
|
||||
except Exception:
|
||||
messagebox.showerror('Input error', 'Invalid numeric input for area')
|
||||
return
|
||||
zoom = None
|
||||
if self.area_zoom.get().strip():
|
||||
try:
|
||||
zoom = int(self.area_zoom.get())
|
||||
except Exception:
|
||||
messagebox.showerror('Input error', 'Invalid zoom')
|
||||
return
|
||||
maxsize = None
|
||||
if self.area_maxsize.get().strip():
|
||||
try:
|
||||
maxsize = int(self.area_maxsize.get())
|
||||
except Exception:
|
||||
messagebox.showerror('Input error', 'Invalid maxsize')
|
||||
return
|
||||
self.log(f'Fetching area bbox=({w},{s},{e},{n}) zoom={zoom} maxsize={maxsize}...')
|
||||
self._set_busy(True)
|
||||
def worker():
|
||||
durations = []
|
||||
cached = 0
|
||||
downloaded = 0
|
||||
def progress_cb(done, total, last_dur, from_cache, tile_coords):
|
||||
nonlocal cached, downloaded
|
||||
durations.append(last_dur)
|
||||
if from_cache:
|
||||
cached += 1
|
||||
else:
|
||||
downloaded += 1
|
||||
avg = sum(durations) / len(durations) if durations else 0.0
|
||||
remaining = max(0, total - done)
|
||||
eta = remaining * avg
|
||||
try:
|
||||
self.root.after(0, lambda: self._update_progress(done, total, eta, cached, downloaded))
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
img = self.engine.get_image_for_area((w, s, e, n), zoom=zoom, max_size=maxsize, progress_callback=progress_cb)
|
||||
if img is None:
|
||||
self.root.after(0, lambda: self.log('No image returned (check dependencies and online state)'))
|
||||
else:
|
||||
self.last_image = img
|
||||
self.root.after(0, lambda: self._update_preview(img))
|
||||
self.root.after(0, lambda: self.log('Area image loaded'))
|
||||
finally:
|
||||
self.root.after(0, lambda: self._set_busy(False))
|
||||
threading.Thread(target=worker, daemon=True).start()
|
||||
|
||||
def run_example(self):
|
||||
sel = self.examples.get()
|
||||
if sel == 'Rome point':
|
||||
self.point_lat.set('41.9028')
|
||||
self.point_lon.set('12.4964')
|
||||
self.point_zoom.set('12')
|
||||
self.point_radius.set('1')
|
||||
self.fetch_point()
|
||||
elif sel == 'Rome area':
|
||||
self.area_w.set('12.49')
|
||||
self.area_s.set('41.89')
|
||||
self.area_e.set('12.50')
|
||||
self.area_n.set('41.90')
|
||||
self.area_maxsize.set('800')
|
||||
self.fetch_area()
|
||||
elif sel == 'Antimeridian area (toy)':
|
||||
# small bbox crossing antimeridian near 179E/-179W
|
||||
self.area_w.set('179.5')
|
||||
self.area_s.set('-1.0')
|
||||
self.area_e.set('-179.5')
|
||||
self.area_n.set('1.0')
|
||||
self.area_maxsize.set('600')
|
||||
self.fetch_area()
|
||||
|
||||
def toggle_online(self):
|
||||
self.online = not self.online
|
||||
self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online)
|
||||
self.visual = MapVisualizer(self.engine)
|
||||
self.log(f'Online fetching set to {self.online}')
|
||||
|
||||
def clear_cache(self):
|
||||
try:
|
||||
self.engine.tile_manager.clear_entire_service_cache()
|
||||
self.log('Cache cleared')
|
||||
except Exception as e:
|
||||
messagebox.showerror('Error', f'Failed to clear cache: {e}')
|
||||
|
||||
def save_image(self):
|
||||
if not self.last_image:
|
||||
messagebox.showinfo('No image', 'No image to save')
|
||||
return
|
||||
path = filedialog.asksaveasfilename(defaultextension='.png', filetypes=[('PNG','*.png')])
|
||||
if not path:
|
||||
return
|
||||
try:
|
||||
self.last_image.save(path)
|
||||
self.log(f'Saved image to {path}')
|
||||
except Exception as e:
|
||||
messagebox.showerror('Save error', f'Failed to save: {e}')
|
||||
|
||||
def show_image(self):
|
||||
if not self.last_image:
|
||||
messagebox.showinfo('No image', 'No image to show')
|
||||
return
|
||||
try:
|
||||
self.visual.show_pil_image(self.last_image)
|
||||
except Exception as e:
|
||||
messagebox.showerror('Show error', f'Failed to show image: {e}')
|
||||
|
||||
def _update_preview(self, pil_image: Image.Image):
|
||||
# Resize image to fit preview area if needed
|
||||
try:
|
||||
w = max(200, min(1024, pil_image.width))
|
||||
h = int(pil_image.height * (w / pil_image.width)) if pil_image.width else pil_image.height
|
||||
thumb = pil_image.copy()
|
||||
thumb.thumbnail((w, 800))
|
||||
self._photo = ImageTk.PhotoImage(thumb)
|
||||
self.canvas.configure(image=self._photo, text='')
|
||||
except Exception as e:
|
||||
self.canvas.configure(text=f'Preview error: {e}')
|
||||
|
||||
def _update_progress(self, done: int, total: int, eta_seconds: float, cached_count: int = 0, downloaded_count: int = 0):
|
||||
try:
|
||||
self.progress['maximum'] = total
|
||||
self.progress_var.set(done)
|
||||
if eta_seconds is None:
|
||||
eta_text = ''
|
||||
else:
|
||||
eta_text = f'ETA: {int(eta_seconds)}s'
|
||||
counts_text = f' D:{downloaded_count} C:{cached_count}'
|
||||
self.eta_var.set(eta_text + counts_text)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _set_busy(self, busy: bool):
|
||||
try:
|
||||
state = 'disabled' if busy else 'normal'
|
||||
self.btn_point.configure(state=state)
|
||||
self.btn_area.configure(state=state)
|
||||
if not busy:
|
||||
# clear progress when done
|
||||
self.progress_var.set(0)
|
||||
self.progress['maximum'] = 1
|
||||
self.eta_var.set('')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
if tk is None:
|
||||
print('Tkinter is not available in this Python environment.')
|
||||
return
|
||||
root = tk.Tk()
|
||||
app = DebugGUI(root)
|
||||
root.mainloop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
"""Simple debug tool to exercise MapEngine and MapVisualizer locally."""
|
||||
"""Interactive debug tool for python-map-manager.
|
||||
|
||||
Provides a small REPL to exercise MapEngine and MapVisualizer without
|
||||
editing code. Commands:
|
||||
- point LAT LON ZOOM [RADIUS] -> fetch and show tiles around a point
|
||||
- area W S E N [--zoom Z | --maxsize PIX] -> fetch and show area
|
||||
- save PATH -> save last image to PATH
|
||||
- show -> open last image with visualizer
|
||||
- toggle_online -> toggle online fetching on/off
|
||||
- clear_cache -> clear the map tiles cache for current service
|
||||
- exit -> quit
|
||||
|
||||
This tool is intended for developer iteration and debugging.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import pathlib
|
||||
import os
|
||||
import shlex
|
||||
from typing import Optional
|
||||
|
||||
script_dir = pathlib.Path(__file__).parent.resolve()
|
||||
if str(script_dir) not in sys.path:
|
||||
sys.path.insert(0, str(script_dir))
|
||||
|
||||
from map_manager.engine import MapEngine
|
||||
from map_manager.visualizer import MapVisualizer
|
||||
|
||||
|
||||
def prompt():
|
||||
return input('map-debug> ')
|
||||
|
||||
|
||||
def parse_floats(args):
|
||||
try:
|
||||
return [float(x) for x in args]
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class DebugApp:
|
||||
def __init__(self):
|
||||
self.service = 'osm'
|
||||
self.cache_dir = 'debug_map_cache'
|
||||
self.online = True
|
||||
self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online)
|
||||
self.visual = MapVisualizer(self.engine)
|
||||
self.visual.set_click_callback(self.on_click)
|
||||
self.last_image = None
|
||||
|
||||
def on_click(self, lat, lon):
|
||||
print(f'Clicked callback: {lat}, {lon}')
|
||||
|
||||
def cmd_point(self, parts):
|
||||
# point LAT LON ZOOM [RADIUS]
|
||||
if len(parts) < 4:
|
||||
print('Usage: point LAT LON ZOOM [RADIUS]')
|
||||
return
|
||||
vals = parse_floats(parts[1:])
|
||||
if not vals or len(vals) < 3:
|
||||
print('Invalid numeric arguments')
|
||||
return
|
||||
lat, lon, zoom = vals[0], vals[1], int(vals[2])
|
||||
radius = int(vals[3]) if len(vals) >= 4 else 1
|
||||
print(f'Fetching point {lat},{lon} @ zoom {zoom} radius {radius}')
|
||||
img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=radius)
|
||||
if img is None:
|
||||
print('No image returned.')
|
||||
else:
|
||||
self.last_image = img
|
||||
self.visual.show_pil_image(img)
|
||||
|
||||
def cmd_area(self, parts):
|
||||
# area W S E N [--zoom Z] [--maxsize PIX]
|
||||
# minimal parser
|
||||
if len(parts) < 5:
|
||||
print('Usage: area W S E N [--zoom Z] [--maxsize PIX]')
|
||||
return
|
||||
vals = parse_floats(parts[1:5])
|
||||
if not vals:
|
||||
print('Invalid numeric bbox')
|
||||
return
|
||||
w, s, e, n = vals
|
||||
zoom = None
|
||||
maxsize = None
|
||||
# parse optional flags
|
||||
i = 5
|
||||
while i < len(parts):
|
||||
p = parts[i]
|
||||
if p == '--zoom' and i + 1 < len(parts):
|
||||
try:
|
||||
zoom = int(parts[i+1])
|
||||
except Exception:
|
||||
pass
|
||||
i += 2
|
||||
elif p == '--maxsize' and i + 1 < len(parts):
|
||||
try:
|
||||
maxsize = int(parts[i+1])
|
||||
except Exception:
|
||||
pass
|
||||
i += 2
|
||||
else:
|
||||
i += 1
|
||||
|
||||
print(f'Fetching area bbox=({w},{s},{e},{n}) zoom={zoom} maxsize={maxsize}')
|
||||
img = self.engine.get_image_for_area((w, s, e, n), zoom=zoom, max_size=maxsize)
|
||||
if img is None:
|
||||
print('No image returned.')
|
||||
else:
|
||||
self.last_image = img
|
||||
self.visual.show_pil_image(img)
|
||||
|
||||
def cmd_save(self, parts):
|
||||
if not self.last_image:
|
||||
print('No image to save')
|
||||
return
|
||||
if len(parts) < 2:
|
||||
print('Usage: save PATH')
|
||||
return
|
||||
path = parts[1]
|
||||
try:
|
||||
self.last_image.save(path)
|
||||
print(f'Saved image to {path}')
|
||||
except Exception as e:
|
||||
print(f'Failed to save image: {e}')
|
||||
|
||||
def cmd_show(self, parts):
|
||||
if not self.last_image:
|
||||
print('No image to show')
|
||||
return
|
||||
self.visual.show_pil_image(self.last_image)
|
||||
|
||||
def cmd_toggle_online(self, parts):
|
||||
self.online = not self.online
|
||||
# Recreate engine with new flag
|
||||
self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online)
|
||||
self.visual = MapVisualizer(self.engine)
|
||||
self.visual.set_click_callback(self.on_click)
|
||||
print(f'Online fetching set to {self.online}')
|
||||
|
||||
def cmd_clear_cache(self, parts):
|
||||
# clear entire service cache directory
|
||||
try:
|
||||
self.engine.tile_manager.clear_entire_service_cache()
|
||||
print('Service cache cleared')
|
||||
except Exception as e:
|
||||
print(f'Failed to clear cache: {e}')
|
||||
|
||||
def repl(self):
|
||||
print('Interactive map debug tool. Type "help" for commands.')
|
||||
while True:
|
||||
try:
|
||||
line = prompt()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print('\nExiting')
|
||||
return
|
||||
if not line:
|
||||
continue
|
||||
parts = shlex.split(line)
|
||||
cmd = parts[0].lower()
|
||||
if cmd in ('exit', 'quit'):
|
||||
print('Bye')
|
||||
return
|
||||
if cmd == 'help':
|
||||
print('Commands: point, area, save, show, toggle_online, clear_cache, exit')
|
||||
continue
|
||||
if cmd == 'point':
|
||||
self.cmd_point(parts)
|
||||
continue
|
||||
if cmd == 'area':
|
||||
self.cmd_area(parts)
|
||||
continue
|
||||
if cmd == 'save':
|
||||
self.cmd_save(parts)
|
||||
continue
|
||||
if cmd == 'show':
|
||||
self.cmd_show(parts)
|
||||
continue
|
||||
if cmd == 'toggle_online':
|
||||
self.cmd_toggle_online(parts)
|
||||
continue
|
||||
if cmd == 'clear_cache':
|
||||
self.cmd_clear_cache(parts)
|
||||
continue
|
||||
print('Unknown command. Type help for commands.')
|
||||
|
||||
|
||||
def main():
|
||||
app = DebugApp()
|
||||
app.repl()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
38
engine.py
Normal file
38
engine.py
Normal file
@ -0,0 +1,38 @@
|
||||
"""MapEngine: facade for map retrieval operations.
|
||||
|
||||
Minimal implementation: constructs a MapTileManager and exposes
|
||||
`get_image_for_area` and `get_image_for_point` helper methods.
|
||||
"""
|
||||
from typing import Optional, Tuple
|
||||
from map_manager.tile_manager import MapTileManager
|
||||
from map_manager.services import get_map_service_instance
|
||||
|
||||
class MapEngine:
|
||||
def __init__(self, service_name: str = 'osm', cache_dir: Optional[str] = None, enable_online: bool = True):
|
||||
service = get_map_service_instance(service_name)
|
||||
if service is None:
|
||||
raise ValueError(f"Unknown map service: {service_name}")
|
||||
self.tile_manager = MapTileManager(service, cache_root_directory=cache_dir, enable_online_tile_fetching=enable_online)
|
||||
|
||||
def get_image_for_area(self, bbox: Tuple[float, float, float, float], zoom: int) -> Optional[object]:
|
||||
"""Return a stitched PIL image covering `bbox` at `zoom` level."""
|
||||
from map_manager.utils import get_tile_ranges_for_bbox
|
||||
tile_ranges = get_tile_ranges_for_bbox(bbox, zoom)
|
||||
if not tile_ranges:
|
||||
return None
|
||||
return self.tile_manager.stitch_map_image(zoom, tile_ranges[0], tile_ranges[1])
|
||||
|
||||
def get_image_for_point(self, lat: float, lon: float, zoom: int, tiles_radius: int = 1) -> Optional[object]:
|
||||
"""Return a stitched image centered on (lat,lon) using a small tile window.
|
||||
|
||||
tiles_radius=1 returns a 3x3 tile image.
|
||||
"""
|
||||
try:
|
||||
import mercantile
|
||||
except Exception:
|
||||
return None
|
||||
center_tile = mercantile.tile(lon, lat, zoom)
|
||||
x, y = center_tile.x, center_tile.y
|
||||
range_x = (x - tiles_radius, x + tiles_radius)
|
||||
range_y = (y - tiles_radius, y + tiles_radius)
|
||||
return self.tile_manager.stitch_map_image(zoom, range_x, range_y)
|
||||
14
map_manager/__init__.py
Normal file
14
map_manager/__init__.py
Normal file
@ -0,0 +1,14 @@
|
||||
"""Top-level package exports for python-map-manager.map_manager
|
||||
|
||||
This package exposes the main public classes: MapEngine, MapVisualizer
|
||||
and the MapTileManager for direct use.
|
||||
"""
|
||||
from .engine import MapEngine
|
||||
from .visualizer import MapVisualizer
|
||||
from .tile_manager import MapTileManager
|
||||
|
||||
__all__ = [
|
||||
"MapEngine",
|
||||
"MapVisualizer",
|
||||
"MapTileManager",
|
||||
]
|
||||
103
map_manager/drawing.py
Normal file
103
map_manager/drawing.py
Normal file
@ -0,0 +1,103 @@
|
||||
"""Drawing helpers migrated from original project; adapted to new package layout.
|
||||
"""
|
||||
import logging
|
||||
from typing import Optional, Tuple, List, Dict
|
||||
|
||||
try:
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
PIL_LIB_AVAILABLE_DRAWING = True
|
||||
except ImportError:
|
||||
Image = None # type: ignore
|
||||
ImageDraw = None # type: ignore
|
||||
ImageFont = None # type: ignore
|
||||
PIL_LIB_AVAILABLE_DRAWING = False
|
||||
logging.error("MapDrawing: Pillow (PIL) library not found. Drawing operations will fail.")
|
||||
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
CV2_NUMPY_LIBS_AVAILABLE_DRAWING = True
|
||||
except ImportError:
|
||||
cv2 = None # type: ignore
|
||||
np = None # type: ignore
|
||||
CV2_NUMPY_LIBS_AVAILABLE_DRAWING = False
|
||||
logging.warning("MapDrawing: OpenCV or NumPy not found. Some drawing operations (markers) will be disabled.")
|
||||
|
||||
try:
|
||||
import mercantile
|
||||
MERCANTILE_LIB_AVAILABLE_DRAWING = True
|
||||
except ImportError:
|
||||
mercantile = None # type: ignore
|
||||
MERCANTILE_LIB_AVAILABLE_DRAWING = False
|
||||
logging.error("MapDrawing: 'mercantile' library not found. Coordinate conversions will fail.")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Prefer local utils import name
|
||||
from .utils import get_hgt_tile_geographic_bounds
|
||||
from .utils import MapCalculationError
|
||||
|
||||
# Fallback style constants
|
||||
DEM_BOUNDARY_COLOR = "red"
|
||||
DEM_BOUNDARY_THICKNESS_PX = 3
|
||||
AREA_BOUNDARY_COLOR = "blue"
|
||||
AREA_BOUNDARY_THICKNESS_PX = 2
|
||||
TILE_TEXT_COLOR = "white"
|
||||
TILE_TEXT_BG_COLOR = "rgba(0, 0, 0, 150)"
|
||||
DEM_TILE_LABEL_BASE_FONT_SIZE = 12
|
||||
DEM_TILE_LABEL_BASE_ZOOM = 10
|
||||
_DEFAULT_FONT_FOR_LABELS = None
|
||||
|
||||
|
||||
def _geo_to_pixel_on_unscaled_map(latitude_deg: float, longitude_deg: float, current_map_geo_bounds: Optional[Tuple[float, float, float, float]], current_stitched_map_pixel_shape: Optional[Tuple[int, int]]) -> Optional[Tuple[int, int]]:
|
||||
if not MERCANTILE_LIB_AVAILABLE_DRAWING or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None:
|
||||
logger.warning("Map context incomplete or mercantile missing for geo_to_pixel_on_unscaled_map conversion.")
|
||||
return None
|
||||
unscaled_height, unscaled_width = current_stitched_map_pixel_shape
|
||||
map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds
|
||||
try:
|
||||
map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore
|
||||
map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore
|
||||
total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x)
|
||||
total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y)
|
||||
if total_map_width_merc <= 0 or total_map_height_merc <= 0:
|
||||
logger.warning("Map Mercator extent is zero, cannot convert geo to pixel on unscaled map.")
|
||||
return None
|
||||
target_merc_x, target_merc_y = mercantile.xy(longitude_deg, latitude_deg) # type: ignore
|
||||
relative_merc_x_in_map = (target_merc_x - map_ul_merc_x) / total_map_width_merc if total_map_width_merc > 0 else 0.0
|
||||
relative_merc_y_in_map = (map_ul_merc_y - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0
|
||||
pixel_x_on_unscaled = int(round(relative_merc_x_in_map * unscaled_width))
|
||||
pixel_y_on_unscaled = int(round(relative_merc_y_in_map * unscaled_height))
|
||||
px_clamped = max(0, min(pixel_x_on_unscaled, unscaled_width - 1))
|
||||
py_clamped = max(0, min(pixel_y_on_unscaled, unscaled_height - 1))
|
||||
return (px_clamped, py_clamped)
|
||||
except Exception as e_geo_to_px_unscaled:
|
||||
logger.exception(f"Error during geo_to_pixel_on_unscaled_map conversion: {e_geo_to_px_unscaled}")
|
||||
return None
|
||||
|
||||
|
||||
def draw_point_marker(pil_image_to_draw_on, latitude_deg: float, longitude_deg: float, current_map_geo_bounds: Optional[Tuple[float, float, float, float]], current_stitched_map_pixel_shape: Optional[Tuple[int, int]]):
|
||||
if not PIL_LIB_AVAILABLE_DRAWING or pil_image_to_draw_on is None or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None:
|
||||
logger.warning("Cannot draw point marker: PIL image or map context missing.")
|
||||
return pil_image_to_draw_on
|
||||
pixel_coords_on_unscaled = _geo_to_pixel_on_unscaled_map(latitude_deg, longitude_deg, current_map_geo_bounds, current_stitched_map_pixel_shape)
|
||||
if pixel_coords_on_unscaled:
|
||||
px_clamped, py_clamped = pixel_coords_on_unscaled
|
||||
logger.debug(f"Drawing point marker at unscaled pixel ({px_clamped},{py_clamped}) for geo ({latitude_deg:.5f},{longitude_deg:.5f})")
|
||||
if CV2_NUMPY_LIBS_AVAILABLE_DRAWING and cv2 and np:
|
||||
try:
|
||||
if pil_image_to_draw_on.mode != 'RGB':
|
||||
map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on.convert('RGB')), cv2.COLOR_RGB2BGR) # type: ignore
|
||||
else:
|
||||
map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on), cv2.COLOR_RGB2BGR) # type: ignore
|
||||
cv2.drawMarker(map_cv_bgr, (px_clamped, py_clamped), (0, 0, 255), cv2.MARKER_CROSS, markerSize=15, thickness=2) # type: ignore
|
||||
return Image.fromarray(cv2.cvtColor(map_cv_bgr, cv2.COLOR_BGR2RGB)) # type: ignore
|
||||
except Exception as e_draw_click_cv:
|
||||
logger.exception(f"Error drawing point marker with OpenCV: {e_draw_click_cv}")
|
||||
return pil_image_to_draw_on
|
||||
else:
|
||||
logger.warning("CV2 or NumPy not available, cannot draw point marker using OpenCV.")
|
||||
return pil_image_to_draw_on
|
||||
else:
|
||||
logger.warning(f"Geo-to-pixel conversion failed for ({latitude_deg:.5f},{longitude_deg:.5f}), cannot draw point marker.")
|
||||
return pil_image_to_draw_on
|
||||
92
map_manager/engine.py
Normal file
92
map_manager/engine.py
Normal file
@ -0,0 +1,92 @@
|
||||
"""MapEngine: facade for map retrieval operations (inside package).
|
||||
|
||||
Minimal implementation: constructs a MapTileManager and exposes
|
||||
`get_image_for_area` and `get_image_for_point` helper methods.
|
||||
"""
|
||||
from typing import Optional, Tuple
|
||||
from .tile_manager import MapTileManager
|
||||
from .services import get_map_service_instance
|
||||
|
||||
|
||||
class MapEngine:
|
||||
"""Facade for map retrieval operations.
|
||||
|
||||
Features:
|
||||
- can compute an appropriate zoom level for a bounding box based on a
|
||||
`max_size` in pixels and the tile size
|
||||
- exposes `get_image_for_area(bbox, zoom=None, max_size=None)` where zoom
|
||||
or max_size (or both) can be provided
|
||||
"""
|
||||
|
||||
def __init__(self, service_name: str = 'osm', cache_dir: Optional[str] = None, enable_online: bool = True):
|
||||
service = get_map_service_instance(service_name)
|
||||
if service is None:
|
||||
raise ValueError(f"Unknown map service: {service_name}")
|
||||
self.tile_manager = MapTileManager(service, cache_root_directory=cache_dir, enable_online_tile_fetching=enable_online)
|
||||
|
||||
def _choose_zoom_for_bbox(self, bbox: Tuple[float, float, float, float], max_size_pixels: int) -> Optional[int]:
|
||||
"""Choose the highest zoom level that results in a stitched image not
|
||||
exceeding `max_size_pixels` in both dimensions.
|
||||
|
||||
Strategy: iterate from service max zoom down to 0 and return the first
|
||||
zoom where the pixel width and height (num_tiles * tile_size) fit.
|
||||
"""
|
||||
try:
|
||||
from .utils import get_tile_ranges_for_bbox
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
tile_size = self.tile_manager.tile_size
|
||||
max_zoom = self.tile_manager.map_service.max_zoom
|
||||
|
||||
for z in range(max_zoom, -1, -1):
|
||||
ranges = get_tile_ranges_for_bbox(bbox, z)
|
||||
if not ranges:
|
||||
continue
|
||||
x_range, y_range = ranges
|
||||
num_w = (x_range[1] - x_range[0]) + 1
|
||||
num_h = (y_range[1] - y_range[0]) + 1
|
||||
px_w = num_w * tile_size
|
||||
px_h = num_h * tile_size
|
||||
if px_w <= max_size_pixels and px_h <= max_size_pixels:
|
||||
return z
|
||||
|
||||
# If no zoom fits the requested max size, return the lowest zoom (0)
|
||||
return 0
|
||||
|
||||
def get_image_for_area(self, bbox: Tuple[float, float, float, float], zoom: Optional[int] = None, max_size: Optional[int] = None, progress_callback=None) -> Optional[object]:
|
||||
"""Return a stitched PIL image covering `bbox`.
|
||||
|
||||
Parameters:
|
||||
- bbox: (west, south, east, north)
|
||||
- zoom: explicit zoom level (if provided, used as-is)
|
||||
- max_size: maximum pixel size (width and height) for the returned image;
|
||||
if `zoom` is None, engine will compute a suitable zoom.
|
||||
"""
|
||||
if zoom is None and max_size is not None:
|
||||
zoom = self._choose_zoom_for_bbox(bbox, max_size)
|
||||
|
||||
if zoom is None:
|
||||
# Default to a conservative zoom (middle range)
|
||||
zoom = min(12, self.tile_manager.map_service.max_zoom)
|
||||
|
||||
from .utils import get_tile_ranges_for_bbox
|
||||
tile_ranges = get_tile_ranges_for_bbox(bbox, zoom)
|
||||
if not tile_ranges:
|
||||
return None
|
||||
return self.tile_manager.stitch_map_image(zoom, tile_ranges[0], tile_ranges[1], progress_callback=progress_callback)
|
||||
|
||||
def get_image_for_point(self, lat: float, lon: float, zoom: int, tiles_radius: int = 1, progress_callback=None) -> Optional[object]:
|
||||
"""Return a stitched image centered on (lat,lon) using a small tile window.
|
||||
|
||||
`tiles_radius=1` returns a 3x3 tile image.
|
||||
"""
|
||||
try:
|
||||
import mercantile
|
||||
except Exception:
|
||||
return None
|
||||
center_tile = mercantile.tile(lon, lat, zoom)
|
||||
x, y = center_tile.x, center_tile.y
|
||||
range_x = (x - tiles_radius, x + tiles_radius)
|
||||
range_y = (y - tiles_radius, y + tiles_radius)
|
||||
return self.tile_manager.stitch_map_image(zoom, range_x, range_y, progress_callback=progress_callback)
|
||||
135
map_manager/services.py
Normal file
135
map_manager/services.py
Normal file
@ -0,0 +1,135 @@
|
||||
"""Provides map service implementations (OpenStreetMap) for python-map-manager.
|
||||
|
||||
This file is migrated from the original project and contains the
|
||||
BaseMapService and OpenStreetMapService classes plus a factory helper.
|
||||
"""
|
||||
import abc
|
||||
import logging
|
||||
from urllib.parse import urlparse
|
||||
from typing import Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseMapService(abc.ABC):
|
||||
DEFAULT_TILE_PIXEL_SIZE: int = 256
|
||||
DEFAULT_MAX_ZOOM_LEVEL: int = 19
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
service_api_key: Optional[str] = None,
|
||||
tile_pixel_dim: int = DEFAULT_TILE_PIXEL_SIZE,
|
||||
max_supported_zoom: int = DEFAULT_MAX_ZOOM_LEVEL
|
||||
) -> None:
|
||||
self._service_log_prefix = f"[{self.__class__.__name__}]"
|
||||
logger.debug(f"{self._service_log_prefix} Initializing base map service.")
|
||||
|
||||
self.api_key: Optional[str] = service_api_key
|
||||
self.tile_size: int = tile_pixel_dim
|
||||
self.max_zoom: int = max_supported_zoom
|
||||
|
||||
if not (isinstance(self.tile_size, int) and self.tile_size > 0):
|
||||
logger.warning(
|
||||
f"{self._service_log_prefix} Invalid tile_size '{self.tile_size}'. Using default: {self.DEFAULT_TILE_PIXEL_SIZE}px."
|
||||
)
|
||||
self.tile_size = self.DEFAULT_TILE_PIXEL_SIZE
|
||||
if not (isinstance(self.max_zoom, int) and 0 <= self.max_zoom <= 25):
|
||||
logger.warning(
|
||||
f"{self._service_log_prefix} Invalid max_zoom '{self.max_zoom}'. Using default: {self.DEFAULT_MAX_ZOOM_LEVEL}."
|
||||
)
|
||||
self.max_zoom = self.DEFAULT_MAX_ZOOM_LEVEL
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def name(self) -> str:
|
||||
pass
|
||||
|
||||
@property
|
||||
@abc.abstractmethod
|
||||
def attribution(self) -> str:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_tile_url(self, z: int, x: int, y: int) -> Optional[str]:
|
||||
pass
|
||||
|
||||
def is_zoom_level_valid(self, zoom_level: int) -> bool:
|
||||
is_valid = 0 <= zoom_level <= self.max_zoom
|
||||
if not is_valid:
|
||||
logger.warning(
|
||||
f"{self._service_log_prefix} Requested zoom level {zoom_level} is outside the valid range [0, {self.max_zoom}] for this service."
|
||||
)
|
||||
return is_valid
|
||||
|
||||
def _is_generated_url_structurally_valid(self, url_string: str) -> bool:
|
||||
if not url_string:
|
||||
logger.error(f"{self._service_log_prefix} Generated URL is empty.")
|
||||
return False
|
||||
try:
|
||||
parsed_url = urlparse(url_string)
|
||||
has_scheme_and_netloc = bool(parsed_url.scheme and parsed_url.netloc)
|
||||
if not has_scheme_and_netloc:
|
||||
logger.error(f"{self._service_log_prefix} Generated URL '{url_string}' appears malformed (missing scheme or netloc).")
|
||||
return has_scheme_and_netloc
|
||||
except Exception as e_url_parse:
|
||||
logger.error(
|
||||
f"{self._service_log_prefix} Error parsing generated URL '{url_string}': {e_url_parse}"
|
||||
)
|
||||
return False
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"<{self.__class__.__name__}(Name: '{self.name}', MaxZoom: {self.max_zoom}, TileSize: {self.tile_size})>"
|
||||
)
|
||||
|
||||
|
||||
class OpenStreetMapService(BaseMapService):
|
||||
SERVICE_IDENTIFIER_NAME: str = "osm"
|
||||
SERVICE_ATTRIBUTION_TEXT: str = (
|
||||
"© OpenStreetMap contributors (openstreetmap.org/copyright)"
|
||||
)
|
||||
TILE_URL_TEMPLATE: str = "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
|
||||
OSM_MAX_ZOOM_LEVEL: int = 19
|
||||
SUBDOMAINS: Tuple[str, ...] = ("a", "b", "c")
|
||||
_subdomain_index: int = 0
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(service_api_key=None, max_supported_zoom=self.OSM_MAX_ZOOM_LEVEL)
|
||||
logger.info(f"{self._service_log_prefix} OpenStreetMap service instance ready.")
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.SERVICE_IDENTIFIER_NAME
|
||||
|
||||
@property
|
||||
def attribution(self) -> str:
|
||||
return self.SERVICE_ATTRIBUTION_TEXT
|
||||
|
||||
def get_tile_url(self, z: int, x: int, y: int) -> Optional[str]:
|
||||
if not self.is_zoom_level_valid(z):
|
||||
return None
|
||||
subdomain = self.SUBDOMAINS[OpenStreetMapService._subdomain_index % len(self.SUBDOMAINS)]
|
||||
OpenStreetMapService._subdomain_index += 1
|
||||
try:
|
||||
tile_url = self.TILE_URL_TEMPLATE.format(s=subdomain, z=z, x=x, y=y)
|
||||
if not self._is_generated_url_structurally_valid(tile_url):
|
||||
return None
|
||||
logger.debug(f"{self._service_log_prefix} Generated URL for ({z},{x},{y}): {tile_url}")
|
||||
return tile_url
|
||||
except Exception as e_url_format:
|
||||
logger.error(
|
||||
f"{self._service_log_prefix} Error formatting tile URL for ({z},{x},{y}): {e_url_format}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def get_map_service_instance(service_name_key: str, api_key_value: Optional[str] = None) -> Optional[BaseMapService]:
|
||||
log_prefix_factory = "[MapServiceFactory]"
|
||||
normalized_service_name = service_name_key.lower().strip()
|
||||
logger.debug(f"{log_prefix_factory} Requesting map service instance for '{normalized_service_name}'.")
|
||||
|
||||
if normalized_service_name == OpenStreetMapService.SERVICE_IDENTIFIER_NAME:
|
||||
return OpenStreetMapService()
|
||||
else:
|
||||
logger.error(f"{log_prefix_factory} Unknown map service name specified: '{service_name_key}'.")
|
||||
return None
|
||||
294
map_manager/tile_manager.py
Normal file
294
map_manager/tile_manager.py
Normal file
@ -0,0 +1,294 @@
|
||||
"""Map tile manager migrated from original project and adapted.
|
||||
|
||||
Manages fetching, caching and stitching tiles using a BaseMapService.
|
||||
"""
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional, Dict
|
||||
import io
|
||||
import shutil
|
||||
|
||||
try:
|
||||
import requests
|
||||
REQUESTS_AVAILABLE = True
|
||||
except ImportError:
|
||||
requests = None # type: ignore
|
||||
REQUESTS_AVAILABLE = False
|
||||
logging.error("MapTileManager: 'requests' library not found. Online tile fetching will fail.")
|
||||
|
||||
try:
|
||||
from PIL import Image, ImageDraw
|
||||
ImageType = Image.Image # type: ignore
|
||||
PIL_AVAILABLE_MANAGER = True
|
||||
except ImportError:
|
||||
Image = None # type: ignore
|
||||
ImageDraw = None # type: ignore
|
||||
ImageType = None # type: ignore
|
||||
logging.error("MapTileManager: 'Pillow' library not found. Image operations will fail.")
|
||||
|
||||
from .services import BaseMapService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_MAP_TILE_CACHE_ROOT_DIR = "map_tile_cache"
|
||||
DEFAULT_ENABLE_ONLINE_FETCHING = True
|
||||
DEFAULT_NETWORK_TIMEOUT_SECONDS = 10
|
||||
DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS = 2
|
||||
DEFAULT_MAX_DOWNLOAD_RETRIES = 2
|
||||
DEFAULT_PLACEHOLDER_COLOR_RGB = (220, 220, 220)
|
||||
|
||||
|
||||
class MapTileManager:
|
||||
def __init__(self, map_service: BaseMapService, cache_root_directory: Optional[str] = None, enable_online_tile_fetching: Optional[bool] = None, tile_pixel_size: Optional[int] = None) -> None:
|
||||
logger.info("Initializing MapTileManager...")
|
||||
if not REQUESTS_AVAILABLE:
|
||||
raise ImportError("'requests' library is required by MapTileManager but not found.")
|
||||
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
|
||||
raise ImportError("'Pillow' library (or its drawing module ImageDraw) is required by MapTileManager but not found.")
|
||||
if not isinstance(map_service, BaseMapService):
|
||||
raise TypeError("map_service_instance must be an instance of BaseMapService.")
|
||||
|
||||
self.map_service: BaseMapService = map_service
|
||||
self.service_identifier_name: str = self.map_service.name
|
||||
if tile_pixel_size is not None:
|
||||
if not isinstance(tile_pixel_size, int) or tile_pixel_size <= 0:
|
||||
raise ValueError(f"Invalid provided tile_pixel_size: {tile_pixel_size}.")
|
||||
self.tile_size: int = tile_pixel_size
|
||||
else:
|
||||
self.tile_size: int = self.map_service.tile_size
|
||||
|
||||
effective_cache_root_dir = cache_root_directory if cache_root_directory is not None else DEFAULT_MAP_TILE_CACHE_ROOT_DIR
|
||||
self.service_specific_cache_dir: Path = Path(effective_cache_root_dir) / self.service_identifier_name
|
||||
self.is_online_fetching_enabled: bool = enable_online_tile_fetching if enable_online_tile_fetching is not None else DEFAULT_ENABLE_ONLINE_FETCHING
|
||||
|
||||
self.http_user_agent: str = "GeoElevationMapViewer/0.1 (Python Requests)"
|
||||
self.http_request_headers: Dict[str, str] = {"User-Agent": self.http_user_agent}
|
||||
self.http_request_timeout_seconds: int = DEFAULT_NETWORK_TIMEOUT_SECONDS
|
||||
self.download_max_retries: int = DEFAULT_MAX_DOWNLOAD_RETRIES
|
||||
self.download_retry_delay_seconds: int = DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS
|
||||
|
||||
self._ensure_service_cache_directory_exists()
|
||||
self._cache_access_lock = threading.Lock()
|
||||
|
||||
logger.info(f"MapTileManager initialized for service '{self.service_identifier_name}'. Online: {self.is_online_fetching_enabled}")
|
||||
|
||||
def _ensure_service_cache_directory_exists(self) -> None:
|
||||
try:
|
||||
self.service_specific_cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
logger.debug(f"Cache directory verified/created: {self.service_specific_cache_dir}")
|
||||
except OSError as e_mkdir:
|
||||
logger.error(f"Failed to create cache directory '{self.service_specific_cache_dir}': {e_mkdir}")
|
||||
|
||||
def _get_tile_cache_file_path(self, z: int, x: int, y: int) -> Path:
|
||||
return self.service_specific_cache_dir / str(z) / str(x) / f"{y}.png"
|
||||
|
||||
def _load_tile_from_cache(self, tile_cache_path: Path, tile_coordinates_log_str: str):
|
||||
logger.debug(f"Checking cache for tile {tile_coordinates_log_str} at {tile_cache_path}")
|
||||
try:
|
||||
with self._cache_access_lock:
|
||||
if tile_cache_path.is_file():
|
||||
pil_image = Image.open(tile_cache_path) # type: ignore
|
||||
pil_image.load()
|
||||
if pil_image.mode != "RGB":
|
||||
pil_image = pil_image.convert("RGB")
|
||||
return pil_image
|
||||
else:
|
||||
return None
|
||||
except Exception:
|
||||
logger.exception("Unexpected error accessing cache file")
|
||||
return None
|
||||
|
||||
def _download_and_save_tile_to_cache(self, zoom_level: int, tile_x: int, tile_y: int, tile_cache_path: Path, tile_coordinates_log_str: str):
|
||||
if not self.is_online_fetching_enabled:
|
||||
logger.debug(f"Online fetching disabled. Cannot download tile {tile_coordinates_log_str}.")
|
||||
return None
|
||||
tile_download_url = self.map_service.get_tile_url(zoom_level, tile_x, tile_y)
|
||||
if not tile_download_url:
|
||||
logger.error(f"Failed to get URL for tile {tile_coordinates_log_str} from service.")
|
||||
return None
|
||||
logger.info(f"Downloading tile {tile_coordinates_log_str} from: {tile_download_url}")
|
||||
downloaded_pil_image = None
|
||||
for attempt_num in range(self.download_max_retries + 1):
|
||||
try:
|
||||
response = requests.get(tile_download_url, headers=self.http_request_headers, timeout=self.http_request_timeout_seconds, stream=True) # type: ignore
|
||||
response.raise_for_status()
|
||||
image_binary_data = response.content
|
||||
if not image_binary_data:
|
||||
logger.warning(f"Downloaded empty content for tile {tile_coordinates_log_str}.")
|
||||
break
|
||||
try:
|
||||
pil_image = Image.open(io.BytesIO(image_binary_data)) # type: ignore
|
||||
pil_image.load()
|
||||
if pil_image.mode != "RGB":
|
||||
pil_image = pil_image.convert("RGB")
|
||||
self._save_image_to_cache_file(tile_cache_path, pil_image)
|
||||
downloaded_pil_image = pil_image
|
||||
break
|
||||
except Exception as e_img_proc:
|
||||
logger.error(f"Failed to process image data for {tile_coordinates_log_str}: {e_img_proc}")
|
||||
break
|
||||
except Exception as e_req:
|
||||
logger.warning(f"Request error for tile {tile_coordinates_log_str}: {e_req}")
|
||||
if attempt_num < self.download_max_retries:
|
||||
time.sleep(self.download_retry_delay_seconds)
|
||||
if downloaded_pil_image is None:
|
||||
logger.error(f"Failed to download tile {tile_coordinates_log_str} after all retries.")
|
||||
return downloaded_pil_image
|
||||
|
||||
def _save_image_to_cache_file(self, tile_cache_path: Path, pil_image: ImageType) -> None:
|
||||
with self._cache_access_lock:
|
||||
try:
|
||||
tile_cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
pil_image.save(tile_cache_path, format='PNG')
|
||||
logger.debug(f"Saved tile to cache: {tile_cache_path}")
|
||||
except Exception as e_save_unexpected:
|
||||
logger.exception(f"Unexpected error saving tile to cache {tile_cache_path}: {e_save_unexpected}")
|
||||
|
||||
def get_tile_image(self, zoom_level: int, tile_x: int, tile_y: int, force_online_refresh: bool = False):
|
||||
tile_coords_log_str = f"({zoom_level},{tile_x},{tile_y})"
|
||||
if not self.map_service.is_zoom_level_valid(zoom_level):
|
||||
logger.error(f"Invalid zoom level {zoom_level} for map service '{self.service_identifier_name}'. Cannot get tile.")
|
||||
return self._create_placeholder_tile_image(f"Invalid Zoom {zoom_level}")
|
||||
tile_cache_file = self._get_tile_cache_file_path(zoom_level, tile_x, tile_y)
|
||||
retrieved_image = None
|
||||
if not force_online_refresh:
|
||||
retrieved_image = self._load_tile_from_cache(tile_cache_file, tile_coords_log_str)
|
||||
if retrieved_image is None:
|
||||
retrieved_image = self._download_and_save_tile_to_cache(zoom_level, tile_x, tile_y, tile_cache_file, tile_coords_log_str)
|
||||
if retrieved_image is None:
|
||||
logger.warning(f"Failed to retrieve tile {tile_coords_log_str}. Using placeholder.")
|
||||
retrieved_image = self._create_placeholder_tile_image(tile_coords_log_str)
|
||||
return retrieved_image
|
||||
|
||||
def stitch_map_image(self, zoom_level: int, x_tile_range: Tuple[int, int], y_tile_range: Tuple[int, int], progress_callback=None):
|
||||
"""Stitch tiles into a single PIL image.
|
||||
|
||||
Optional `progress_callback` is invoked after each tile is processed with
|
||||
the signature: progress_callback(done:int, total:int, last_tile_duration:float, from_cache:bool, tile_coords:tuple)
|
||||
|
||||
Note: the callback may be invoked from the caller's thread (i.e. the
|
||||
thread where `stitch_map_image` is running). Consumers that update UI
|
||||
must marshal updates to their UI/main thread.
|
||||
"""
|
||||
min_tile_x, max_tile_x = x_tile_range
|
||||
min_tile_y, max_tile_y = y_tile_range
|
||||
if not (min_tile_x <= max_tile_x and min_tile_y <= max_tile_y):
|
||||
logger.error(f"Invalid tile ranges for stitching: X={x_tile_range}, Y={y_tile_range}")
|
||||
return None
|
||||
single_tile_pixel_size = self.tile_size
|
||||
if single_tile_pixel_size <= 0:
|
||||
logger.error(f"Invalid tile size ({single_tile_pixel_size}) stored in manager. Cannot stitch.")
|
||||
return None
|
||||
num_tiles_wide = (max_tile_x - min_tile_x) + 1
|
||||
num_tiles_high = (max_tile_y - min_tile_y) + 1
|
||||
total_image_width = num_tiles_wide * single_tile_pixel_size
|
||||
total_image_height = num_tiles_high * single_tile_pixel_size
|
||||
MAX_IMAGE_DIMENSION = 16384
|
||||
if total_image_width > MAX_IMAGE_DIMENSION or total_image_height > MAX_IMAGE_DIMENSION:
|
||||
logger.error("Requested stitched image size exceeds maximum allowed dimension.")
|
||||
return None
|
||||
try:
|
||||
if PIL_AVAILABLE_MANAGER:
|
||||
stitched_map_image = Image.new("RGB", (total_image_width, total_image_height)) # type: ignore
|
||||
else:
|
||||
raise ImportError("Pillow not available to create new image.")
|
||||
except Exception as e_create_blank:
|
||||
logger.exception(f"Failed to create blank image for stitching: {e_create_blank}")
|
||||
return None
|
||||
total_tiles = num_tiles_wide * num_tiles_high
|
||||
tiles_done = 0
|
||||
for row_index, current_tile_y in enumerate(range(min_tile_y, max_tile_y + 1)):
|
||||
for col_index, current_tile_x in enumerate(range(min_tile_x, max_tile_x + 1)):
|
||||
# measure per-tile retrieval time to allow ETA estimation in callers
|
||||
start_time = time.time()
|
||||
# check whether tile exists in cache before retrieval (used to report from_cache)
|
||||
tile_cache_path = self._get_tile_cache_file_path(zoom_level, current_tile_x, current_tile_y)
|
||||
from_cache_flag = tile_cache_path.is_file()
|
||||
tile_image_pil = self.get_tile_image(zoom_level, current_tile_x, current_tile_y)
|
||||
last_tile_duration = time.time() - start_time
|
||||
if tile_image_pil is None:
|
||||
logger.critical(f"Critical error: get_tile_image returned None for ({zoom_level},{current_tile_x},{current_tile_y}). Aborting stitch.")
|
||||
return None
|
||||
paste_position_x = col_index * single_tile_pixel_size
|
||||
paste_position_y = row_index * single_tile_pixel_size
|
||||
try:
|
||||
if tile_image_pil and tile_image_pil.size != (self.tile_size, self.tile_size):
|
||||
if PIL_AVAILABLE_MANAGER:
|
||||
tile_image_pil = tile_image_pil.resize((self.tile_size, self.tile_size), Image.Resampling.LANCZOS) # type: ignore
|
||||
else:
|
||||
continue
|
||||
if tile_image_pil:
|
||||
stitched_map_image.paste(tile_image_pil, (paste_position_x, paste_position_y)) # type: ignore
|
||||
except Exception as e_paste:
|
||||
logger.exception(f"Error pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) at ({paste_position_x},{paste_position_y}): {e_paste}")
|
||||
# update progress
|
||||
tiles_done += 1
|
||||
try:
|
||||
if progress_callback:
|
||||
# Provide caller with basic timing and whether tile was from cache
|
||||
progress_callback(tiles_done, total_tiles, last_tile_duration, from_cache_flag, (zoom_level, current_tile_x, current_tile_y))
|
||||
except Exception:
|
||||
# progress callbacks must not interrupt stitching
|
||||
logger.exception('Progress callback raised an exception')
|
||||
logger.info(f"Map stitching complete for zoom {zoom_level}, X={x_tile_range}, Y={y_tile_range}.")
|
||||
return stitched_map_image
|
||||
|
||||
def _create_placeholder_tile_image(self, identifier: str = "N/A"):
|
||||
if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None):
|
||||
logger.warning("Cannot create placeholder tile: Pillow or ImageDraw library not available.")
|
||||
return None
|
||||
try:
|
||||
tile_pixel_size = self.tile_size
|
||||
placeholder_color = DEFAULT_PLACEHOLDER_COLOR_RGB
|
||||
placeholder_img = Image.new("RGB", (tile_pixel_size, tile_pixel_size), color=placeholder_color) # type: ignore
|
||||
draw = ImageDraw.Draw(placeholder_img) # type: ignore
|
||||
overlay_text = f"Tile Fail\n{identifier}"
|
||||
try:
|
||||
draw.text((10, 10), overlay_text, fill="black") # simple fallback
|
||||
except Exception:
|
||||
draw.text((10, 10), overlay_text, fill="black")
|
||||
return placeholder_img
|
||||
except Exception as e_placeholder:
|
||||
logger.exception(f"Error creating placeholder tile image: {e_placeholder}")
|
||||
return None
|
||||
|
||||
def _get_bounds_for_tile_range(self, zoom: int, tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]]):
|
||||
try:
|
||||
import mercantile as local_mercantile
|
||||
if local_mercantile is None:
|
||||
raise ImportError("mercantile is None after import.")
|
||||
except ImportError:
|
||||
logger.error("mercantile library not found, cannot calculate bounds for tile range.")
|
||||
return None
|
||||
try:
|
||||
min_x, max_x = tile_ranges[0]
|
||||
min_y, max_y = tile_ranges[1]
|
||||
top_left_tile_bounds = local_mercantile.bounds(min_x, min_y, zoom)
|
||||
bottom_right_tile_bounds = local_mercantile.bounds(max_x, max_y, zoom)
|
||||
overall_west_lon = top_left_tile_bounds.west
|
||||
overall_south_lat = bottom_right_tile_bounds.south
|
||||
overall_east_lon = bottom_right_tile_bounds.east
|
||||
overall_north_lat = top_left_tile_bounds.north
|
||||
return (overall_west_lon, overall_south_lat, overall_east_lon, overall_north_lat)
|
||||
except Exception as e_bounds_calc:
|
||||
logger.exception(f"Error calculating geographic bounds for tile range: {e_bounds_calc}")
|
||||
return None
|
||||
|
||||
def clear_entire_service_cache(self) -> None:
|
||||
logger.info(f"Attempting to clear entire cache for service '{self.service_identifier_name}' at {self.service_specific_cache_dir}")
|
||||
if not self.service_specific_cache_dir.exists():
|
||||
logger.warning(f"Cache directory '{self.service_specific_cache_dir}' does not exist. Nothing to clear.")
|
||||
return
|
||||
with self._cache_access_lock:
|
||||
try:
|
||||
if self.service_specific_cache_dir.is_dir():
|
||||
shutil.rmtree(self.service_specific_cache_dir)
|
||||
logger.info(f"Successfully cleared cache at {self.service_specific_cache_dir}.")
|
||||
self._ensure_service_cache_directory_exists()
|
||||
else:
|
||||
logger.warning(f"Cache path '{self.service_specific_cache_dir}' is not a directory.")
|
||||
except Exception as e_clear_unexpected:
|
||||
logger.exception(f"Unexpected error clearing cache '{self.service_specific_cache_dir}': {e_clear_unexpected}")
|
||||
171
map_manager/utils.py
Normal file
171
map_manager/utils.py
Normal file
@ -0,0 +1,171 @@
|
||||
"""Utility functions for map calculations (migrated from original project).
|
||||
|
||||
Contains bounding box, tile range and resolution helpers.
|
||||
"""
|
||||
import logging
|
||||
import math
|
||||
from typing import Tuple, Optional, List
|
||||
|
||||
try:
|
||||
import pyproj
|
||||
PYPROJ_AVAILABLE = True
|
||||
except ImportError:
|
||||
pyproj = None # type: ignore
|
||||
PYPROJ_AVAILABLE = False
|
||||
logging.warning("MapUtils: 'pyproj' library not found. Some geodetic calculations will fail.")
|
||||
|
||||
try:
|
||||
import mercantile
|
||||
MERCANTILE_AVAILABLE_UTILS = True
|
||||
except ImportError:
|
||||
mercantile = None # type: ignore
|
||||
MERCANTILE_AVAILABLE_UTILS = False
|
||||
logging.warning("MapUtils: 'mercantile' library not found. Tile calculations will fail.")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MapCalculationError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def get_bounding_box_from_center_size(center_latitude_deg: float, center_longitude_deg: float, area_size_km: float) -> Optional[Tuple[float, float, float, float]]:
|
||||
if not PYPROJ_AVAILABLE:
|
||||
logger.error("'pyproj' library is required for bounding box calculation from center/size but is not found.")
|
||||
return None
|
||||
if not (isinstance(area_size_km, (int, float)) and area_size_km > 0):
|
||||
logger.error(f"Invalid area_size_km: {area_size_km}. Must be a positive number.")
|
||||
return None
|
||||
if not (-90.0 <= center_latitude_deg <= 90.0):
|
||||
logger.error(f"Invalid center_latitude_deg: {center_latitude_deg}. Must be in [-90, 90].")
|
||||
return None
|
||||
if not (-180.0 <= center_longitude_deg <= 180.0):
|
||||
logger.error(f"Invalid center_longitude_deg: {center_longitude_deg}. Must be in [-180, 180].")
|
||||
return None
|
||||
|
||||
try:
|
||||
geodetic_calculator = pyproj.Geod(ellps="WGS84") # type: ignore
|
||||
half_side_length_meters = (area_size_km / 2.0) * 1000.0
|
||||
|
||||
_, north_boundary_lat, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=0.0, dist=half_side_length_meters)
|
||||
_, south_boundary_lat, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=180.0, dist=half_side_length_meters)
|
||||
east_boundary_lon, _, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=90.0, dist=half_side_length_meters)
|
||||
west_boundary_lon, _, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=270.0, dist=half_side_length_meters)
|
||||
|
||||
north_boundary_lat = min(90.0, max(-90.0, north_boundary_lat))
|
||||
south_boundary_lat = min(90.0, max(-90.0, south_boundary_lat))
|
||||
|
||||
logger.debug(f"Calculated BBox: W={west_boundary_lon:.6f}, S={south_boundary_lat:.6f}, E={east_boundary_lon:.6f}, N={north_boundary_lat:.6f}")
|
||||
return (west_boundary_lon, south_boundary_lat, east_boundary_lon, north_boundary_lat)
|
||||
|
||||
except Exception as e_bbox_calc:
|
||||
logger.exception(f"Error calculating bounding box from center/size: {e_bbox_calc}")
|
||||
return None
|
||||
|
||||
|
||||
def get_tile_ranges_for_bbox(bounding_box_deg: Tuple[float, float, float, float], zoom_level: int) -> Optional[Tuple[Tuple[int, int], Tuple[int, int]]]:
|
||||
if not MERCANTILE_AVAILABLE_UTILS:
|
||||
logger.error("'mercantile' library is required for tile range calculation but is not found.")
|
||||
return None
|
||||
west_lon, south_lat, east_lon, north_lat = bounding_box_deg
|
||||
try:
|
||||
tiles_in_bbox_generator = mercantile.tiles(west_lon, south_lat, east_lon, north_lat, zooms=[zoom_level])
|
||||
list_of_tiles = list(tiles_in_bbox_generator)
|
||||
if not list_of_tiles:
|
||||
logger.warning("No tiles found by mercantile.tiles for BBox at zoom {zoom_level}.")
|
||||
clamped_west_lon = max(-180.0, min(180.0, west_lon))
|
||||
clamped_east_lon = max(-180.0, min(180.0, east_lon))
|
||||
clamped_south_lat = max(-90.0, min(90.0, south_lat))
|
||||
clamped_north_lat = max(-90.0, min(90.0, north_lat))
|
||||
center_lon = (clamped_west_lon + clamped_east_lon) / 2.0
|
||||
center_lat = (clamped_south_lat + clamped_north_lat) / 2.0
|
||||
center_lat = max(-85.0, min(85.0, center_lat))
|
||||
if clamped_west_lon > clamped_east_lon:
|
||||
center_lon = (clamped_west_lon + clamped_east_lon + 360) / 2.0
|
||||
if center_lon > 180: center_lon -= 360
|
||||
|
||||
center_point_tile = mercantile.tile(center_lon, center_lat, zoom_level)
|
||||
min_tile_x = center_point_tile.x
|
||||
max_tile_x = center_point_tile.x
|
||||
min_tile_y = center_point_tile.y
|
||||
max_tile_y = center_point_tile.y
|
||||
else:
|
||||
x_coordinates = [tile.x for tile in list_of_tiles]
|
||||
y_coordinates = [tile.y for tile in list_of_tiles]
|
||||
min_tile_x = min(x_coordinates)
|
||||
max_tile_x = max(x_coordinates)
|
||||
min_tile_y = min(y_coordinates)
|
||||
max_tile_y = max(y_coordinates)
|
||||
|
||||
return ((min_tile_x, max_tile_x), (min_tile_y, max_tile_y))
|
||||
|
||||
except Exception as e_tile_range_calc:
|
||||
logger.exception(f"Error calculating tile ranges: {e_tile_range_calc}")
|
||||
return None
|
||||
|
||||
|
||||
def calculate_meters_per_pixel(latitude_degrees: float, zoom_level: int, tile_pixel_size: int = 256) -> Optional[float]:
|
||||
try:
|
||||
if not (-90.0 <= latitude_degrees <= 90.0):
|
||||
logger.warning(f"Invalid latitude for m/px calc: {latitude_degrees}")
|
||||
return None
|
||||
if not (0 <= zoom_level <= 25):
|
||||
logger.warning(f"Invalid zoom level for m/px calc: {zoom_level}")
|
||||
return None
|
||||
if not (isinstance(tile_pixel_size, int) and tile_pixel_size > 0):
|
||||
logger.warning(f"Invalid tile_pixel_size for m/px calc: {tile_pixel_size}")
|
||||
return None
|
||||
|
||||
EARTH_CIRCUMFERENCE_METERS = 40075016.686
|
||||
latitude_radians = math.radians(latitude_degrees)
|
||||
resolution_m_px = (EARTH_CIRCUMFERENCE_METERS * math.cos(latitude_radians)) / (tile_pixel_size * (2**zoom_level))
|
||||
if not math.isfinite(resolution_m_px) or resolution_m_px <= 0:
|
||||
logger.warning(f"Calculated non-finite or non-positive m/px ({resolution_m_px}) at Lat {latitude_degrees}. Returning None.")
|
||||
return None
|
||||
return resolution_m_px
|
||||
except Exception as e_mpp_calc:
|
||||
logger.exception(f"Error calculating meters per pixel: {e_mpp_calc}")
|
||||
return None
|
||||
|
||||
|
||||
def calculate_geographic_bbox_size_km(bounding_box_deg: Tuple[float, float, float, float]) -> Optional[Tuple[float, float]]:
|
||||
if not PYPROJ_AVAILABLE:
|
||||
logger.error("'pyproj' library is required for geographic size calculation but is not found.")
|
||||
return None
|
||||
west_lon, south_lat, east_lon, north_lat = bounding_box_deg
|
||||
if not (-90.0 <= south_lat <= north_lat <= 90.0):
|
||||
south_lat = max(-90.0, south_lat)
|
||||
north_lat = min(90.0, north_lat)
|
||||
if south_lat >= north_lat:
|
||||
logger.error(f"Invalid latitude range after clamping: {south_lat}, {north_lat}. Cannot calculate size.")
|
||||
return None
|
||||
try:
|
||||
geodetic_calculator = pyproj.Geod(ellps="WGS84")
|
||||
center_lat = (south_lat + north_lat) / 2.0
|
||||
center_lat = max(-89.9, min(89.9, center_lat))
|
||||
_, _, width_meters = geodetic_calculator.inv(west_lon, center_lat, east_lon, center_lat)
|
||||
center_lon_for_height = (west_lon + east_lon) / 2.0
|
||||
center_lon_for_height = max(-180.0, min(180.0, center_lon_for_height))
|
||||
_, _, height_meters = geodetic_calculator.inv(center_lon_for_height, south_lat, center_lon_for_height, north_lat)
|
||||
approx_width_km = abs(width_meters) / 1000.0
|
||||
approx_height_km = abs(height_meters) / 1000.0
|
||||
if approx_width_km <= 0 or approx_height_km <= 0:
|
||||
logger.warning(f"Calculated non-positive width or height for BBox {bounding_box_deg}. Result: ({approx_width_km:.2f}, {approx_height_km:.2f}). Returning None.")
|
||||
return None
|
||||
return (approx_width_km, approx_height_km)
|
||||
except Exception as e_size_calc:
|
||||
logger.exception(f"Error calculating geographic bounding box size: {e_size_calc}")
|
||||
return None
|
||||
|
||||
|
||||
def get_hgt_tile_geographic_bounds(lat_coord: int, lon_coord: int) -> Tuple[float, float, float, float]:
|
||||
west_lon = float(lon_coord)
|
||||
south_lat = float(lat_coord)
|
||||
east_lon = float(lon_coord + 1)
|
||||
north_lat = float(lat_coord + 1)
|
||||
west_lon = max(-180.0, min(180.0, west_lon))
|
||||
south_lat = max(-90.0, min(90.0, south_lat))
|
||||
east_lon = max(-180.0, min(180.0, east_lon))
|
||||
north_lat = max(-90.0, min(90.0, north_lat))
|
||||
logger.debug(f"Calculated HGT tile bounds for ({lat_coord},{lon_coord}): ({west_lon:.6f}, {south_lat:.6f}, {east_lon:.6f}, {north_lat:.6f})")
|
||||
return (west_lon, south_lat, east_lon, north_lat)
|
||||
47
map_manager/visualizer.py
Normal file
47
map_manager/visualizer.py
Normal file
@ -0,0 +1,47 @@
|
||||
"""Lightweight MapVisualizer inside package: minimal display helper using OpenCV if available."""
|
||||
from typing import Optional, Callable
|
||||
|
||||
try:
|
||||
import cv2
|
||||
CV2_AVAILABLE = True
|
||||
except Exception:
|
||||
cv2 = None
|
||||
CV2_AVAILABLE = False
|
||||
|
||||
|
||||
class MapVisualizer:
|
||||
def __init__(self, engine):
|
||||
self.engine = engine
|
||||
self.callback_on_click: Optional[Callable[[float, float], None]] = None
|
||||
|
||||
def set_click_callback(self, callback: Callable[[float, float], None]):
|
||||
self.callback_on_click = callback
|
||||
|
||||
def show_pil_image(self, pil_image):
|
||||
if pil_image is None:
|
||||
return
|
||||
if CV2_AVAILABLE:
|
||||
import numpy as np
|
||||
img = pil_image.convert('RGB')
|
||||
arr = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
|
||||
cv2.imshow('Map', arr)
|
||||
cv2.waitKey(0)
|
||||
cv2.destroyAllWindows()
|
||||
else:
|
||||
# Fallback: save temp file and open with default image viewer
|
||||
import tempfile, os
|
||||
tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.png')
|
||||
pil_image.save(tmp.name)
|
||||
tmp.close()
|
||||
if os.name == 'nt':
|
||||
os.startfile(tmp.name) # type: ignore
|
||||
else:
|
||||
try:
|
||||
import webbrowser
|
||||
webbrowser.open('file://' + tmp.name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def show_point(self, lat: float, lon: float, zoom: int = 12, tiles_radius: int = 1):
|
||||
img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=tiles_radius)
|
||||
self.show_pil_image(img)
|
||||
5
requirements.txt
Normal file
5
requirements.txt
Normal file
@ -0,0 +1,5 @@
|
||||
requests
|
||||
Pillow
|
||||
opencv-python
|
||||
mercantile
|
||||
pyproj
|
||||
60
tests/test_engine_and_download.py
Normal file
60
tests/test_engine_and_download.py
Normal file
@ -0,0 +1,60 @@
|
||||
import unittest
|
||||
import tempfile
|
||||
import io
|
||||
import os
|
||||
from unittest.mock import patch, Mock
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from map_manager.engine import MapEngine
|
||||
from map_manager.services import OpenStreetMapService
|
||||
from map_manager.tile_manager import MapTileManager
|
||||
|
||||
|
||||
class TestEngineZoomAndDownload(unittest.TestCase):
|
||||
def test_choose_zoom_monotonic(self):
|
||||
# Small bbox near Rome
|
||||
bbox = (12.49, 41.89, 12.50, 41.90)
|
||||
engine = MapEngine(service_name='osm', cache_dir=None, enable_online=True)
|
||||
|
||||
zoom_large = engine._choose_zoom_for_bbox(bbox, max_size_pixels=2000)
|
||||
zoom_small = engine._choose_zoom_for_bbox(bbox, max_size_pixels=200)
|
||||
|
||||
# Expect that allowing larger max_size yields same-or-higher zoom
|
||||
self.assertIsNotNone(zoom_large)
|
||||
self.assertIsNotNone(zoom_small)
|
||||
self.assertGreaterEqual(zoom_large, zoom_small)
|
||||
|
||||
def test_download_and_cache_mocked(self):
|
||||
service = OpenStreetMapService()
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
mgr = MapTileManager(service, cache_root_directory=td, enable_online_tile_fetching=True, tile_pixel_size=64)
|
||||
|
||||
# Prepare a fake PNG image bytes for the mock response
|
||||
fake_img = Image.new('RGB', (64, 64), (123, 222, 111))
|
||||
bio = io.BytesIO()
|
||||
fake_img.save(bio, format='PNG')
|
||||
img_bytes = bio.getvalue()
|
||||
|
||||
# Create a mock response object
|
||||
mock_resp = Mock()
|
||||
mock_resp.content = img_bytes
|
||||
mock_resp.raise_for_status = Mock()
|
||||
|
||||
# Patch requests.get used in the tile_manager module
|
||||
with patch('map_manager.tile_manager.requests.get', return_value=mock_resp) as mock_get:
|
||||
# Request a tile that is not cached
|
||||
result = mgr.get_tile_image(5, 10, 12)
|
||||
self.assertIsNotNone(result)
|
||||
self.assertEqual(result.size, (64, 64))
|
||||
|
||||
# Check that the tile file was created on disk
|
||||
tile_path = mgr._get_tile_cache_file_path(5, 10, 12)
|
||||
self.assertTrue(tile_path.is_file())
|
||||
|
||||
# Ensure our mocked requests.get was called
|
||||
mock_get.assert_called()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
34
tests/test_engine_edgecases.py
Normal file
34
tests/test_engine_edgecases.py
Normal file
@ -0,0 +1,34 @@
|
||||
import unittest
|
||||
import tempfile
|
||||
from map_manager.engine import MapEngine
|
||||
|
||||
|
||||
class TestEngineEdgeCases(unittest.TestCase):
|
||||
def test_antimeridian_bbox_returns_image(self):
|
||||
# BBox crossing the antimeridian: e.g. west 179, east -179 (covers small area)
|
||||
bbox = (179.0, -1.0, -179.0, 1.0)
|
||||
# Use offline mode so test does not require network
|
||||
engine = MapEngine(service_name='osm', cache_dir=None, enable_online=False)
|
||||
|
||||
# Request image with explicit small max_size to force zoom selection
|
||||
img = engine.get_image_for_area(bbox, zoom=None, max_size=800)
|
||||
# Should return a PIL image (placeholder-based) even if tiles are not downloaded
|
||||
self.assertIsNotNone(img)
|
||||
# Basic sanity: image should have width and height > 0
|
||||
self.assertGreater(img.size[0], 0)
|
||||
self.assertGreater(img.size[1], 0)
|
||||
|
||||
def test_max_zoom_not_exceed_service_limit(self):
|
||||
# Large bbox (global) with very large max_size; engine should pick a zoom
|
||||
bbox = (-179.0, -85.0, 179.0, 85.0)
|
||||
engine = MapEngine(service_name='osm', cache_dir=None, enable_online=False)
|
||||
chosen = engine._choose_zoom_for_bbox(bbox, max_size_pixels=20000)
|
||||
self.assertIsNotNone(chosen)
|
||||
# chosen zoom must be within service bounds
|
||||
max_zoom = engine.tile_manager.map_service.max_zoom
|
||||
self.assertGreaterEqual(chosen, 0)
|
||||
self.assertLessEqual(chosen, max_zoom)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
42
tests/test_progress_callback.py
Normal file
42
tests/test_progress_callback.py
Normal file
@ -0,0 +1,42 @@
|
||||
import os
|
||||
import sys
|
||||
import pathlib
|
||||
from PIL import Image
|
||||
# ensure package path is importable when running tests directly
|
||||
pkg_root = pathlib.Path(__file__).resolve().parents[1]
|
||||
if str(pkg_root) not in sys.path:
|
||||
sys.path.insert(0, str(pkg_root))
|
||||
|
||||
from map_manager.services import OpenStreetMapService
|
||||
from map_manager.tile_manager import MapTileManager
|
||||
|
||||
|
||||
def test_stitch_progress_callback(tmp_path):
|
||||
service = OpenStreetMapService()
|
||||
cache_dir = tmp_path / 'cache'
|
||||
mgr = MapTileManager(service, cache_root_directory=str(cache_dir), enable_online_tile_fetching=False)
|
||||
|
||||
# monkeypatch get_tile_image to avoid network and make it quick
|
||||
def fake_get_tile_image(z, x, y, force_online_refresh=False):
|
||||
return Image.new('RGB', (mgr.tile_size, mgr.tile_size), (100, 100, 100))
|
||||
|
||||
mgr.get_tile_image = fake_get_tile_image
|
||||
|
||||
calls = []
|
||||
|
||||
def progress_cb(done, total, last_dur, from_cache, tile_coords):
|
||||
calls.append((done, total, last_dur, from_cache, tile_coords))
|
||||
|
||||
# stitch a small range (2x1 tiles => 2 tiles)
|
||||
result = mgr.stitch_map_image(1, (0, 1), (0, 0), progress_callback=progress_cb)
|
||||
assert result is not None
|
||||
# ensure callback called for each tile
|
||||
assert len(calls) == 2
|
||||
# check that totals are consistent
|
||||
for idx, call in enumerate(calls, start=1):
|
||||
done, total, last_dur, from_cache, tile_coords = call
|
||||
assert total == 2
|
||||
assert done == idx
|
||||
assert isinstance(last_dur, float)
|
||||
assert isinstance(from_cache, bool)
|
||||
assert isinstance(tile_coords, tuple) and len(tile_coords) == 3
|
||||
17
tests/test_tile_manager.py
Normal file
17
tests/test_tile_manager.py
Normal file
@ -0,0 +1,17 @@
|
||||
import unittest
|
||||
from map_manager.services import OpenStreetMapService
|
||||
from map_manager.tile_manager import MapTileManager
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class TestTileManagerPaths(unittest.TestCase):
|
||||
def test_cache_file_path(self):
|
||||
service = OpenStreetMapService()
|
||||
mgr = MapTileManager(service, cache_root_directory='test_cache_dir', enable_online_tile_fetching=False, tile_pixel_size=256)
|
||||
p = mgr._get_tile_cache_file_path(10, 20, 30)
|
||||
# Expect path like test_cache_dir/osm/10/20/30.png
|
||||
self.assertTrue(str(p).replace('\\', '/').endswith('test_cache_dir/osm/10/20/30.png') or str(p).replace('\\','/').endswith('test_cache_dir/osm/10/20/30.png'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
34
tests/test_tile_manager_cache.py
Normal file
34
tests/test_tile_manager_cache.py
Normal file
@ -0,0 +1,34 @@
|
||||
import unittest
|
||||
import tempfile
|
||||
import os
|
||||
from PIL import Image
|
||||
|
||||
from map_manager.services import OpenStreetMapService
|
||||
from map_manager.tile_manager import MapTileManager
|
||||
|
||||
|
||||
class TestTileManagerCacheBehavior(unittest.TestCase):
|
||||
def test_cache_hit_and_placeholder(self):
|
||||
service = OpenStreetMapService()
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
mgr = MapTileManager(service, cache_root_directory=td, enable_online_tile_fetching=False, tile_pixel_size=64)
|
||||
# Create a cached tile file
|
||||
p = mgr._get_tile_cache_file_path(5, 10, 12)
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
img = Image.new("RGB", (64, 64), (255, 0, 0))
|
||||
img.save(p)
|
||||
|
||||
# Now get_tile_image should load from cache
|
||||
loaded = mgr.get_tile_image(5, 10, 12)
|
||||
self.assertIsNotNone(loaded)
|
||||
self.assertEqual(loaded.size, (64, 64))
|
||||
|
||||
# For a missing tile (cache miss) and online disabled, should return a placeholder image
|
||||
missing = mgr.get_tile_image(6, 11, 13)
|
||||
self.assertIsNotNone(missing)
|
||||
# Placeholder should have expected tile size
|
||||
self.assertEqual(missing.size, (64, 64))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
18
tests/test_utils.py
Normal file
18
tests/test_utils.py
Normal file
@ -0,0 +1,18 @@
|
||||
import unittest
|
||||
from map_manager.utils import get_hgt_tile_geographic_bounds, calculate_meters_per_pixel
|
||||
|
||||
|
||||
class TestUtils(unittest.TestCase):
|
||||
def test_hgt_tile_bounds(self):
|
||||
# N45E007 -> west=7, south=45, east=8, north=46
|
||||
bbox = get_hgt_tile_geographic_bounds(45, 7)
|
||||
self.assertEqual(bbox, (7.0, 45.0, 8.0, 46.0))
|
||||
|
||||
def test_meters_per_pixel(self):
|
||||
mpp = calculate_meters_per_pixel(45.0, 10, tile_pixel_size=256)
|
||||
self.assertIsNotNone(mpp)
|
||||
self.assertGreater(mpp, 0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
46
visualizer.py
Normal file
46
visualizer.py
Normal file
@ -0,0 +1,46 @@
|
||||
"""Lightweight MapVisualizer: minimal display helper using OpenCV if available."""
|
||||
from typing import Optional, Callable
|
||||
|
||||
try:
|
||||
import cv2
|
||||
CV2_AVAILABLE = True
|
||||
except Exception:
|
||||
cv2 = None
|
||||
CV2_AVAILABLE = False
|
||||
|
||||
class MapVisualizer:
|
||||
def __init__(self, engine):
|
||||
self.engine = engine
|
||||
self.callback_on_click: Optional[Callable[[float, float], None]] = None
|
||||
|
||||
def set_click_callback(self, callback: Callable[[float, float], None]):
|
||||
self.callback_on_click = callback
|
||||
|
||||
def show_pil_image(self, pil_image):
|
||||
if pil_image is None:
|
||||
return
|
||||
if CV2_AVAILABLE:
|
||||
import numpy as np
|
||||
img = pil_image.convert('RGB')
|
||||
arr = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
|
||||
cv2.imshow('Map', arr)
|
||||
cv2.waitKey(0)
|
||||
cv2.destroyAllWindows()
|
||||
else:
|
||||
# Fallback: save temp file and open with default image viewer
|
||||
import tempfile, os
|
||||
tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.png')
|
||||
pil_image.save(tmp.name)
|
||||
tmp.close()
|
||||
if os.name == 'nt':
|
||||
os.startfile(tmp.name) # type: ignore
|
||||
else:
|
||||
try:
|
||||
import webbrowser
|
||||
webbrowser.open('file://' + tmp.name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def show_point(self, lat: float, lon: float, zoom: int = 12, tiles_radius: int = 1):
|
||||
img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=tiles_radius)
|
||||
self.show_pil_image(img)
|
||||
Loading…
Reference in New Issue
Block a user