484 lines
16 KiB
Python
484 lines
16 KiB
Python
import tkinter as tk
|
|
from tkinter import ttk, messagebox, filedialog
|
|
from tkinter.scrolledtext import ScrolledText
|
|
from pathlib import Path
|
|
import threading
|
|
import queue
|
|
import csv
|
|
from datetime import datetime
|
|
import os
|
|
import time
|
|
|
|
from ..core.scanner import find_source_files
|
|
from ..config.languages import LANGUAGE_EXTENSIONS
|
|
from tkinter_logger import TkinterLogger, get_logger
|
|
import logging
|
|
from .topbar import TopBar
|
|
from .file_viewer import FileViewer
|
|
from ..core.worker import WorkerManager
|
|
from ..core.differ import BaselineManager, Differ
|
|
from ..config import settings as app_settings
|
|
from .action_handlers import ActionHandlers
|
|
from .progress_handlers import ProgressHandlers
|
|
from .export_handler import ExportHandler
|
|
|
|
|
|
class App(tk.Tk):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.title("PyUcc - Interfaccia Grafica")
|
|
self.geometry("1024x700")
|
|
|
|
# Shared top bar (profile selection) placed above the actions
|
|
self.topbar = TopBar(self)
|
|
self.topbar.pack(fill="x", side="top")
|
|
|
|
# Centralized Actions frame: single place with buttons for Scan / Countings / Metrics
|
|
actions_frame = ttk.LabelFrame(self, text="Actions")
|
|
actions_frame.pack(fill="x", side="top", padx=8, pady=6)
|
|
|
|
# Buttons for the various functions. Handlers are implemented below.
|
|
self.scan_btn = ttk.Button(actions_frame, text="Scan", command=self._on_action_scan)
|
|
self.scan_btn.grid(row=0, column=0, padx=(6,4))
|
|
self.count_btn = ttk.Button(actions_frame, text="Countings", command=self._on_action_countings)
|
|
self.count_btn.grid(row=0, column=1, padx=(4,4))
|
|
self.metrics_btn = ttk.Button(actions_frame, text="Metrics", command=self._on_action_metrics)
|
|
self.metrics_btn.grid(row=0, column=2, padx=(4,4))
|
|
self.differ_btn = ttk.Button(actions_frame, text="Differing", command=self._on_action_differ)
|
|
self.differ_btn.grid(row=0, column=3, padx=(4,4))
|
|
self.cancel_btn = ttk.Button(actions_frame, text="Cancel", command=self._on_action_cancel, state="disabled")
|
|
self.cancel_btn.grid(row=0, column=4, padx=(20,4))
|
|
self.export_btn = ttk.Button(actions_frame, text="Export CSV", command=self._on_action_export, state="disabled")
|
|
self.export_btn.grid(row=0, column=5, padx=(4,4))
|
|
|
|
# Progress frame: progress bar and file counter below Actions
|
|
progress_frame = ttk.Frame(self)
|
|
progress_frame.pack(fill="x", side="top", padx=8, pady=(0,6))
|
|
|
|
ttk.Label(progress_frame, text="Progress:").pack(side="left", padx=(6,6))
|
|
self.progress = ttk.Progressbar(progress_frame, mode="determinate")
|
|
self.progress.pack(side="left", fill="x", expand=True, padx=(0,12))
|
|
|
|
self._lbl_files = ttk.Label(progress_frame, text="Files: 0/0")
|
|
self._lbl_files.pack(side="right", padx=(6,6))
|
|
|
|
# Results frame contains a single Treeview that will be reused by all actions
|
|
results_frame = ttk.LabelFrame(self, text="Results")
|
|
results_frame.pack(fill="both", expand=True, padx=8, pady=(0,6))
|
|
|
|
# Treeview for results: columns will be reconfigured per-action
|
|
self.results_columns = ("name", "path")
|
|
self.results_tree = ttk.Treeview(results_frame, columns=self.results_columns, show="headings")
|
|
self.results_tree.heading("name", text="File")
|
|
self.results_tree.heading("path", text="Path")
|
|
self.results_tree.column("name", width=400, anchor="w")
|
|
self.results_tree.column("path", width=600, anchor="w")
|
|
self.results_tree.grid(row=0, column=0, sticky="nsew")
|
|
vsb_r = ttk.Scrollbar(results_frame, orient="vertical", command=self.results_tree.yview)
|
|
vsb_r.grid(row=0, column=1, sticky="ns")
|
|
hsb_r = ttk.Scrollbar(results_frame, orient="horizontal", command=self.results_tree.xview)
|
|
hsb_r.grid(row=1, column=0, columnspan=1, sticky="ew")
|
|
self.results_tree.configure(yscrollcommand=vsb_r.set, xscrollcommand=hsb_r.set)
|
|
results_frame.rowconfigure(0, weight=1)
|
|
results_frame.columnconfigure(0, weight=1)
|
|
|
|
# Bind double-click per aprire diff viewer (se è un differ result)
|
|
self.results_tree.bind('<Double-Button-1>', self._on_results_double_click)
|
|
|
|
# Variabile per tracciare la modalità corrente (scan, countings, metrics, differ)
|
|
self._current_mode = None
|
|
# Memorizza i path dei file per il differ
|
|
self._differ_baseline_root = None
|
|
self._differ_current_root = None
|
|
|
|
# Keep the notebook and individual tabs instantiated for compatibility (not packed)
|
|
self.notebook = ttk.Notebook(self)
|
|
|
|
# Status bar at the bottom: operation status (left) and resource monitor (right)
|
|
status_frame = ttk.Frame(self, relief=tk.SUNKEN, borderwidth=1)
|
|
status_frame.pack(fill="x", side="bottom", padx=0, pady=0)
|
|
|
|
# Left side: operation status
|
|
self.phase_var = tk.StringVar(value="Ready")
|
|
self._lbl_phase = ttk.Label(status_frame, textvariable=self.phase_var, anchor="w")
|
|
self._lbl_phase.pack(side="left", padx=(6,12))
|
|
|
|
# Right side: resource monitor
|
|
from resource_monitor import TkinterResourceMonitor
|
|
self.resource_var = tk.StringVar(value="CPU: 0% | RAM: 0 MB | Threads: 0")
|
|
self._lbl_resources = ttk.Label(status_frame, textvariable=self.resource_var, anchor="e")
|
|
self._lbl_resources.pack(side="right", padx=(12,6))
|
|
|
|
# Initialize and start resource monitor
|
|
try:
|
|
self.resource_monitor = TkinterResourceMonitor(
|
|
tk_widget=self,
|
|
string_var=self.resource_var,
|
|
poll_interval=1.0 # Update every second
|
|
)
|
|
self.resource_monitor.start()
|
|
except Exception as e:
|
|
self.log(f"Resource monitor not available: {e}", level="WARNING")
|
|
|
|
# Log frame above the status bar
|
|
log_frame = ttk.LabelFrame(self, text="Log")
|
|
log_frame.pack(fill="x", side="bottom", padx=6, pady=(0,6))
|
|
# ScrolledText for logs (read-only)
|
|
self.log_text = ScrolledText(log_frame, height=8, wrap="word", state="disabled")
|
|
self.log_text.pack(fill="both", expand=True, padx=6, pady=6)
|
|
|
|
# Initialize centralized logging system using tkinter_logger submodule
|
|
# Setup the logger system and attach to the ScrolledText widget
|
|
try:
|
|
self.logger_system = TkinterLogger(self)
|
|
self.logger_system.setup(enable_console=False)
|
|
|
|
# Add Tkinter handler with custom colors
|
|
color_map = {
|
|
logging.INFO: 'black',
|
|
logging.WARNING: '#d87f0a',
|
|
logging.ERROR: '#d62728',
|
|
}
|
|
self.logger_system.add_tkinter_handler(self.log_text, level_colors=color_map)
|
|
except Exception:
|
|
pass
|
|
|
|
# small helper: expose a convenient log method that forwards to
|
|
# the standard logging system so messages flow through the queue.
|
|
def log(self, msg: str, level: str = "INFO"):
|
|
lg = get_logger("pyucc")
|
|
lvl = getattr(logging, level.upper(), logging.INFO)
|
|
try:
|
|
if lvl >= logging.ERROR:
|
|
lg.error(msg)
|
|
elif lvl >= logging.WARNING:
|
|
lg.warning(msg)
|
|
else:
|
|
lg.info(msg)
|
|
except Exception:
|
|
try:
|
|
print(f"[{level}] {msg}")
|
|
except Exception:
|
|
pass
|
|
|
|
self.log = log.__get__(self)
|
|
|
|
# Worker manager (background task runner)
|
|
self.worker = WorkerManager()
|
|
|
|
# Track task names for logging
|
|
self._task_names = {}
|
|
|
|
# Initialize handlers (refactored to separate modules)
|
|
self.action_handlers = ActionHandlers(self)
|
|
self.progress_handlers = ProgressHandlers(self)
|
|
self.export_handler = ExportHandler(self)
|
|
|
|
# small helper to set human-readable phase in the status bar
|
|
def _set_phase(self, text: str):
|
|
try:
|
|
self.phase_var.set(text)
|
|
except Exception:
|
|
pass
|
|
self._set_phase = _set_phase.__get__(self)
|
|
# Tabs removed — functionality is provided by the unified Actions/Results UI
|
|
# poll the worker UI queue and dispatch callbacks in the main thread (50ms for better responsiveness)
|
|
self.after(50, self._poll_worker_ui_queue)
|
|
|
|
def _disable_action_buttons(self):
|
|
"""Disable all action buttons except Cancel when an operation is running."""
|
|
try:
|
|
self.scan_btn.config(state='disabled')
|
|
self.count_btn.config(state='disabled')
|
|
self.metrics_btn.config(state='disabled')
|
|
self.differ_btn.config(state='disabled')
|
|
self.export_btn.config(state='disabled')
|
|
self.cancel_btn.config(state='normal')
|
|
except Exception:
|
|
pass
|
|
|
|
def _enable_action_buttons(self):
|
|
"""Re-enable all action buttons when operation is complete."""
|
|
try:
|
|
self.scan_btn.config(state='normal')
|
|
self.count_btn.config(state='normal')
|
|
self.metrics_btn.config(state='normal')
|
|
self.differ_btn.config(state='normal')
|
|
# Export button state depends on whether we have results
|
|
if self.results_tree.get_children():
|
|
self.export_btn.config(state='normal')
|
|
else:
|
|
self.export_btn.config(state='disabled')
|
|
self.cancel_btn.config(state='disabled')
|
|
except Exception:
|
|
pass
|
|
|
|
def _set_results_columns(self, cols):
|
|
"""Reconfigure the shared results tree to use given columns.
|
|
Cols is an iterable of (col_id, ...) where col_id are string keys.
|
|
"""
|
|
# Clear existing
|
|
for c in self.results_tree.get_children(""):
|
|
self.results_tree.delete(c)
|
|
# reconfigure columns
|
|
self.results_tree.config(columns=cols)
|
|
for cid in cols:
|
|
self.results_tree.heading(cid, text=cid.title())
|
|
self.results_tree.column(cid, width=120, anchor='w')
|
|
|
|
def _on_results_double_click(self, event):
|
|
"""Gestisce il doppio click su una riga della tabella results."""
|
|
# Ottieni la riga selezionata
|
|
selection = self.results_tree.selection()
|
|
if not selection:
|
|
return
|
|
|
|
item = selection[0]
|
|
values = self.results_tree.item(item, 'values')
|
|
|
|
if not values:
|
|
return
|
|
|
|
# Gestisci in base alla modalità corrente
|
|
if self._current_mode in ('scan', 'countings', 'metrics'):
|
|
# In modalità scan/counting/metrics, apri il FileViewer
|
|
# Il path del file è nella seconda colonna (indice 1)
|
|
if len(values) < 2:
|
|
return
|
|
|
|
file_path = values[1] # La colonna "path" contiene il path completo
|
|
|
|
if not file_path or not os.path.exists(file_path):
|
|
messagebox.showwarning("File Viewer", f"File not found: {file_path}")
|
|
return
|
|
|
|
# Apri il file viewer
|
|
try:
|
|
FileViewer(self, file_path)
|
|
except Exception as e:
|
|
messagebox.showerror("File Viewer Error", f"Failed to open file viewer: {e}")
|
|
self.log(f"File viewer error: {e}", level='ERROR')
|
|
|
|
elif self._current_mode == 'differ':
|
|
# In modalità differ, apri il DiffViewer
|
|
if len(values) < 2:
|
|
return
|
|
|
|
fileA = values[0] # Path del file nella baseline
|
|
fileB = values[1] # Path del file corrente
|
|
|
|
# Costruisci i path completi
|
|
file_a_path = None
|
|
file_b_path = None
|
|
|
|
if fileA and self._differ_baseline_root:
|
|
file_a_path = os.path.join(self._differ_baseline_root, fileA)
|
|
if not os.path.exists(file_a_path):
|
|
file_a_path = None
|
|
|
|
if fileB and self._differ_current_root:
|
|
file_b_path = os.path.join(self._differ_current_root, fileB)
|
|
if not os.path.exists(file_b_path):
|
|
file_b_path = None
|
|
|
|
# Verifica che almeno uno dei due file esista
|
|
if not file_a_path and not file_b_path:
|
|
messagebox.showwarning("Diff Viewer", "Neither baseline nor current file exists.")
|
|
return
|
|
|
|
# Apri il diff viewer
|
|
try:
|
|
from .diff_viewer import show_diff_viewer
|
|
show_diff_viewer(self, file_a_path, file_b_path,
|
|
title_a="Baseline", title_b="Current")
|
|
except Exception as e:
|
|
messagebox.showerror("Diff Viewer Error", f"Failed to open diff viewer: {e}")
|
|
self.log(f"Diff viewer error: {e}", level='ERROR')
|
|
|
|
def _on_action_scan(self):
|
|
"""Delegate scan action to ActionHandlers."""
|
|
self.action_handlers.handle_scan()
|
|
|
|
def _on_action_countings(self):
|
|
"""Delegate countings action to ActionHandlers."""
|
|
self.action_handlers.handle_countings()
|
|
|
|
def _on_action_metrics(self):
|
|
"""Delegate metrics action to ActionHandlers."""
|
|
self.action_handlers.handle_metrics()
|
|
|
|
def _on_action_differ(self):
|
|
"""Delegate differ action to ActionHandlers."""
|
|
self.action_handlers.handle_differ()
|
|
|
|
# Old differ implementation moved to action_handlers.py
|
|
|
|
def _on_action_cancel(self):
|
|
self.log("Action: Cancel requested", level='WARNING')
|
|
if getattr(self, '_current_task_id', None):
|
|
try:
|
|
self.worker.cancel(self._current_task_id)
|
|
except Exception:
|
|
pass
|
|
self._current_task_id = None
|
|
self._set_phase("Idle")
|
|
self._enable_action_buttons()
|
|
|
|
def _on_action_export(self):
|
|
"""Delegate export action to ExportHandler."""
|
|
self.export_handler.export_to_csv()
|
|
|
|
def _resolve_profile_and_filters(self):
|
|
"""Return (paths, allowed_exts, ignore_patterns, profile) for current selection."""
|
|
pr = getattr(self.topbar, 'current_profile', None)
|
|
if not pr:
|
|
pname = getattr(self.topbar, 'profile_var', None)
|
|
if pname:
|
|
from ..config import profiles as profiles_cfg
|
|
pr = profiles_cfg.find_profile(pname.get()) if hasattr(pname, 'get') else profiles_cfg.find_profile(str(pname))
|
|
|
|
paths = []
|
|
if pr:
|
|
paths = pr.get('paths') or []
|
|
else:
|
|
p = getattr(self.topbar, 'path_var', None)
|
|
if p:
|
|
val = p.get().strip() if hasattr(p, 'get') else str(p).strip()
|
|
if val:
|
|
paths = [val]
|
|
|
|
# build allowed extensions from profile languages (if any)
|
|
allowed_exts = None
|
|
if pr:
|
|
langs = pr.get('languages', []) or []
|
|
exts = []
|
|
for ln in langs:
|
|
if ln in LANGUAGE_EXTENSIONS:
|
|
exts.extend(LANGUAGE_EXTENSIONS[ln])
|
|
else:
|
|
val = ln.strip()
|
|
if val.startswith('.'):
|
|
exts.append(val.lower())
|
|
elif len(val) <= 5 and not val.isalpha():
|
|
exts.append(f".{val.lower()}")
|
|
else:
|
|
pass
|
|
if exts:
|
|
allowed_exts = set(exts)
|
|
ignore_patterns = pr.get('ignore', []) if pr else None
|
|
return paths, allowed_exts, ignore_patterns, pr
|
|
|
|
def _scan_done(self, results):
|
|
# results: list of Path
|
|
try:
|
|
files = results or []
|
|
for p in self.results_tree.get_children(""):
|
|
self.results_tree.delete(p)
|
|
for f in files:
|
|
try:
|
|
self.results_tree.insert('', 'end', values=(f.name, str(f)))
|
|
except Exception:
|
|
pass
|
|
self._current_task_id = None
|
|
self._set_phase("Idle")
|
|
self._enable_action_buttons()
|
|
except Exception as e:
|
|
messagebox.showerror('Scan Error', str(e))
|
|
self._set_phase("Idle")
|
|
self._enable_action_buttons()
|
|
|
|
def _on_count_progress(self, res):
|
|
"""Delegate count progress to ProgressHandlers."""
|
|
self.progress_handlers.on_count_progress(res)
|
|
|
|
def _on_count_done(self, results):
|
|
"""Delegate count done to ProgressHandlers."""
|
|
self.progress_handlers.on_count_done(results)
|
|
|
|
def _on_metrics_progress(self, res):
|
|
"""Delegate metrics progress to ProgressHandlers."""
|
|
self.progress_handlers.on_metrics_progress(res)
|
|
|
|
def _on_metrics_done(self, results):
|
|
"""Delegate metrics done to ProgressHandlers."""
|
|
self.progress_handlers.on_metrics_done(results)
|
|
|
|
def _poll_worker_ui_queue(self):
|
|
try:
|
|
while True:
|
|
msg = self.worker.ui_queue.get_nowait()
|
|
# allow GUI to update task list and status
|
|
self._handle_worker_msg(msg)
|
|
# dispatch registered callbacks
|
|
try:
|
|
self.worker.dispatch_message(msg)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
# queue empty or other; schedule next poll
|
|
pass
|
|
self.after(50, self._poll_worker_ui_queue)
|
|
|
|
def _handle_worker_msg(self, msg: tuple):
|
|
"""Log worker messages to the application's log so there's a single
|
|
place (the Log panel) to follow task lifecycle and results.
|
|
"""
|
|
typ, task_id, payload = msg
|
|
if typ == "started":
|
|
name = payload.get("func") if isinstance(payload, dict) else str(payload)
|
|
# Store task name for later use
|
|
self._task_names[task_id] = name
|
|
self.log(f"Task {task_id[:8]} started: {name}", level="INFO")
|
|
elif typ == "progress":
|
|
# payload is expected to be a partial result or status dictionary
|
|
# Progress updates can be very frequent and slow down the GUI
|
|
# so we skip logging them to avoid clutter
|
|
pass # Skip progress logging
|
|
elif typ == "done":
|
|
# Retrieve task name from stored mapping
|
|
name = self._task_names.get(task_id, "unknown")
|
|
self.log(f"Task {task_id[:8]} {name} done.", level="INFO")
|
|
# Clean up task name from memory
|
|
self._task_names.pop(task_id, None)
|
|
elif typ == "error":
|
|
# payload is typically a traceback string or exception info
|
|
name = self._task_names.get(task_id, "unknown")
|
|
self.log(f"Task {task_id[:8]} {name} error: {payload}", level="ERROR")
|
|
# Clean up task name from memory
|
|
self._task_names.pop(task_id, None)
|
|
elif typ == "cancelled":
|
|
name = self._task_names.get(task_id, "unknown")
|
|
self.log(f"Task {task_id[:8]} {name} cancelled", level="WARNING")
|
|
# Clean up task name from memory
|
|
self._task_names.pop(task_id, None)
|
|
|
|
def on_closing(self):
|
|
"""Cleanup when closing the application."""
|
|
try:
|
|
# Stop resource monitor
|
|
if hasattr(self, 'resource_monitor'):
|
|
self.resource_monitor.stop()
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
# Shutdown logger system
|
|
if hasattr(self, 'logger_system'):
|
|
self.logger_system.shutdown()
|
|
except Exception:
|
|
pass
|
|
|
|
self.destroy()
|
|
|
|
|
|
|
|
|
|
def run_app():
|
|
app = App()
|
|
app.protocol("WM_DELETE_WINDOW", app.on_closing)
|
|
app.mainloop()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_app()
|