commit 663a87a24ba7375316f0cd81ca658a62c3607ce4 Author: VALLONGOL Date: Tue Dec 2 09:09:22 2025 +0100 primo commit del modulo map_manager diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..22f97c2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,151 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a CI server in a temp folder. +# Then everything is copied to shipping folder during release. +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# PEP 582; __pypackages__ +__pypackages__/ + +# PEP 621; pyproject.toml sections +.pdm.toml +.pdm.lock +# .venv + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# static analysis tool +.flake8 + +# VS Code +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +.history/ + +# sublime +*.sublime-workspace +*.sublime-project + +# Kate +.kateproject +.kateproject.lock +.katenewfile. Neuen Filenamensvorschlag merken. + +# Temporary files +*.swp +*~ + diff --git a/INTEGRATION.md b/INTEGRATION.md new file mode 100644 index 0000000..017d2b3 --- /dev/null +++ b/INTEGRATION.md @@ -0,0 +1,174 @@ +# INTEGRAZIONE: usare `python-map-manager` come git submodule in GeoElevation + +Questo documento spiega come integrare `python-map-manager` in un progetto esistente +(ad esempio il repository `geoelevation`) come sottoprogetto (git submodule) e come +usarlo a runtime. + +Nota: `python-map-manager` è pensato per essere GUI-agnostico — la logica core non +dipende da Tkinter/OpenCV; il demo `debug_tool.py` è separato e opzionale. + +1) Aggiungere come git submodule + +Dal repository principale (`geoelevation`) esegui (PowerShell): + +```powershell +# posizionati nella root del repo geoelevation +cd C:\path\to\geoelevation + +# aggiungi il submodule (sostituisci con l'URL del repo python-map-manager) +git submodule add submodules/python-map-manager +git submodule update --init --recursive +``` + +Se preferisci tenere il codice direttamente nella tree, puoi copiare la cartella, +ma usare un submodule mantiene la separazione e facilita aggiornamenti futuri. + +2) Installare le dipendenze nell'ambiente del progetto + +`python-map-manager` usa alcune librerie opzionali (Pillow, requests, mercantile, pyproj, ecc.). +Nel progetto principale, installa i requisiti del submodule nel virtualenv del progetto: + +```powershell +python -m pip install -r submodules/python-map-manager/requirements.txt +``` + +3) Rendere importabile il package nel codice di `geoelevation` + +Opzioni possibili: + +- Aggiungere il submodule al `PYTHONPATH` (semplice, non richiede packaging): + + - Esempio (PowerShell) per lanciare script dalla root del progetto: + + ```powershell + $env:PYTHONPATH = "$PWD\submodules\python-map-manager"; python -m your_app + ``` + +- O aggiungere ad `sys.path` all'avvio dell'applicazione (es. in `geoelevation/__main__.py`): + + ```python + import os, sys + pkg_root = os.path.join(os.path.dirname(__file__), '..', 'submodules', 'python-map-manager') + if pkg_root not in sys.path: + sys.path.insert(0, pkg_root) + ``` + +- (Opzionale) Se preferisci installare come pacchetto editable (richiede un `pyproject`/`setup`): + + ```powershell + pip install -e submodules/python-map-manager + ``` + +4) Esempio rapido di utilizzo in `geoelevation` + +```python +from map_manager.engine import MapEngine + +# creare engine (cache_dir può essere un percorso gestito da geoelevation) +engine = MapEngine(service_name='osm', cache_dir='path/to/cache', enable_online=True) + +# chiamata semplice (sincrona) per ottenere immagine area +img = engine.get_image_for_area((w, s, e, n), zoom=12) + +# usare progress callback (esempio): +def progress_cb(done, total, last_tile_duration, from_cache, tile_coords): + # attenzione: callback può essere invocato da thread di lavoro + # se aggiornate UI dovete marshalare sull'UI thread (es. root.after in Tkinter) + print(f"{done}/{total} tiles — last {last_tile_duration:.2f}s — from_cache={from_cache} coords={tile_coords}") + +img = engine.get_image_for_area((w, s, e, n), zoom=None, max_size=800, progress_callback=progress_cb) +``` + +5) Note su threading e UI + +- `stitch_map_image` invoca il `progress_callback` dal thread dove viene eseguito + (cioè il thread che chiama `get_image_for_area` / `stitch_map_image`). +- Nel demo `debug_tool.py` le chiamate a `get_image_for_area` sono eseguite in un + worker thread e il callback marshala gli aggiornamenti con `root.after(...)`. +- Se integrate nelle GUI di `geoelevation`, assicuratevi di marshalarle similmente. + +6) Dipendenze e packaging + +- Se il vostro progetto usa un singolo `requirements.txt` o un `pyproject.toml`, + considerate aggiungere le dipendenze del submodule (contenute in + `submodules/python-map-manager/requirements.txt`) al file di dipendenze principale. +- Raccomando di usare un virtualenv isolato per lo sviluppo e i test. + +7) Test e CI + +- I test presenti in `python-map-manager/tests` possono essere eseguiti dalla root + del submodule. Quando integrate come submodule, nel CI aggiungete uno step che: + - installa le dipendenze + - esegue i test (es. `python -m pytest python-map-manager/tests`) + +Domande frequenti (FAQ) +- Q: Il demo GUI è obbligatorio? A: No, è solo un tool per sviluppatori. +- Q: La libreria farà richieste in parallelo? A: Attualmente il comportamento di default + è sequenziale; possiamo abilitare download concorrenti come opzione sicura. + +Se vuoi, posso aggiungere un esempio `geoelevation` che mostra come inizializzare +il `MapEngine` dal codice del progetto e integrare la pulizia della cache nei +comandi di gestione di `geoelevation`. +# Integration Guide — using `python-map-manager` from `geoelevation` + +This document explains how to integrate the local `python-map-manager` package into the `geoelevation` application as a Git submodule (or as a local package) and shows example usage of the new public API (`MapEngine`, `MapVisualizer`). + +1) Add as a git submodule (optional) + +From the `geoelevation` repository root, add the external repo as a submodule: + +```powershell +git submodule add -b master external/python-map-manager +git submodule update --init --recursive +``` + +If you're working locally and prefer to keep the module inside the same repo for now, copy the `python-map-manager` directory under `external/` or keep it at repo root. + +2) Make the package importable + +Options: +- Add the submodule path to `PYTHONPATH` at runtime, or +- Install the package into your environment (editable install) during development: + +```powershell +python -m pip install -e external\python-map-manager +# or, for local-only testing +$env:PYTHONPATH+=';C:\path\to\geoelevation\external\python-map-manager' +``` + +3) Minimal usage example + +In code, import the engine and visualizer: + +```python +from map_manager.engine import MapEngine +from map_manager.visualizer import MapVisualizer + +engine = MapEngine(service_name='osm', cache_dir='map_tile_cache', enable_online=True) +visual = MapVisualizer(engine) + +# Example: get an image for a bbox (west,south,east,north) +bbox = (7.0, 45.0, 8.0, 46.0) +img = engine.get_image_for_area(bbox, max_size=1024) # choose zoom automatically +if img is not None: + visual.show_pil_image(img) + +# For callbacks: set a click handler +def on_map_click(lat, lon): + elevation = query_elevation(lat, lon) # your app logic + print('Elevation', elevation) + +visual.set_click_callback(on_map_click) +``` + +4) Integration points in `geoelevation` + +- Replace internal `map_viewer` usage with the submodule API: + - Where `map_viewer` previously called tile download/stitching, replace with `MapEngine.get_image_for_area` or `get_image_for_point`. + - Where UI components called directly into `ElevationManager`, change to use `MapVisualizer` callbacks (subscribe your callback to `visual.set_click_callback`). + +5) Notes and recommendations + +- Keep `MapEngine` independent of `ElevationManager` — pass callbacks into `MapVisualizer` from `geoelevation` so the submodule does not import application business logic. +- For headless or server usage, do not import/use `MapVisualizer`; `MapEngine` works without OpenCV. +- Add `python-map-manager/requirements.txt` to your environment or include its dependencies in the main project's dependency list. diff --git a/README.md b/README.md new file mode 100644 index 0000000..88b5a34 --- /dev/null +++ b/README.md @@ -0,0 +1,40 @@ +# python-map-manager (local copy) + +Minimal local copy of the `python-map-manager` package for development and testing. + +Usage: + +1. Install dependencies (recommended in a virtualenv): + +```powershell +python -m pip install -r requirements.txt +``` + +2. Run the debug tool: + +```powershell +python debug_tool.py +``` + +This will try to download tiles from OpenStreetMap and show a small stitched image. + +Progress callback +----------------- + +The library supports an optional progress callback that consumer applications +can provide to receive per-tile progress updates while stitching. The +callback signature is: + +``` +progress_callback(done: int, total: int, last_tile_duration: float, from_cache: bool, tile_coords: tuple) +``` + +- `done`: number of tiles processed so far +- `total`: total number of tiles to process +- `last_tile_duration`: seconds spent fetching/reading the last tile +- `from_cache`: True if the tile was already present in cache before retrieval +- `tile_coords`: a (z,x,y) tuple identifying the tile + +The callback may be invoked from the thread that called into `stitch_map_image`, +so GUI applications should marshal updates onto their UI thread (for example +using `root.after` in Tkinter). diff --git a/debug_tool.py b/debug_tool.py new file mode 100644 index 0000000..7d37a67 --- /dev/null +++ b/debug_tool.py @@ -0,0 +1,535 @@ +"""Tkinter-based interactive demo for python-map-manager. + +Provides a small GUI to exercise MapEngine and MapVisualizer with preset +examples and controls for point/area fetching, caching and saving images. +""" +import sys +import pathlib +import os +import threading +import time +from typing import Optional + +script_dir = pathlib.Path(__file__).parent.resolve() +if str(script_dir) not in sys.path: + sys.path.insert(0, str(script_dir)) + +try: + import tkinter as tk + from tkinter import ttk, filedialog, messagebox +except Exception: + tk = None + +from PIL import Image, ImageTk + +from map_manager.engine import MapEngine +from map_manager.visualizer import MapVisualizer + + +class DebugGUI: + def __init__(self, root): + self.root = root + root.title('python-map-manager Demo') + + self.service = 'osm' + self.cache_dir = 'debug_map_cache' + self.online = True + self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online) + self.visual = MapVisualizer(self.engine) + self.last_image = None + self._photo = None + + self._build_ui() + + def _build_ui(self): + frm = ttk.Frame(self.root, padding=8) + frm.grid(column=0, row=0, sticky='nsew') + self.root.columnconfigure(0, weight=1) + self.root.rowconfigure(0, weight=1) + + # Left: controls + controls = ttk.Frame(frm) + controls.grid(column=0, row=0, sticky='nw') + + # Point inputs + ttk.Label(controls, text='Point (lat lon zoom radius)').grid(column=0, row=0, sticky='w') + self.point_lat = tk.StringVar(value='41.9028') + self.point_lon = tk.StringVar(value='12.4964') + self.point_zoom = tk.StringVar(value='12') + self.point_radius = tk.StringVar(value='1') + e_lat = ttk.Entry(controls, width=10, textvariable=self.point_lat) + e_lon = ttk.Entry(controls, width=10, textvariable=self.point_lon) + e_zoom = ttk.Entry(controls, width=6, textvariable=self.point_zoom) + e_rad = ttk.Entry(controls, width=6, textvariable=self.point_radius) + e_lat.grid(column=0, row=1) + e_lon.grid(column=1, row=1) + e_zoom.grid(column=2, row=1) + e_rad.grid(column=3, row=1) + self.btn_point = ttk.Button(controls, text='Fetch Point', command=self.fetch_point) + self.btn_point.grid(column=0, row=2, columnspan=2, pady=4) + + # Area inputs + ttk.Label(controls, text='Area (W S E N)').grid(column=0, row=3, sticky='w', pady=(8,0)) + self.area_w = tk.StringVar(value='12.49') + self.area_s = tk.StringVar(value='41.89') + self.area_e = tk.StringVar(value='12.50') + self.area_n = tk.StringVar(value='41.90') + a_w = ttk.Entry(controls, width=10, textvariable=self.area_w) + a_s = ttk.Entry(controls, width=10, textvariable=self.area_s) + a_e = ttk.Entry(controls, width=10, textvariable=self.area_e) + a_n = ttk.Entry(controls, width=10, textvariable=self.area_n) + a_w.grid(column=0, row=4) + a_s.grid(column=1, row=4) + a_e.grid(column=2, row=4) + a_n.grid(column=3, row=4) + + ttk.Label(controls, text='Zoom / MaxSize').grid(column=0, row=5, sticky='w') + self.area_zoom = tk.StringVar(value='') + self.area_maxsize = tk.StringVar(value='800') + az = ttk.Entry(controls, width=6, textvariable=self.area_zoom) + am = ttk.Entry(controls, width=8, textvariable=self.area_maxsize) + az.grid(column=0, row=6) + am.grid(column=1, row=6) + self.btn_area = ttk.Button(controls, text='Fetch Area', command=self.fetch_area) + self.btn_area.grid(column=0, row=7, columnspan=2, pady=4) + + # Preset examples + ttk.Label(controls, text='Examples').grid(column=0, row=8, sticky='w', pady=(8,0)) + self.examples = ttk.Combobox(controls, values=['Rome point', 'Rome area', 'Antimeridian area (toy)']) + self.examples.current(0) + self.examples.grid(column=0, row=9, columnspan=2, sticky='we') + ttk.Button(controls, text='Run Example', command=self.run_example).grid(column=2, row=9) + + # Actions + ttk.Button(controls, text='Toggle Online', command=self.toggle_online).grid(column=0, row=10, pady=(12,0)) + ttk.Button(controls, text='Clear Cache', command=self.clear_cache).grid(column=1, row=10, pady=(12,0)) + ttk.Button(controls, text='Save Image', command=self.save_image).grid(column=2, row=10, pady=(12,0)) + ttk.Button(controls, text='Show Image', command=self.show_image).grid(column=3, row=10, pady=(12,0)) + + # Status/log + self.status = tk.StringVar(value='Ready') + ttk.Label(controls, textvariable=self.status).grid(column=0, row=11, columnspan=4, sticky='we', pady=(8,0)) + + # Progress bar + ETA + self.progress_var = tk.IntVar(value=0) + self.progress = ttk.Progressbar(controls, orient='horizontal', length=200, mode='determinate', variable=self.progress_var) + self.progress.grid(column=0, row=12, columnspan=3, sticky='we', pady=(6,0)) + self.eta_var = tk.StringVar(value='') + ttk.Label(controls, textvariable=self.eta_var).grid(column=3, row=12, sticky='w', pady=(6,0)) + + # Right: image preview + preview = ttk.Frame(frm) + preview.grid(column=1, row=0, sticky='nsew', padx=(12,0)) + frm.columnconfigure(1, weight=1) + frm.rowconfigure(0, weight=1) + + self.canvas = tk.Label(preview, text='No image', background='black', width=60, height=30) + self.canvas.pack(fill='both', expand=True) + + def log(self, msg: str): + print(msg) + self.status.set(msg) + + def fetch_point(self): + try: + lat = float(self.point_lat.get()) + lon = float(self.point_lon.get()) + zoom = int(self.point_zoom.get()) + radius = int(self.point_radius.get()) + except Exception: + messagebox.showerror('Input error', 'Invalid numeric input for point') + return + self.log(f'Fetching point {lat},{lon} @ z={zoom} r={radius}...') + # run in background thread and show ETA based on per-tile durations + self._set_busy(True) + def worker(): + durations = [] + cached = 0 + downloaded = 0 + def progress_cb(done, total, last_dur, from_cache, tile_coords): + nonlocal cached, downloaded + durations.append(last_dur) + if from_cache: + cached += 1 + else: + downloaded += 1 + avg = sum(durations) / len(durations) if durations else 0.0 + remaining = max(0, total - done) + eta = remaining * avg + # schedule UI update on main thread + try: + self.root.after(0, lambda: self._update_progress(done, total, eta, cached, downloaded)) + except Exception: + pass + try: + img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=radius, progress_callback=progress_cb) + if img is None: + self.root.after(0, lambda: self.log('No image returned (check dependencies and online state)')) + else: + self.last_image = img + self.root.after(0, lambda: self._update_preview(img)) + self.root.after(0, lambda: self.log('Point image loaded')) + finally: + self.root.after(0, lambda: self._set_busy(False)) + threading.Thread(target=worker, daemon=True).start() + + def fetch_area(self): + try: + w = float(self.area_w.get()) + s = float(self.area_s.get()) + e = float(self.area_e.get()) + n = float(self.area_n.get()) + except Exception: + messagebox.showerror('Input error', 'Invalid numeric input for area') + return + zoom = None + if self.area_zoom.get().strip(): + try: + zoom = int(self.area_zoom.get()) + except Exception: + messagebox.showerror('Input error', 'Invalid zoom') + return + maxsize = None + if self.area_maxsize.get().strip(): + try: + maxsize = int(self.area_maxsize.get()) + except Exception: + messagebox.showerror('Input error', 'Invalid maxsize') + return + self.log(f'Fetching area bbox=({w},{s},{e},{n}) zoom={zoom} maxsize={maxsize}...') + self._set_busy(True) + def worker(): + durations = [] + cached = 0 + downloaded = 0 + def progress_cb(done, total, last_dur, from_cache, tile_coords): + nonlocal cached, downloaded + durations.append(last_dur) + if from_cache: + cached += 1 + else: + downloaded += 1 + avg = sum(durations) / len(durations) if durations else 0.0 + remaining = max(0, total - done) + eta = remaining * avg + try: + self.root.after(0, lambda: self._update_progress(done, total, eta, cached, downloaded)) + except Exception: + pass + try: + img = self.engine.get_image_for_area((w, s, e, n), zoom=zoom, max_size=maxsize, progress_callback=progress_cb) + if img is None: + self.root.after(0, lambda: self.log('No image returned (check dependencies and online state)')) + else: + self.last_image = img + self.root.after(0, lambda: self._update_preview(img)) + self.root.after(0, lambda: self.log('Area image loaded')) + finally: + self.root.after(0, lambda: self._set_busy(False)) + threading.Thread(target=worker, daemon=True).start() + + def run_example(self): + sel = self.examples.get() + if sel == 'Rome point': + self.point_lat.set('41.9028') + self.point_lon.set('12.4964') + self.point_zoom.set('12') + self.point_radius.set('1') + self.fetch_point() + elif sel == 'Rome area': + self.area_w.set('12.49') + self.area_s.set('41.89') + self.area_e.set('12.50') + self.area_n.set('41.90') + self.area_maxsize.set('800') + self.fetch_area() + elif sel == 'Antimeridian area (toy)': + # small bbox crossing antimeridian near 179E/-179W + self.area_w.set('179.5') + self.area_s.set('-1.0') + self.area_e.set('-179.5') + self.area_n.set('1.0') + self.area_maxsize.set('600') + self.fetch_area() + + def toggle_online(self): + self.online = not self.online + self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online) + self.visual = MapVisualizer(self.engine) + self.log(f'Online fetching set to {self.online}') + + def clear_cache(self): + try: + self.engine.tile_manager.clear_entire_service_cache() + self.log('Cache cleared') + except Exception as e: + messagebox.showerror('Error', f'Failed to clear cache: {e}') + + def save_image(self): + if not self.last_image: + messagebox.showinfo('No image', 'No image to save') + return + path = filedialog.asksaveasfilename(defaultextension='.png', filetypes=[('PNG','*.png')]) + if not path: + return + try: + self.last_image.save(path) + self.log(f'Saved image to {path}') + except Exception as e: + messagebox.showerror('Save error', f'Failed to save: {e}') + + def show_image(self): + if not self.last_image: + messagebox.showinfo('No image', 'No image to show') + return + try: + self.visual.show_pil_image(self.last_image) + except Exception as e: + messagebox.showerror('Show error', f'Failed to show image: {e}') + + def _update_preview(self, pil_image: Image.Image): + # Resize image to fit preview area if needed + try: + w = max(200, min(1024, pil_image.width)) + h = int(pil_image.height * (w / pil_image.width)) if pil_image.width else pil_image.height + thumb = pil_image.copy() + thumb.thumbnail((w, 800)) + self._photo = ImageTk.PhotoImage(thumb) + self.canvas.configure(image=self._photo, text='') + except Exception as e: + self.canvas.configure(text=f'Preview error: {e}') + + def _update_progress(self, done: int, total: int, eta_seconds: float, cached_count: int = 0, downloaded_count: int = 0): + try: + self.progress['maximum'] = total + self.progress_var.set(done) + if eta_seconds is None: + eta_text = '' + else: + eta_text = f'ETA: {int(eta_seconds)}s' + counts_text = f' D:{downloaded_count} C:{cached_count}' + self.eta_var.set(eta_text + counts_text) + except Exception: + pass + + def _set_busy(self, busy: bool): + try: + state = 'disabled' if busy else 'normal' + self.btn_point.configure(state=state) + self.btn_area.configure(state=state) + if not busy: + # clear progress when done + self.progress_var.set(0) + self.progress['maximum'] = 1 + self.eta_var.set('') + except Exception: + pass + + +def main(): + if tk is None: + print('Tkinter is not available in this Python environment.') + return + root = tk.Tk() + app = DebugGUI(root) + root.mainloop() + + +if __name__ == '__main__': + main() +"""Simple debug tool to exercise MapEngine and MapVisualizer locally.""" +"""Interactive debug tool for python-map-manager. + +Provides a small REPL to exercise MapEngine and MapVisualizer without +editing code. Commands: + - point LAT LON ZOOM [RADIUS] -> fetch and show tiles around a point + - area W S E N [--zoom Z | --maxsize PIX] -> fetch and show area + - save PATH -> save last image to PATH + - show -> open last image with visualizer + - toggle_online -> toggle online fetching on/off + - clear_cache -> clear the map tiles cache for current service + - exit -> quit + +This tool is intended for developer iteration and debugging. +""" + +import sys +import pathlib +import os +import shlex +from typing import Optional + +script_dir = pathlib.Path(__file__).parent.resolve() +if str(script_dir) not in sys.path: + sys.path.insert(0, str(script_dir)) + +from map_manager.engine import MapEngine +from map_manager.visualizer import MapVisualizer + + +def prompt(): + return input('map-debug> ') + + +def parse_floats(args): + try: + return [float(x) for x in args] + except Exception: + return None + + +class DebugApp: + def __init__(self): + self.service = 'osm' + self.cache_dir = 'debug_map_cache' + self.online = True + self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online) + self.visual = MapVisualizer(self.engine) + self.visual.set_click_callback(self.on_click) + self.last_image = None + + def on_click(self, lat, lon): + print(f'Clicked callback: {lat}, {lon}') + + def cmd_point(self, parts): + # point LAT LON ZOOM [RADIUS] + if len(parts) < 4: + print('Usage: point LAT LON ZOOM [RADIUS]') + return + vals = parse_floats(parts[1:]) + if not vals or len(vals) < 3: + print('Invalid numeric arguments') + return + lat, lon, zoom = vals[0], vals[1], int(vals[2]) + radius = int(vals[3]) if len(vals) >= 4 else 1 + print(f'Fetching point {lat},{lon} @ zoom {zoom} radius {radius}') + img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=radius) + if img is None: + print('No image returned.') + else: + self.last_image = img + self.visual.show_pil_image(img) + + def cmd_area(self, parts): + # area W S E N [--zoom Z] [--maxsize PIX] + # minimal parser + if len(parts) < 5: + print('Usage: area W S E N [--zoom Z] [--maxsize PIX]') + return + vals = parse_floats(parts[1:5]) + if not vals: + print('Invalid numeric bbox') + return + w, s, e, n = vals + zoom = None + maxsize = None + # parse optional flags + i = 5 + while i < len(parts): + p = parts[i] + if p == '--zoom' and i + 1 < len(parts): + try: + zoom = int(parts[i+1]) + except Exception: + pass + i += 2 + elif p == '--maxsize' and i + 1 < len(parts): + try: + maxsize = int(parts[i+1]) + except Exception: + pass + i += 2 + else: + i += 1 + + print(f'Fetching area bbox=({w},{s},{e},{n}) zoom={zoom} maxsize={maxsize}') + img = self.engine.get_image_for_area((w, s, e, n), zoom=zoom, max_size=maxsize) + if img is None: + print('No image returned.') + else: + self.last_image = img + self.visual.show_pil_image(img) + + def cmd_save(self, parts): + if not self.last_image: + print('No image to save') + return + if len(parts) < 2: + print('Usage: save PATH') + return + path = parts[1] + try: + self.last_image.save(path) + print(f'Saved image to {path}') + except Exception as e: + print(f'Failed to save image: {e}') + + def cmd_show(self, parts): + if not self.last_image: + print('No image to show') + return + self.visual.show_pil_image(self.last_image) + + def cmd_toggle_online(self, parts): + self.online = not self.online + # Recreate engine with new flag + self.engine = MapEngine(service_name=self.service, cache_dir=self.cache_dir, enable_online=self.online) + self.visual = MapVisualizer(self.engine) + self.visual.set_click_callback(self.on_click) + print(f'Online fetching set to {self.online}') + + def cmd_clear_cache(self, parts): + # clear entire service cache directory + try: + self.engine.tile_manager.clear_entire_service_cache() + print('Service cache cleared') + except Exception as e: + print(f'Failed to clear cache: {e}') + + def repl(self): + print('Interactive map debug tool. Type "help" for commands.') + while True: + try: + line = prompt() + except (EOFError, KeyboardInterrupt): + print('\nExiting') + return + if not line: + continue + parts = shlex.split(line) + cmd = parts[0].lower() + if cmd in ('exit', 'quit'): + print('Bye') + return + if cmd == 'help': + print('Commands: point, area, save, show, toggle_online, clear_cache, exit') + continue + if cmd == 'point': + self.cmd_point(parts) + continue + if cmd == 'area': + self.cmd_area(parts) + continue + if cmd == 'save': + self.cmd_save(parts) + continue + if cmd == 'show': + self.cmd_show(parts) + continue + if cmd == 'toggle_online': + self.cmd_toggle_online(parts) + continue + if cmd == 'clear_cache': + self.cmd_clear_cache(parts) + continue + print('Unknown command. Type help for commands.') + + +def main(): + app = DebugApp() + app.repl() + + +if __name__ == '__main__': + main() + diff --git a/engine.py b/engine.py new file mode 100644 index 0000000..3fee12f --- /dev/null +++ b/engine.py @@ -0,0 +1,38 @@ +"""MapEngine: facade for map retrieval operations. + +Minimal implementation: constructs a MapTileManager and exposes +`get_image_for_area` and `get_image_for_point` helper methods. +""" +from typing import Optional, Tuple +from map_manager.tile_manager import MapTileManager +from map_manager.services import get_map_service_instance + +class MapEngine: + def __init__(self, service_name: str = 'osm', cache_dir: Optional[str] = None, enable_online: bool = True): + service = get_map_service_instance(service_name) + if service is None: + raise ValueError(f"Unknown map service: {service_name}") + self.tile_manager = MapTileManager(service, cache_root_directory=cache_dir, enable_online_tile_fetching=enable_online) + + def get_image_for_area(self, bbox: Tuple[float, float, float, float], zoom: int) -> Optional[object]: + """Return a stitched PIL image covering `bbox` at `zoom` level.""" + from map_manager.utils import get_tile_ranges_for_bbox + tile_ranges = get_tile_ranges_for_bbox(bbox, zoom) + if not tile_ranges: + return None + return self.tile_manager.stitch_map_image(zoom, tile_ranges[0], tile_ranges[1]) + + def get_image_for_point(self, lat: float, lon: float, zoom: int, tiles_radius: int = 1) -> Optional[object]: + """Return a stitched image centered on (lat,lon) using a small tile window. + + tiles_radius=1 returns a 3x3 tile image. + """ + try: + import mercantile + except Exception: + return None + center_tile = mercantile.tile(lon, lat, zoom) + x, y = center_tile.x, center_tile.y + range_x = (x - tiles_radius, x + tiles_radius) + range_y = (y - tiles_radius, y + tiles_radius) + return self.tile_manager.stitch_map_image(zoom, range_x, range_y) diff --git a/map_manager/__init__.py b/map_manager/__init__.py new file mode 100644 index 0000000..b72de5f --- /dev/null +++ b/map_manager/__init__.py @@ -0,0 +1,14 @@ +"""Top-level package exports for python-map-manager.map_manager + +This package exposes the main public classes: MapEngine, MapVisualizer +and the MapTileManager for direct use. +""" +from .engine import MapEngine +from .visualizer import MapVisualizer +from .tile_manager import MapTileManager + +__all__ = [ + "MapEngine", + "MapVisualizer", + "MapTileManager", +] diff --git a/map_manager/drawing.py b/map_manager/drawing.py new file mode 100644 index 0000000..c6123cb --- /dev/null +++ b/map_manager/drawing.py @@ -0,0 +1,103 @@ +"""Drawing helpers migrated from original project; adapted to new package layout. +""" +import logging +from typing import Optional, Tuple, List, Dict + +try: + from PIL import Image, ImageDraw, ImageFont + PIL_LIB_AVAILABLE_DRAWING = True +except ImportError: + Image = None # type: ignore + ImageDraw = None # type: ignore + ImageFont = None # type: ignore + PIL_LIB_AVAILABLE_DRAWING = False + logging.error("MapDrawing: Pillow (PIL) library not found. Drawing operations will fail.") + +try: + import cv2 + import numpy as np + CV2_NUMPY_LIBS_AVAILABLE_DRAWING = True +except ImportError: + cv2 = None # type: ignore + np = None # type: ignore + CV2_NUMPY_LIBS_AVAILABLE_DRAWING = False + logging.warning("MapDrawing: OpenCV or NumPy not found. Some drawing operations (markers) will be disabled.") + +try: + import mercantile + MERCANTILE_LIB_AVAILABLE_DRAWING = True +except ImportError: + mercantile = None # type: ignore + MERCANTILE_LIB_AVAILABLE_DRAWING = False + logging.error("MapDrawing: 'mercantile' library not found. Coordinate conversions will fail.") + +logger = logging.getLogger(__name__) + +# Prefer local utils import name +from .utils import get_hgt_tile_geographic_bounds +from .utils import MapCalculationError + +# Fallback style constants +DEM_BOUNDARY_COLOR = "red" +DEM_BOUNDARY_THICKNESS_PX = 3 +AREA_BOUNDARY_COLOR = "blue" +AREA_BOUNDARY_THICKNESS_PX = 2 +TILE_TEXT_COLOR = "white" +TILE_TEXT_BG_COLOR = "rgba(0, 0, 0, 150)" +DEM_TILE_LABEL_BASE_FONT_SIZE = 12 +DEM_TILE_LABEL_BASE_ZOOM = 10 +_DEFAULT_FONT_FOR_LABELS = None + + +def _geo_to_pixel_on_unscaled_map(latitude_deg: float, longitude_deg: float, current_map_geo_bounds: Optional[Tuple[float, float, float, float]], current_stitched_map_pixel_shape: Optional[Tuple[int, int]]) -> Optional[Tuple[int, int]]: + if not MERCANTILE_LIB_AVAILABLE_DRAWING or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None: + logger.warning("Map context incomplete or mercantile missing for geo_to_pixel_on_unscaled_map conversion.") + return None + unscaled_height, unscaled_width = current_stitched_map_pixel_shape + map_west_lon, map_south_lat, map_east_lon, map_north_lat = current_map_geo_bounds + try: + map_ul_merc_x, map_ul_merc_y = mercantile.xy(map_west_lon, map_north_lat) # type: ignore + map_lr_merc_x, map_lr_merc_y = mercantile.xy(map_east_lon, map_south_lat) # type: ignore + total_map_width_merc = abs(map_lr_merc_x - map_ul_merc_x) + total_map_height_merc = abs(map_ul_merc_y - map_lr_merc_y) + if total_map_width_merc <= 0 or total_map_height_merc <= 0: + logger.warning("Map Mercator extent is zero, cannot convert geo to pixel on unscaled map.") + return None + target_merc_x, target_merc_y = mercantile.xy(longitude_deg, latitude_deg) # type: ignore + relative_merc_x_in_map = (target_merc_x - map_ul_merc_x) / total_map_width_merc if total_map_width_merc > 0 else 0.0 + relative_merc_y_in_map = (map_ul_merc_y - target_merc_y) / total_map_height_merc if total_map_height_merc > 0 else 0.0 + pixel_x_on_unscaled = int(round(relative_merc_x_in_map * unscaled_width)) + pixel_y_on_unscaled = int(round(relative_merc_y_in_map * unscaled_height)) + px_clamped = max(0, min(pixel_x_on_unscaled, unscaled_width - 1)) + py_clamped = max(0, min(pixel_y_on_unscaled, unscaled_height - 1)) + return (px_clamped, py_clamped) + except Exception as e_geo_to_px_unscaled: + logger.exception(f"Error during geo_to_pixel_on_unscaled_map conversion: {e_geo_to_px_unscaled}") + return None + + +def draw_point_marker(pil_image_to_draw_on, latitude_deg: float, longitude_deg: float, current_map_geo_bounds: Optional[Tuple[float, float, float, float]], current_stitched_map_pixel_shape: Optional[Tuple[int, int]]): + if not PIL_LIB_AVAILABLE_DRAWING or pil_image_to_draw_on is None or current_map_geo_bounds is None or current_stitched_map_pixel_shape is None: + logger.warning("Cannot draw point marker: PIL image or map context missing.") + return pil_image_to_draw_on + pixel_coords_on_unscaled = _geo_to_pixel_on_unscaled_map(latitude_deg, longitude_deg, current_map_geo_bounds, current_stitched_map_pixel_shape) + if pixel_coords_on_unscaled: + px_clamped, py_clamped = pixel_coords_on_unscaled + logger.debug(f"Drawing point marker at unscaled pixel ({px_clamped},{py_clamped}) for geo ({latitude_deg:.5f},{longitude_deg:.5f})") + if CV2_NUMPY_LIBS_AVAILABLE_DRAWING and cv2 and np: + try: + if pil_image_to_draw_on.mode != 'RGB': + map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on.convert('RGB')), cv2.COLOR_RGB2BGR) # type: ignore + else: + map_cv_bgr = cv2.cvtColor(np.array(pil_image_to_draw_on), cv2.COLOR_RGB2BGR) # type: ignore + cv2.drawMarker(map_cv_bgr, (px_clamped, py_clamped), (0, 0, 255), cv2.MARKER_CROSS, markerSize=15, thickness=2) # type: ignore + return Image.fromarray(cv2.cvtColor(map_cv_bgr, cv2.COLOR_BGR2RGB)) # type: ignore + except Exception as e_draw_click_cv: + logger.exception(f"Error drawing point marker with OpenCV: {e_draw_click_cv}") + return pil_image_to_draw_on + else: + logger.warning("CV2 or NumPy not available, cannot draw point marker using OpenCV.") + return pil_image_to_draw_on + else: + logger.warning(f"Geo-to-pixel conversion failed for ({latitude_deg:.5f},{longitude_deg:.5f}), cannot draw point marker.") + return pil_image_to_draw_on diff --git a/map_manager/engine.py b/map_manager/engine.py new file mode 100644 index 0000000..b1d129a --- /dev/null +++ b/map_manager/engine.py @@ -0,0 +1,92 @@ +"""MapEngine: facade for map retrieval operations (inside package). + +Minimal implementation: constructs a MapTileManager and exposes +`get_image_for_area` and `get_image_for_point` helper methods. +""" +from typing import Optional, Tuple +from .tile_manager import MapTileManager +from .services import get_map_service_instance + + +class MapEngine: + """Facade for map retrieval operations. + + Features: + - can compute an appropriate zoom level for a bounding box based on a + `max_size` in pixels and the tile size + - exposes `get_image_for_area(bbox, zoom=None, max_size=None)` where zoom + or max_size (or both) can be provided + """ + + def __init__(self, service_name: str = 'osm', cache_dir: Optional[str] = None, enable_online: bool = True): + service = get_map_service_instance(service_name) + if service is None: + raise ValueError(f"Unknown map service: {service_name}") + self.tile_manager = MapTileManager(service, cache_root_directory=cache_dir, enable_online_tile_fetching=enable_online) + + def _choose_zoom_for_bbox(self, bbox: Tuple[float, float, float, float], max_size_pixels: int) -> Optional[int]: + """Choose the highest zoom level that results in a stitched image not + exceeding `max_size_pixels` in both dimensions. + + Strategy: iterate from service max zoom down to 0 and return the first + zoom where the pixel width and height (num_tiles * tile_size) fit. + """ + try: + from .utils import get_tile_ranges_for_bbox + except Exception: + return None + + tile_size = self.tile_manager.tile_size + max_zoom = self.tile_manager.map_service.max_zoom + + for z in range(max_zoom, -1, -1): + ranges = get_tile_ranges_for_bbox(bbox, z) + if not ranges: + continue + x_range, y_range = ranges + num_w = (x_range[1] - x_range[0]) + 1 + num_h = (y_range[1] - y_range[0]) + 1 + px_w = num_w * tile_size + px_h = num_h * tile_size + if px_w <= max_size_pixels and px_h <= max_size_pixels: + return z + + # If no zoom fits the requested max size, return the lowest zoom (0) + return 0 + + def get_image_for_area(self, bbox: Tuple[float, float, float, float], zoom: Optional[int] = None, max_size: Optional[int] = None, progress_callback=None) -> Optional[object]: + """Return a stitched PIL image covering `bbox`. + + Parameters: + - bbox: (west, south, east, north) + - zoom: explicit zoom level (if provided, used as-is) + - max_size: maximum pixel size (width and height) for the returned image; + if `zoom` is None, engine will compute a suitable zoom. + """ + if zoom is None and max_size is not None: + zoom = self._choose_zoom_for_bbox(bbox, max_size) + + if zoom is None: + # Default to a conservative zoom (middle range) + zoom = min(12, self.tile_manager.map_service.max_zoom) + + from .utils import get_tile_ranges_for_bbox + tile_ranges = get_tile_ranges_for_bbox(bbox, zoom) + if not tile_ranges: + return None + return self.tile_manager.stitch_map_image(zoom, tile_ranges[0], tile_ranges[1], progress_callback=progress_callback) + + def get_image_for_point(self, lat: float, lon: float, zoom: int, tiles_radius: int = 1, progress_callback=None) -> Optional[object]: + """Return a stitched image centered on (lat,lon) using a small tile window. + + `tiles_radius=1` returns a 3x3 tile image. + """ + try: + import mercantile + except Exception: + return None + center_tile = mercantile.tile(lon, lat, zoom) + x, y = center_tile.x, center_tile.y + range_x = (x - tiles_radius, x + tiles_radius) + range_y = (y - tiles_radius, y + tiles_radius) + return self.tile_manager.stitch_map_image(zoom, range_x, range_y, progress_callback=progress_callback) diff --git a/map_manager/services.py b/map_manager/services.py new file mode 100644 index 0000000..fa84ef0 --- /dev/null +++ b/map_manager/services.py @@ -0,0 +1,135 @@ +"""Provides map service implementations (OpenStreetMap) for python-map-manager. + +This file is migrated from the original project and contains the +BaseMapService and OpenStreetMapService classes plus a factory helper. +""" +import abc +import logging +from urllib.parse import urlparse +from typing import Optional, Tuple + +logger = logging.getLogger(__name__) + + +class BaseMapService(abc.ABC): + DEFAULT_TILE_PIXEL_SIZE: int = 256 + DEFAULT_MAX_ZOOM_LEVEL: int = 19 + + def __init__( + self, + service_api_key: Optional[str] = None, + tile_pixel_dim: int = DEFAULT_TILE_PIXEL_SIZE, + max_supported_zoom: int = DEFAULT_MAX_ZOOM_LEVEL + ) -> None: + self._service_log_prefix = f"[{self.__class__.__name__}]" + logger.debug(f"{self._service_log_prefix} Initializing base map service.") + + self.api_key: Optional[str] = service_api_key + self.tile_size: int = tile_pixel_dim + self.max_zoom: int = max_supported_zoom + + if not (isinstance(self.tile_size, int) and self.tile_size > 0): + logger.warning( + f"{self._service_log_prefix} Invalid tile_size '{self.tile_size}'. Using default: {self.DEFAULT_TILE_PIXEL_SIZE}px." + ) + self.tile_size = self.DEFAULT_TILE_PIXEL_SIZE + if not (isinstance(self.max_zoom, int) and 0 <= self.max_zoom <= 25): + logger.warning( + f"{self._service_log_prefix} Invalid max_zoom '{self.max_zoom}'. Using default: {self.DEFAULT_MAX_ZOOM_LEVEL}." + ) + self.max_zoom = self.DEFAULT_MAX_ZOOM_LEVEL + + @property + @abc.abstractmethod + def name(self) -> str: + pass + + @property + @abc.abstractmethod + def attribution(self) -> str: + pass + + @abc.abstractmethod + def get_tile_url(self, z: int, x: int, y: int) -> Optional[str]: + pass + + def is_zoom_level_valid(self, zoom_level: int) -> bool: + is_valid = 0 <= zoom_level <= self.max_zoom + if not is_valid: + logger.warning( + f"{self._service_log_prefix} Requested zoom level {zoom_level} is outside the valid range [0, {self.max_zoom}] for this service." + ) + return is_valid + + def _is_generated_url_structurally_valid(self, url_string: str) -> bool: + if not url_string: + logger.error(f"{self._service_log_prefix} Generated URL is empty.") + return False + try: + parsed_url = urlparse(url_string) + has_scheme_and_netloc = bool(parsed_url.scheme and parsed_url.netloc) + if not has_scheme_and_netloc: + logger.error(f"{self._service_log_prefix} Generated URL '{url_string}' appears malformed (missing scheme or netloc).") + return has_scheme_and_netloc + except Exception as e_url_parse: + logger.error( + f"{self._service_log_prefix} Error parsing generated URL '{url_string}': {e_url_parse}" + ) + return False + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}(Name: '{self.name}', MaxZoom: {self.max_zoom}, TileSize: {self.tile_size})>" + ) + + +class OpenStreetMapService(BaseMapService): + SERVICE_IDENTIFIER_NAME: str = "osm" + SERVICE_ATTRIBUTION_TEXT: str = ( + "© OpenStreetMap contributors (openstreetmap.org/copyright)" + ) + TILE_URL_TEMPLATE: str = "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png" + OSM_MAX_ZOOM_LEVEL: int = 19 + SUBDOMAINS: Tuple[str, ...] = ("a", "b", "c") + _subdomain_index: int = 0 + + def __init__(self) -> None: + super().__init__(service_api_key=None, max_supported_zoom=self.OSM_MAX_ZOOM_LEVEL) + logger.info(f"{self._service_log_prefix} OpenStreetMap service instance ready.") + + @property + def name(self) -> str: + return self.SERVICE_IDENTIFIER_NAME + + @property + def attribution(self) -> str: + return self.SERVICE_ATTRIBUTION_TEXT + + def get_tile_url(self, z: int, x: int, y: int) -> Optional[str]: + if not self.is_zoom_level_valid(z): + return None + subdomain = self.SUBDOMAINS[OpenStreetMapService._subdomain_index % len(self.SUBDOMAINS)] + OpenStreetMapService._subdomain_index += 1 + try: + tile_url = self.TILE_URL_TEMPLATE.format(s=subdomain, z=z, x=x, y=y) + if not self._is_generated_url_structurally_valid(tile_url): + return None + logger.debug(f"{self._service_log_prefix} Generated URL for ({z},{x},{y}): {tile_url}") + return tile_url + except Exception as e_url_format: + logger.error( + f"{self._service_log_prefix} Error formatting tile URL for ({z},{x},{y}): {e_url_format}" + ) + return None + + +def get_map_service_instance(service_name_key: str, api_key_value: Optional[str] = None) -> Optional[BaseMapService]: + log_prefix_factory = "[MapServiceFactory]" + normalized_service_name = service_name_key.lower().strip() + logger.debug(f"{log_prefix_factory} Requesting map service instance for '{normalized_service_name}'.") + + if normalized_service_name == OpenStreetMapService.SERVICE_IDENTIFIER_NAME: + return OpenStreetMapService() + else: + logger.error(f"{log_prefix_factory} Unknown map service name specified: '{service_name_key}'.") + return None diff --git a/map_manager/tile_manager.py b/map_manager/tile_manager.py new file mode 100644 index 0000000..428ccd2 --- /dev/null +++ b/map_manager/tile_manager.py @@ -0,0 +1,294 @@ +"""Map tile manager migrated from original project and adapted. + +Manages fetching, caching and stitching tiles using a BaseMapService. +""" +import logging +import os +import time +import threading +from pathlib import Path +from typing import Tuple, Optional, Dict +import io +import shutil + +try: + import requests + REQUESTS_AVAILABLE = True +except ImportError: + requests = None # type: ignore + REQUESTS_AVAILABLE = False + logging.error("MapTileManager: 'requests' library not found. Online tile fetching will fail.") + +try: + from PIL import Image, ImageDraw + ImageType = Image.Image # type: ignore + PIL_AVAILABLE_MANAGER = True +except ImportError: + Image = None # type: ignore + ImageDraw = None # type: ignore + ImageType = None # type: ignore + logging.error("MapTileManager: 'Pillow' library not found. Image operations will fail.") + +from .services import BaseMapService + +logger = logging.getLogger(__name__) + +DEFAULT_MAP_TILE_CACHE_ROOT_DIR = "map_tile_cache" +DEFAULT_ENABLE_ONLINE_FETCHING = True +DEFAULT_NETWORK_TIMEOUT_SECONDS = 10 +DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS = 2 +DEFAULT_MAX_DOWNLOAD_RETRIES = 2 +DEFAULT_PLACEHOLDER_COLOR_RGB = (220, 220, 220) + + +class MapTileManager: + def __init__(self, map_service: BaseMapService, cache_root_directory: Optional[str] = None, enable_online_tile_fetching: Optional[bool] = None, tile_pixel_size: Optional[int] = None) -> None: + logger.info("Initializing MapTileManager...") + if not REQUESTS_AVAILABLE: + raise ImportError("'requests' library is required by MapTileManager but not found.") + if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None): + raise ImportError("'Pillow' library (or its drawing module ImageDraw) is required by MapTileManager but not found.") + if not isinstance(map_service, BaseMapService): + raise TypeError("map_service_instance must be an instance of BaseMapService.") + + self.map_service: BaseMapService = map_service + self.service_identifier_name: str = self.map_service.name + if tile_pixel_size is not None: + if not isinstance(tile_pixel_size, int) or tile_pixel_size <= 0: + raise ValueError(f"Invalid provided tile_pixel_size: {tile_pixel_size}.") + self.tile_size: int = tile_pixel_size + else: + self.tile_size: int = self.map_service.tile_size + + effective_cache_root_dir = cache_root_directory if cache_root_directory is not None else DEFAULT_MAP_TILE_CACHE_ROOT_DIR + self.service_specific_cache_dir: Path = Path(effective_cache_root_dir) / self.service_identifier_name + self.is_online_fetching_enabled: bool = enable_online_tile_fetching if enable_online_tile_fetching is not None else DEFAULT_ENABLE_ONLINE_FETCHING + + self.http_user_agent: str = "GeoElevationMapViewer/0.1 (Python Requests)" + self.http_request_headers: Dict[str, str] = {"User-Agent": self.http_user_agent} + self.http_request_timeout_seconds: int = DEFAULT_NETWORK_TIMEOUT_SECONDS + self.download_max_retries: int = DEFAULT_MAX_DOWNLOAD_RETRIES + self.download_retry_delay_seconds: int = DEFAULT_DOWNLOAD_RETRY_DELAY_SECONDS + + self._ensure_service_cache_directory_exists() + self._cache_access_lock = threading.Lock() + + logger.info(f"MapTileManager initialized for service '{self.service_identifier_name}'. Online: {self.is_online_fetching_enabled}") + + def _ensure_service_cache_directory_exists(self) -> None: + try: + self.service_specific_cache_dir.mkdir(parents=True, exist_ok=True) + logger.debug(f"Cache directory verified/created: {self.service_specific_cache_dir}") + except OSError as e_mkdir: + logger.error(f"Failed to create cache directory '{self.service_specific_cache_dir}': {e_mkdir}") + + def _get_tile_cache_file_path(self, z: int, x: int, y: int) -> Path: + return self.service_specific_cache_dir / str(z) / str(x) / f"{y}.png" + + def _load_tile_from_cache(self, tile_cache_path: Path, tile_coordinates_log_str: str): + logger.debug(f"Checking cache for tile {tile_coordinates_log_str} at {tile_cache_path}") + try: + with self._cache_access_lock: + if tile_cache_path.is_file(): + pil_image = Image.open(tile_cache_path) # type: ignore + pil_image.load() + if pil_image.mode != "RGB": + pil_image = pil_image.convert("RGB") + return pil_image + else: + return None + except Exception: + logger.exception("Unexpected error accessing cache file") + return None + + def _download_and_save_tile_to_cache(self, zoom_level: int, tile_x: int, tile_y: int, tile_cache_path: Path, tile_coordinates_log_str: str): + if not self.is_online_fetching_enabled: + logger.debug(f"Online fetching disabled. Cannot download tile {tile_coordinates_log_str}.") + return None + tile_download_url = self.map_service.get_tile_url(zoom_level, tile_x, tile_y) + if not tile_download_url: + logger.error(f"Failed to get URL for tile {tile_coordinates_log_str} from service.") + return None + logger.info(f"Downloading tile {tile_coordinates_log_str} from: {tile_download_url}") + downloaded_pil_image = None + for attempt_num in range(self.download_max_retries + 1): + try: + response = requests.get(tile_download_url, headers=self.http_request_headers, timeout=self.http_request_timeout_seconds, stream=True) # type: ignore + response.raise_for_status() + image_binary_data = response.content + if not image_binary_data: + logger.warning(f"Downloaded empty content for tile {tile_coordinates_log_str}.") + break + try: + pil_image = Image.open(io.BytesIO(image_binary_data)) # type: ignore + pil_image.load() + if pil_image.mode != "RGB": + pil_image = pil_image.convert("RGB") + self._save_image_to_cache_file(tile_cache_path, pil_image) + downloaded_pil_image = pil_image + break + except Exception as e_img_proc: + logger.error(f"Failed to process image data for {tile_coordinates_log_str}: {e_img_proc}") + break + except Exception as e_req: + logger.warning(f"Request error for tile {tile_coordinates_log_str}: {e_req}") + if attempt_num < self.download_max_retries: + time.sleep(self.download_retry_delay_seconds) + if downloaded_pil_image is None: + logger.error(f"Failed to download tile {tile_coordinates_log_str} after all retries.") + return downloaded_pil_image + + def _save_image_to_cache_file(self, tile_cache_path: Path, pil_image: ImageType) -> None: + with self._cache_access_lock: + try: + tile_cache_path.parent.mkdir(parents=True, exist_ok=True) + pil_image.save(tile_cache_path, format='PNG') + logger.debug(f"Saved tile to cache: {tile_cache_path}") + except Exception as e_save_unexpected: + logger.exception(f"Unexpected error saving tile to cache {tile_cache_path}: {e_save_unexpected}") + + def get_tile_image(self, zoom_level: int, tile_x: int, tile_y: int, force_online_refresh: bool = False): + tile_coords_log_str = f"({zoom_level},{tile_x},{tile_y})" + if not self.map_service.is_zoom_level_valid(zoom_level): + logger.error(f"Invalid zoom level {zoom_level} for map service '{self.service_identifier_name}'. Cannot get tile.") + return self._create_placeholder_tile_image(f"Invalid Zoom {zoom_level}") + tile_cache_file = self._get_tile_cache_file_path(zoom_level, tile_x, tile_y) + retrieved_image = None + if not force_online_refresh: + retrieved_image = self._load_tile_from_cache(tile_cache_file, tile_coords_log_str) + if retrieved_image is None: + retrieved_image = self._download_and_save_tile_to_cache(zoom_level, tile_x, tile_y, tile_cache_file, tile_coords_log_str) + if retrieved_image is None: + logger.warning(f"Failed to retrieve tile {tile_coords_log_str}. Using placeholder.") + retrieved_image = self._create_placeholder_tile_image(tile_coords_log_str) + return retrieved_image + + def stitch_map_image(self, zoom_level: int, x_tile_range: Tuple[int, int], y_tile_range: Tuple[int, int], progress_callback=None): + """Stitch tiles into a single PIL image. + + Optional `progress_callback` is invoked after each tile is processed with + the signature: progress_callback(done:int, total:int, last_tile_duration:float, from_cache:bool, tile_coords:tuple) + + Note: the callback may be invoked from the caller's thread (i.e. the + thread where `stitch_map_image` is running). Consumers that update UI + must marshal updates to their UI/main thread. + """ + min_tile_x, max_tile_x = x_tile_range + min_tile_y, max_tile_y = y_tile_range + if not (min_tile_x <= max_tile_x and min_tile_y <= max_tile_y): + logger.error(f"Invalid tile ranges for stitching: X={x_tile_range}, Y={y_tile_range}") + return None + single_tile_pixel_size = self.tile_size + if single_tile_pixel_size <= 0: + logger.error(f"Invalid tile size ({single_tile_pixel_size}) stored in manager. Cannot stitch.") + return None + num_tiles_wide = (max_tile_x - min_tile_x) + 1 + num_tiles_high = (max_tile_y - min_tile_y) + 1 + total_image_width = num_tiles_wide * single_tile_pixel_size + total_image_height = num_tiles_high * single_tile_pixel_size + MAX_IMAGE_DIMENSION = 16384 + if total_image_width > MAX_IMAGE_DIMENSION or total_image_height > MAX_IMAGE_DIMENSION: + logger.error("Requested stitched image size exceeds maximum allowed dimension.") + return None + try: + if PIL_AVAILABLE_MANAGER: + stitched_map_image = Image.new("RGB", (total_image_width, total_image_height)) # type: ignore + else: + raise ImportError("Pillow not available to create new image.") + except Exception as e_create_blank: + logger.exception(f"Failed to create blank image for stitching: {e_create_blank}") + return None + total_tiles = num_tiles_wide * num_tiles_high + tiles_done = 0 + for row_index, current_tile_y in enumerate(range(min_tile_y, max_tile_y + 1)): + for col_index, current_tile_x in enumerate(range(min_tile_x, max_tile_x + 1)): + # measure per-tile retrieval time to allow ETA estimation in callers + start_time = time.time() + # check whether tile exists in cache before retrieval (used to report from_cache) + tile_cache_path = self._get_tile_cache_file_path(zoom_level, current_tile_x, current_tile_y) + from_cache_flag = tile_cache_path.is_file() + tile_image_pil = self.get_tile_image(zoom_level, current_tile_x, current_tile_y) + last_tile_duration = time.time() - start_time + if tile_image_pil is None: + logger.critical(f"Critical error: get_tile_image returned None for ({zoom_level},{current_tile_x},{current_tile_y}). Aborting stitch.") + return None + paste_position_x = col_index * single_tile_pixel_size + paste_position_y = row_index * single_tile_pixel_size + try: + if tile_image_pil and tile_image_pil.size != (self.tile_size, self.tile_size): + if PIL_AVAILABLE_MANAGER: + tile_image_pil = tile_image_pil.resize((self.tile_size, self.tile_size), Image.Resampling.LANCZOS) # type: ignore + else: + continue + if tile_image_pil: + stitched_map_image.paste(tile_image_pil, (paste_position_x, paste_position_y)) # type: ignore + except Exception as e_paste: + logger.exception(f"Error pasting tile ({zoom_level},{current_tile_x},{current_tile_y}) at ({paste_position_x},{paste_position_y}): {e_paste}") + # update progress + tiles_done += 1 + try: + if progress_callback: + # Provide caller with basic timing and whether tile was from cache + progress_callback(tiles_done, total_tiles, last_tile_duration, from_cache_flag, (zoom_level, current_tile_x, current_tile_y)) + except Exception: + # progress callbacks must not interrupt stitching + logger.exception('Progress callback raised an exception') + logger.info(f"Map stitching complete for zoom {zoom_level}, X={x_tile_range}, Y={y_tile_range}.") + return stitched_map_image + + def _create_placeholder_tile_image(self, identifier: str = "N/A"): + if not (PIL_AVAILABLE_MANAGER and ImageDraw is not None): + logger.warning("Cannot create placeholder tile: Pillow or ImageDraw library not available.") + return None + try: + tile_pixel_size = self.tile_size + placeholder_color = DEFAULT_PLACEHOLDER_COLOR_RGB + placeholder_img = Image.new("RGB", (tile_pixel_size, tile_pixel_size), color=placeholder_color) # type: ignore + draw = ImageDraw.Draw(placeholder_img) # type: ignore + overlay_text = f"Tile Fail\n{identifier}" + try: + draw.text((10, 10), overlay_text, fill="black") # simple fallback + except Exception: + draw.text((10, 10), overlay_text, fill="black") + return placeholder_img + except Exception as e_placeholder: + logger.exception(f"Error creating placeholder tile image: {e_placeholder}") + return None + + def _get_bounds_for_tile_range(self, zoom: int, tile_ranges: Tuple[Tuple[int, int], Tuple[int, int]]): + try: + import mercantile as local_mercantile + if local_mercantile is None: + raise ImportError("mercantile is None after import.") + except ImportError: + logger.error("mercantile library not found, cannot calculate bounds for tile range.") + return None + try: + min_x, max_x = tile_ranges[0] + min_y, max_y = tile_ranges[1] + top_left_tile_bounds = local_mercantile.bounds(min_x, min_y, zoom) + bottom_right_tile_bounds = local_mercantile.bounds(max_x, max_y, zoom) + overall_west_lon = top_left_tile_bounds.west + overall_south_lat = bottom_right_tile_bounds.south + overall_east_lon = bottom_right_tile_bounds.east + overall_north_lat = top_left_tile_bounds.north + return (overall_west_lon, overall_south_lat, overall_east_lon, overall_north_lat) + except Exception as e_bounds_calc: + logger.exception(f"Error calculating geographic bounds for tile range: {e_bounds_calc}") + return None + + def clear_entire_service_cache(self) -> None: + logger.info(f"Attempting to clear entire cache for service '{self.service_identifier_name}' at {self.service_specific_cache_dir}") + if not self.service_specific_cache_dir.exists(): + logger.warning(f"Cache directory '{self.service_specific_cache_dir}' does not exist. Nothing to clear.") + return + with self._cache_access_lock: + try: + if self.service_specific_cache_dir.is_dir(): + shutil.rmtree(self.service_specific_cache_dir) + logger.info(f"Successfully cleared cache at {self.service_specific_cache_dir}.") + self._ensure_service_cache_directory_exists() + else: + logger.warning(f"Cache path '{self.service_specific_cache_dir}' is not a directory.") + except Exception as e_clear_unexpected: + logger.exception(f"Unexpected error clearing cache '{self.service_specific_cache_dir}': {e_clear_unexpected}") diff --git a/map_manager/utils.py b/map_manager/utils.py new file mode 100644 index 0000000..6fb8f79 --- /dev/null +++ b/map_manager/utils.py @@ -0,0 +1,171 @@ +"""Utility functions for map calculations (migrated from original project). + +Contains bounding box, tile range and resolution helpers. +""" +import logging +import math +from typing import Tuple, Optional, List + +try: + import pyproj + PYPROJ_AVAILABLE = True +except ImportError: + pyproj = None # type: ignore + PYPROJ_AVAILABLE = False + logging.warning("MapUtils: 'pyproj' library not found. Some geodetic calculations will fail.") + +try: + import mercantile + MERCANTILE_AVAILABLE_UTILS = True +except ImportError: + mercantile = None # type: ignore + MERCANTILE_AVAILABLE_UTILS = False + logging.warning("MapUtils: 'mercantile' library not found. Tile calculations will fail.") + +logger = logging.getLogger(__name__) + + +class MapCalculationError(Exception): + pass + + +def get_bounding_box_from_center_size(center_latitude_deg: float, center_longitude_deg: float, area_size_km: float) -> Optional[Tuple[float, float, float, float]]: + if not PYPROJ_AVAILABLE: + logger.error("'pyproj' library is required for bounding box calculation from center/size but is not found.") + return None + if not (isinstance(area_size_km, (int, float)) and area_size_km > 0): + logger.error(f"Invalid area_size_km: {area_size_km}. Must be a positive number.") + return None + if not (-90.0 <= center_latitude_deg <= 90.0): + logger.error(f"Invalid center_latitude_deg: {center_latitude_deg}. Must be in [-90, 90].") + return None + if not (-180.0 <= center_longitude_deg <= 180.0): + logger.error(f"Invalid center_longitude_deg: {center_longitude_deg}. Must be in [-180, 180].") + return None + + try: + geodetic_calculator = pyproj.Geod(ellps="WGS84") # type: ignore + half_side_length_meters = (area_size_km / 2.0) * 1000.0 + + _, north_boundary_lat, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=0.0, dist=half_side_length_meters) + _, south_boundary_lat, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=180.0, dist=half_side_length_meters) + east_boundary_lon, _, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=90.0, dist=half_side_length_meters) + west_boundary_lon, _, _ = geodetic_calculator.fwd(lons=center_longitude_deg, lats=center_latitude_deg, az=270.0, dist=half_side_length_meters) + + north_boundary_lat = min(90.0, max(-90.0, north_boundary_lat)) + south_boundary_lat = min(90.0, max(-90.0, south_boundary_lat)) + + logger.debug(f"Calculated BBox: W={west_boundary_lon:.6f}, S={south_boundary_lat:.6f}, E={east_boundary_lon:.6f}, N={north_boundary_lat:.6f}") + return (west_boundary_lon, south_boundary_lat, east_boundary_lon, north_boundary_lat) + + except Exception as e_bbox_calc: + logger.exception(f"Error calculating bounding box from center/size: {e_bbox_calc}") + return None + + +def get_tile_ranges_for_bbox(bounding_box_deg: Tuple[float, float, float, float], zoom_level: int) -> Optional[Tuple[Tuple[int, int], Tuple[int, int]]]: + if not MERCANTILE_AVAILABLE_UTILS: + logger.error("'mercantile' library is required for tile range calculation but is not found.") + return None + west_lon, south_lat, east_lon, north_lat = bounding_box_deg + try: + tiles_in_bbox_generator = mercantile.tiles(west_lon, south_lat, east_lon, north_lat, zooms=[zoom_level]) + list_of_tiles = list(tiles_in_bbox_generator) + if not list_of_tiles: + logger.warning("No tiles found by mercantile.tiles for BBox at zoom {zoom_level}.") + clamped_west_lon = max(-180.0, min(180.0, west_lon)) + clamped_east_lon = max(-180.0, min(180.0, east_lon)) + clamped_south_lat = max(-90.0, min(90.0, south_lat)) + clamped_north_lat = max(-90.0, min(90.0, north_lat)) + center_lon = (clamped_west_lon + clamped_east_lon) / 2.0 + center_lat = (clamped_south_lat + clamped_north_lat) / 2.0 + center_lat = max(-85.0, min(85.0, center_lat)) + if clamped_west_lon > clamped_east_lon: + center_lon = (clamped_west_lon + clamped_east_lon + 360) / 2.0 + if center_lon > 180: center_lon -= 360 + + center_point_tile = mercantile.tile(center_lon, center_lat, zoom_level) + min_tile_x = center_point_tile.x + max_tile_x = center_point_tile.x + min_tile_y = center_point_tile.y + max_tile_y = center_point_tile.y + else: + x_coordinates = [tile.x for tile in list_of_tiles] + y_coordinates = [tile.y for tile in list_of_tiles] + min_tile_x = min(x_coordinates) + max_tile_x = max(x_coordinates) + min_tile_y = min(y_coordinates) + max_tile_y = max(y_coordinates) + + return ((min_tile_x, max_tile_x), (min_tile_y, max_tile_y)) + + except Exception as e_tile_range_calc: + logger.exception(f"Error calculating tile ranges: {e_tile_range_calc}") + return None + + +def calculate_meters_per_pixel(latitude_degrees: float, zoom_level: int, tile_pixel_size: int = 256) -> Optional[float]: + try: + if not (-90.0 <= latitude_degrees <= 90.0): + logger.warning(f"Invalid latitude for m/px calc: {latitude_degrees}") + return None + if not (0 <= zoom_level <= 25): + logger.warning(f"Invalid zoom level for m/px calc: {zoom_level}") + return None + if not (isinstance(tile_pixel_size, int) and tile_pixel_size > 0): + logger.warning(f"Invalid tile_pixel_size for m/px calc: {tile_pixel_size}") + return None + + EARTH_CIRCUMFERENCE_METERS = 40075016.686 + latitude_radians = math.radians(latitude_degrees) + resolution_m_px = (EARTH_CIRCUMFERENCE_METERS * math.cos(latitude_radians)) / (tile_pixel_size * (2**zoom_level)) + if not math.isfinite(resolution_m_px) or resolution_m_px <= 0: + logger.warning(f"Calculated non-finite or non-positive m/px ({resolution_m_px}) at Lat {latitude_degrees}. Returning None.") + return None + return resolution_m_px + except Exception as e_mpp_calc: + logger.exception(f"Error calculating meters per pixel: {e_mpp_calc}") + return None + + +def calculate_geographic_bbox_size_km(bounding_box_deg: Tuple[float, float, float, float]) -> Optional[Tuple[float, float]]: + if not PYPROJ_AVAILABLE: + logger.error("'pyproj' library is required for geographic size calculation but is not found.") + return None + west_lon, south_lat, east_lon, north_lat = bounding_box_deg + if not (-90.0 <= south_lat <= north_lat <= 90.0): + south_lat = max(-90.0, south_lat) + north_lat = min(90.0, north_lat) + if south_lat >= north_lat: + logger.error(f"Invalid latitude range after clamping: {south_lat}, {north_lat}. Cannot calculate size.") + return None + try: + geodetic_calculator = pyproj.Geod(ellps="WGS84") + center_lat = (south_lat + north_lat) / 2.0 + center_lat = max(-89.9, min(89.9, center_lat)) + _, _, width_meters = geodetic_calculator.inv(west_lon, center_lat, east_lon, center_lat) + center_lon_for_height = (west_lon + east_lon) / 2.0 + center_lon_for_height = max(-180.0, min(180.0, center_lon_for_height)) + _, _, height_meters = geodetic_calculator.inv(center_lon_for_height, south_lat, center_lon_for_height, north_lat) + approx_width_km = abs(width_meters) / 1000.0 + approx_height_km = abs(height_meters) / 1000.0 + if approx_width_km <= 0 or approx_height_km <= 0: + logger.warning(f"Calculated non-positive width or height for BBox {bounding_box_deg}. Result: ({approx_width_km:.2f}, {approx_height_km:.2f}). Returning None.") + return None + return (approx_width_km, approx_height_km) + except Exception as e_size_calc: + logger.exception(f"Error calculating geographic bounding box size: {e_size_calc}") + return None + + +def get_hgt_tile_geographic_bounds(lat_coord: int, lon_coord: int) -> Tuple[float, float, float, float]: + west_lon = float(lon_coord) + south_lat = float(lat_coord) + east_lon = float(lon_coord + 1) + north_lat = float(lat_coord + 1) + west_lon = max(-180.0, min(180.0, west_lon)) + south_lat = max(-90.0, min(90.0, south_lat)) + east_lon = max(-180.0, min(180.0, east_lon)) + north_lat = max(-90.0, min(90.0, north_lat)) + logger.debug(f"Calculated HGT tile bounds for ({lat_coord},{lon_coord}): ({west_lon:.6f}, {south_lat:.6f}, {east_lon:.6f}, {north_lat:.6f})") + return (west_lon, south_lat, east_lon, north_lat) diff --git a/map_manager/visualizer.py b/map_manager/visualizer.py new file mode 100644 index 0000000..55f4a1b --- /dev/null +++ b/map_manager/visualizer.py @@ -0,0 +1,47 @@ +"""Lightweight MapVisualizer inside package: minimal display helper using OpenCV if available.""" +from typing import Optional, Callable + +try: + import cv2 + CV2_AVAILABLE = True +except Exception: + cv2 = None + CV2_AVAILABLE = False + + +class MapVisualizer: + def __init__(self, engine): + self.engine = engine + self.callback_on_click: Optional[Callable[[float, float], None]] = None + + def set_click_callback(self, callback: Callable[[float, float], None]): + self.callback_on_click = callback + + def show_pil_image(self, pil_image): + if pil_image is None: + return + if CV2_AVAILABLE: + import numpy as np + img = pil_image.convert('RGB') + arr = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) + cv2.imshow('Map', arr) + cv2.waitKey(0) + cv2.destroyAllWindows() + else: + # Fallback: save temp file and open with default image viewer + import tempfile, os + tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.png') + pil_image.save(tmp.name) + tmp.close() + if os.name == 'nt': + os.startfile(tmp.name) # type: ignore + else: + try: + import webbrowser + webbrowser.open('file://' + tmp.name) + except Exception: + pass + + def show_point(self, lat: float, lon: float, zoom: int = 12, tiles_radius: int = 1): + img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=tiles_radius) + self.show_pil_image(img) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e35c654 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +requests +Pillow +opencv-python +mercantile +pyproj diff --git a/tests/test_engine_and_download.py b/tests/test_engine_and_download.py new file mode 100644 index 0000000..3e2bf27 --- /dev/null +++ b/tests/test_engine_and_download.py @@ -0,0 +1,60 @@ +import unittest +import tempfile +import io +import os +from unittest.mock import patch, Mock + +from PIL import Image + +from map_manager.engine import MapEngine +from map_manager.services import OpenStreetMapService +from map_manager.tile_manager import MapTileManager + + +class TestEngineZoomAndDownload(unittest.TestCase): + def test_choose_zoom_monotonic(self): + # Small bbox near Rome + bbox = (12.49, 41.89, 12.50, 41.90) + engine = MapEngine(service_name='osm', cache_dir=None, enable_online=True) + + zoom_large = engine._choose_zoom_for_bbox(bbox, max_size_pixels=2000) + zoom_small = engine._choose_zoom_for_bbox(bbox, max_size_pixels=200) + + # Expect that allowing larger max_size yields same-or-higher zoom + self.assertIsNotNone(zoom_large) + self.assertIsNotNone(zoom_small) + self.assertGreaterEqual(zoom_large, zoom_small) + + def test_download_and_cache_mocked(self): + service = OpenStreetMapService() + with tempfile.TemporaryDirectory() as td: + mgr = MapTileManager(service, cache_root_directory=td, enable_online_tile_fetching=True, tile_pixel_size=64) + + # Prepare a fake PNG image bytes for the mock response + fake_img = Image.new('RGB', (64, 64), (123, 222, 111)) + bio = io.BytesIO() + fake_img.save(bio, format='PNG') + img_bytes = bio.getvalue() + + # Create a mock response object + mock_resp = Mock() + mock_resp.content = img_bytes + mock_resp.raise_for_status = Mock() + + # Patch requests.get used in the tile_manager module + with patch('map_manager.tile_manager.requests.get', return_value=mock_resp) as mock_get: + # Request a tile that is not cached + result = mgr.get_tile_image(5, 10, 12) + self.assertIsNotNone(result) + self.assertEqual(result.size, (64, 64)) + + # Check that the tile file was created on disk + tile_path = mgr._get_tile_cache_file_path(5, 10, 12) + self.assertTrue(tile_path.is_file()) + + # Ensure our mocked requests.get was called + mock_get.assert_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_engine_edgecases.py b/tests/test_engine_edgecases.py new file mode 100644 index 0000000..ad3368d --- /dev/null +++ b/tests/test_engine_edgecases.py @@ -0,0 +1,34 @@ +import unittest +import tempfile +from map_manager.engine import MapEngine + + +class TestEngineEdgeCases(unittest.TestCase): + def test_antimeridian_bbox_returns_image(self): + # BBox crossing the antimeridian: e.g. west 179, east -179 (covers small area) + bbox = (179.0, -1.0, -179.0, 1.0) + # Use offline mode so test does not require network + engine = MapEngine(service_name='osm', cache_dir=None, enable_online=False) + + # Request image with explicit small max_size to force zoom selection + img = engine.get_image_for_area(bbox, zoom=None, max_size=800) + # Should return a PIL image (placeholder-based) even if tiles are not downloaded + self.assertIsNotNone(img) + # Basic sanity: image should have width and height > 0 + self.assertGreater(img.size[0], 0) + self.assertGreater(img.size[1], 0) + + def test_max_zoom_not_exceed_service_limit(self): + # Large bbox (global) with very large max_size; engine should pick a zoom + bbox = (-179.0, -85.0, 179.0, 85.0) + engine = MapEngine(service_name='osm', cache_dir=None, enable_online=False) + chosen = engine._choose_zoom_for_bbox(bbox, max_size_pixels=20000) + self.assertIsNotNone(chosen) + # chosen zoom must be within service bounds + max_zoom = engine.tile_manager.map_service.max_zoom + self.assertGreaterEqual(chosen, 0) + self.assertLessEqual(chosen, max_zoom) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_progress_callback.py b/tests/test_progress_callback.py new file mode 100644 index 0000000..06be554 --- /dev/null +++ b/tests/test_progress_callback.py @@ -0,0 +1,42 @@ +import os +import sys +import pathlib +from PIL import Image +# ensure package path is importable when running tests directly +pkg_root = pathlib.Path(__file__).resolve().parents[1] +if str(pkg_root) not in sys.path: + sys.path.insert(0, str(pkg_root)) + +from map_manager.services import OpenStreetMapService +from map_manager.tile_manager import MapTileManager + + +def test_stitch_progress_callback(tmp_path): + service = OpenStreetMapService() + cache_dir = tmp_path / 'cache' + mgr = MapTileManager(service, cache_root_directory=str(cache_dir), enable_online_tile_fetching=False) + + # monkeypatch get_tile_image to avoid network and make it quick + def fake_get_tile_image(z, x, y, force_online_refresh=False): + return Image.new('RGB', (mgr.tile_size, mgr.tile_size), (100, 100, 100)) + + mgr.get_tile_image = fake_get_tile_image + + calls = [] + + def progress_cb(done, total, last_dur, from_cache, tile_coords): + calls.append((done, total, last_dur, from_cache, tile_coords)) + + # stitch a small range (2x1 tiles => 2 tiles) + result = mgr.stitch_map_image(1, (0, 1), (0, 0), progress_callback=progress_cb) + assert result is not None + # ensure callback called for each tile + assert len(calls) == 2 + # check that totals are consistent + for idx, call in enumerate(calls, start=1): + done, total, last_dur, from_cache, tile_coords = call + assert total == 2 + assert done == idx + assert isinstance(last_dur, float) + assert isinstance(from_cache, bool) + assert isinstance(tile_coords, tuple) and len(tile_coords) == 3 diff --git a/tests/test_tile_manager.py b/tests/test_tile_manager.py new file mode 100644 index 0000000..1708747 --- /dev/null +++ b/tests/test_tile_manager.py @@ -0,0 +1,17 @@ +import unittest +from map_manager.services import OpenStreetMapService +from map_manager.tile_manager import MapTileManager +from pathlib import Path + + +class TestTileManagerPaths(unittest.TestCase): + def test_cache_file_path(self): + service = OpenStreetMapService() + mgr = MapTileManager(service, cache_root_directory='test_cache_dir', enable_online_tile_fetching=False, tile_pixel_size=256) + p = mgr._get_tile_cache_file_path(10, 20, 30) + # Expect path like test_cache_dir/osm/10/20/30.png + self.assertTrue(str(p).replace('\\', '/').endswith('test_cache_dir/osm/10/20/30.png') or str(p).replace('\\','/').endswith('test_cache_dir/osm/10/20/30.png')) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_tile_manager_cache.py b/tests/test_tile_manager_cache.py new file mode 100644 index 0000000..7850cc4 --- /dev/null +++ b/tests/test_tile_manager_cache.py @@ -0,0 +1,34 @@ +import unittest +import tempfile +import os +from PIL import Image + +from map_manager.services import OpenStreetMapService +from map_manager.tile_manager import MapTileManager + + +class TestTileManagerCacheBehavior(unittest.TestCase): + def test_cache_hit_and_placeholder(self): + service = OpenStreetMapService() + with tempfile.TemporaryDirectory() as td: + mgr = MapTileManager(service, cache_root_directory=td, enable_online_tile_fetching=False, tile_pixel_size=64) + # Create a cached tile file + p = mgr._get_tile_cache_file_path(5, 10, 12) + p.parent.mkdir(parents=True, exist_ok=True) + img = Image.new("RGB", (64, 64), (255, 0, 0)) + img.save(p) + + # Now get_tile_image should load from cache + loaded = mgr.get_tile_image(5, 10, 12) + self.assertIsNotNone(loaded) + self.assertEqual(loaded.size, (64, 64)) + + # For a missing tile (cache miss) and online disabled, should return a placeholder image + missing = mgr.get_tile_image(6, 11, 13) + self.assertIsNotNone(missing) + # Placeholder should have expected tile size + self.assertEqual(missing.size, (64, 64)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..0ecc4fc --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,18 @@ +import unittest +from map_manager.utils import get_hgt_tile_geographic_bounds, calculate_meters_per_pixel + + +class TestUtils(unittest.TestCase): + def test_hgt_tile_bounds(self): + # N45E007 -> west=7, south=45, east=8, north=46 + bbox = get_hgt_tile_geographic_bounds(45, 7) + self.assertEqual(bbox, (7.0, 45.0, 8.0, 46.0)) + + def test_meters_per_pixel(self): + mpp = calculate_meters_per_pixel(45.0, 10, tile_pixel_size=256) + self.assertIsNotNone(mpp) + self.assertGreater(mpp, 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/visualizer.py b/visualizer.py new file mode 100644 index 0000000..16c44c8 --- /dev/null +++ b/visualizer.py @@ -0,0 +1,46 @@ +"""Lightweight MapVisualizer: minimal display helper using OpenCV if available.""" +from typing import Optional, Callable + +try: + import cv2 + CV2_AVAILABLE = True +except Exception: + cv2 = None + CV2_AVAILABLE = False + +class MapVisualizer: + def __init__(self, engine): + self.engine = engine + self.callback_on_click: Optional[Callable[[float, float], None]] = None + + def set_click_callback(self, callback: Callable[[float, float], None]): + self.callback_on_click = callback + + def show_pil_image(self, pil_image): + if pil_image is None: + return + if CV2_AVAILABLE: + import numpy as np + img = pil_image.convert('RGB') + arr = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) + cv2.imshow('Map', arr) + cv2.waitKey(0) + cv2.destroyAllWindows() + else: + # Fallback: save temp file and open with default image viewer + import tempfile, os + tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.png') + pil_image.save(tmp.name) + tmp.close() + if os.name == 'nt': + os.startfile(tmp.name) # type: ignore + else: + try: + import webbrowser + webbrowser.open('file://' + tmp.name) + except Exception: + pass + + def show_point(self, lat: float, lon: float, zoom: int = 12, tiles_radius: int = 1): + img = self.engine.get_image_for_point(lat, lon, zoom, tiles_radius=tiles_radius) + self.show_pil_image(img)