307 lines
12 KiB
Python
307 lines
12 KiB
Python
import tkinter as tk
|
|
from tkinter import ttk, messagebox, filedialog
|
|
from pathlib import Path
|
|
import threading
|
|
import queue
|
|
import csv
|
|
from datetime import datetime
|
|
import tkinter as tk
|
|
from tkinter import ttk, messagebox, filedialog
|
|
from pathlib import Path
|
|
import threading
|
|
import queue
|
|
import csv
|
|
from datetime import datetime
|
|
|
|
from ..core.scanner import find_source_files
|
|
from ..config.languages import LANGUAGE_EXTENSIONS
|
|
from ..utils import logger as app_logger
|
|
import logging
|
|
from .file_viewer import FileViewer
|
|
|
|
|
|
class ScannerTab(ttk.Frame):
|
|
"""Tab that runs the source files scanner and shows results.
|
|
|
|
The ScannerTab relies on a shared `topbar` instance for folder
|
|
selection. This reduces duplication between tabs.
|
|
"""
|
|
|
|
def __init__(self, parent, topbar, app=None, *args, **kwargs):
|
|
"""Initialize the ScannerTab.
|
|
|
|
Args:
|
|
parent: Notebook widget that contains this tab.
|
|
topbar: Instance of `TopBar` exposing `path_var`.
|
|
"""
|
|
super().__init__(parent, *args, **kwargs)
|
|
|
|
self.topbar = topbar
|
|
self.app = app
|
|
self.queue = queue.Queue()
|
|
self.worker = None
|
|
|
|
# Controls
|
|
controls = ttk.Frame(self)
|
|
controls.grid(row=0, column=0, sticky="ew", padx=8, pady=6)
|
|
self.scan_btn = ttk.Button(controls, text="Start Scan", command=self.start_scan)
|
|
self.scan_btn.grid(row=0, column=0, sticky="w")
|
|
self.cancel_btn = ttk.Button(controls, text="Cancel", command=self.cancel_scan, state="disabled")
|
|
self.cancel_btn.grid(row=0, column=1, padx=(8, 0))
|
|
# Export button next to scanner controls
|
|
self.export_btn = ttk.Button(controls, text="Export CSV", command=self._export_csv, state="disabled")
|
|
self.export_btn.grid(row=0, column=2, padx=(8, 0))
|
|
|
|
# Progress and output
|
|
self.progress = ttk.Progressbar(self, mode="indeterminate")
|
|
self.progress.grid(row=1, column=0, sticky="ew", padx=8, pady=(6, 0))
|
|
|
|
# Treeview for scanner results: filename, path, size, created, modified
|
|
columns = ("name", "path", "size", "created", "modified")
|
|
self.tree = ttk.Treeview(self, columns=columns, show="headings")
|
|
self.tree.heading("name", text="File", command=lambda: self._sort_by("name", False))
|
|
self.tree.heading("path", text="Path", command=lambda: self._sort_by("path", False))
|
|
self.tree.heading("size", text="Size (bytes)", command=lambda: self._sort_by("size", False))
|
|
self.tree.heading("created", text="Created", command=lambda: self._sort_by("created", False))
|
|
self.tree.heading("modified", text="Modified", command=lambda: self._sort_by("modified", False))
|
|
|
|
# column sizing: make numeric columns narrower and right-aligned
|
|
self.tree.column("name", width=300, anchor="w", stretch=True)
|
|
self.tree.column("path", width=400, anchor="w", stretch=True)
|
|
self.tree.column("size", width=90, anchor="e", stretch=False)
|
|
self.tree.column("created", width=160, anchor="w", stretch=False)
|
|
self.tree.column("modified", width=160, anchor="w", stretch=False)
|
|
|
|
self.tree.grid(row=2, column=0, sticky="nsew", padx=8, pady=8)
|
|
vsb = ttk.Scrollbar(self, orient="vertical", command=self.tree.yview)
|
|
hs = ttk.Scrollbar(self, orient="horizontal", command=self.tree.xview)
|
|
self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hs.set)
|
|
vsb.grid(row=2, column=1, sticky="ns")
|
|
hs.grid(row=3, column=0, columnspan=1, sticky="ew", padx=8)
|
|
|
|
self.rowconfigure(2, weight=1)
|
|
self.columnconfigure(0, weight=1)
|
|
|
|
# bind double-click to open file viewer
|
|
self.tree.bind("<Double-1>", self._on_double_click)
|
|
|
|
def start_scan(self):
|
|
"""Start scanning the folder(s) configured in the shared TopBar or current profile.
|
|
|
|
If a profile is active and contains multiple `paths`, those are used.
|
|
"""
|
|
# determine list of paths to scan: prefer profile.paths if available
|
|
paths = []
|
|
pr = getattr(self.topbar, "current_profile", None)
|
|
if not pr:
|
|
pname = getattr(self.topbar, "profile_var", None)
|
|
if pname:
|
|
from ..config import profiles as profiles_cfg
|
|
|
|
pr = profiles_cfg.find_profile(pname.get()) if hasattr(pname, "get") else profiles_cfg.find_profile(str(pname))
|
|
|
|
if pr:
|
|
# only new-style 'paths' supported
|
|
paths = pr.get("paths") or []
|
|
else:
|
|
p = self.topbar.path_var.get().strip()
|
|
if p:
|
|
paths = [p]
|
|
|
|
if not paths:
|
|
messagebox.showwarning("Missing folder", "Please select a folder or profile to analyze first.")
|
|
return
|
|
|
|
# normalize to Path objects
|
|
dirs = [Path(p) for p in paths]
|
|
|
|
self.scan_btn.config(state="disabled")
|
|
# log start
|
|
try:
|
|
if getattr(self, 'app', None):
|
|
self.app.log(f"Scan started on: {', '.join(str(p) for p in dirs)}", "INFO")
|
|
except Exception:
|
|
pass
|
|
self.cancel_btn.config(state="normal")
|
|
# clear tree
|
|
for item in self.tree.get_children():
|
|
self.tree.delete(item)
|
|
self.progress.start(50)
|
|
|
|
self.worker = threading.Thread(target=self._worker_scan, args=(dirs,), daemon=True)
|
|
self.worker.start()
|
|
self.after(200, self._poll_queue)
|
|
|
|
def cancel_scan(self):
|
|
"""Request cancellation (informational only — scanner must support cancel)."""
|
|
if self.worker and self.worker.is_alive():
|
|
messagebox.showinfo("Cancel", "Cancellation requested but not supported by the worker.")
|
|
try:
|
|
if getattr(self, 'app', None):
|
|
self.app.log("Scan cancelled by user", "WARNING")
|
|
except Exception:
|
|
pass
|
|
self._finish_scan()
|
|
|
|
def _worker_scan(self, directories):
|
|
"""Worker thread: call `find_source_files` and push results to queue.
|
|
|
|
If a profile is selected in the TopBar, build an allowed extensions set
|
|
from the profile languages and pass it to the scanner so only
|
|
relevant files are returned.
|
|
"""
|
|
try:
|
|
allowed_exts = None
|
|
pr = getattr(self.topbar, "current_profile", None)
|
|
# fallback: if current_profile not set, try to resolve from combobox selection
|
|
if not pr:
|
|
pname = getattr(self.topbar, "profile_var", None)
|
|
if pname:
|
|
from ..config import profiles as profiles_cfg
|
|
|
|
pr = profiles_cfg.find_profile(pname.get()) if hasattr(pname, "get") else profiles_cfg.find_profile(str(pname))
|
|
if pr:
|
|
langs = pr.get("languages", []) or []
|
|
exts = []
|
|
for ln in langs:
|
|
if ln in LANGUAGE_EXTENSIONS:
|
|
exts.extend(LANGUAGE_EXTENSIONS[ln])
|
|
else:
|
|
val = ln.strip()
|
|
if val.startswith('.'):
|
|
exts.append(val.lower())
|
|
elif len(val) <= 5 and val.isalnum():
|
|
# simple heuristic: treat short alnum tokens as extension-like
|
|
exts.append(f".{val.lower()}")
|
|
else:
|
|
pass
|
|
if exts:
|
|
allowed_exts = set(exts)
|
|
ignore_patterns = pr.get("ignore", []) if pr else None
|
|
|
|
files = []
|
|
# directories may include files or folders; handle both
|
|
for d in directories:
|
|
try:
|
|
if d.is_dir():
|
|
files.extend(find_source_files(d, allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns))
|
|
elif d.is_file():
|
|
files.append(d)
|
|
except Exception:
|
|
# skip problematic entries but continue
|
|
continue
|
|
|
|
# deduplicate while preserving order
|
|
seen = set()
|
|
unique_files = []
|
|
for f in files:
|
|
sf = str(f)
|
|
if sf not in seen:
|
|
seen.add(sf)
|
|
unique_files.append(f)
|
|
|
|
self.queue.put(("done", unique_files))
|
|
except Exception as e:
|
|
self.queue.put(("error", str(e)))
|
|
|
|
def _poll_queue(self):
|
|
"""Poll the internal queue and update UI when results arrive."""
|
|
try:
|
|
while True:
|
|
item = self.queue.get_nowait()
|
|
tag, payload = item
|
|
if tag == "done":
|
|
files = payload
|
|
# populate tree with file metadata
|
|
for p in files:
|
|
try:
|
|
stat = p.stat()
|
|
size = stat.st_size
|
|
created = datetime.fromtimestamp(stat.st_ctime).isoformat(sep=' ', timespec='seconds')
|
|
modified = datetime.fromtimestamp(stat.st_mtime).isoformat(sep=' ', timespec='seconds')
|
|
except Exception:
|
|
size = 0
|
|
created = ""
|
|
modified = ""
|
|
self.tree.insert("", "end", values=(p.name, str(p), size, created, modified))
|
|
self.export_btn.config(state="normal")
|
|
try:
|
|
if getattr(self, 'app', None):
|
|
self.app.log(f"Scan completed: {len(files)} files found", "INFO")
|
|
except Exception:
|
|
pass
|
|
self._finish_scan()
|
|
elif tag == "error":
|
|
messagebox.showerror("Error", f"Error during scanning: {payload}")
|
|
try:
|
|
if getattr(self, 'app', None):
|
|
self.app.log(f"Error during scanning: {payload}", "ERROR")
|
|
except Exception:
|
|
pass
|
|
self._finish_scan()
|
|
except queue.Empty:
|
|
if self.worker and self.worker.is_alive():
|
|
self.after(200, self._poll_queue)
|
|
|
|
def _on_double_click(self, _evt=None):
|
|
sel = self.tree.selection()
|
|
if not sel:
|
|
return
|
|
item = sel[0]
|
|
vals = self.tree.item(item, "values")
|
|
# values: (name, path, size, created, modified)
|
|
if len(vals) >= 2:
|
|
path = vals[1]
|
|
FileViewer(self, path)
|
|
|
|
def _finish_scan(self):
|
|
"""Finalize UI state after scan finished or cancelled."""
|
|
self.progress.stop()
|
|
self.scan_btn.config(state="normal")
|
|
self.cancel_btn.config(state="disabled")
|
|
|
|
def _sort_by(self, col, descending):
|
|
col_map = {"name": 0, "path": 1, "size": 2, "created": 3, "modified": 4}
|
|
idx = col_map.get(col, 0)
|
|
children = list(self.tree.get_children(""))
|
|
def _key(item):
|
|
val = self.tree.item(item, "values")[idx]
|
|
# numeric for size
|
|
if col == "size":
|
|
try:
|
|
return int(val)
|
|
except Exception:
|
|
return 0
|
|
# otherwise string
|
|
try:
|
|
return str(val).lower()
|
|
except Exception:
|
|
return ""
|
|
children.sort(key=_key, reverse=descending)
|
|
for index, item in enumerate(children):
|
|
self.tree.move(item, "", index)
|
|
self.tree.heading(col, command=lambda c=col: self._sort_by(c, not descending))
|
|
|
|
def _export_csv(self):
|
|
path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV files", "*.csv"), ("All files", "*")])
|
|
if not path:
|
|
return
|
|
headers = ["name", "path", "size", "created", "modified"]
|
|
from ..utils.csv_exporter import export_rows_to_csv
|
|
|
|
try:
|
|
rows = (self.tree.item(child, "values") for child in self.tree.get_children())
|
|
written = export_rows_to_csv(path, headers, rows)
|
|
messagebox.showinfo("Export", f"Exported {written} rows to {path}")
|
|
try:
|
|
if getattr(self, 'app', None):
|
|
self.app.log(f"Exported {written} rows to {path}", "INFO")
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
messagebox.showerror("Export Error", str(e))
|
|
try:
|
|
if getattr(self, 'app', None):
|
|
self.app.log(f"Export error: {e}", "ERROR")
|
|
except Exception:
|
|
pass |