SXXXXXXX_PyUCC/pyucc/gui/gui.py

662 lines
25 KiB
Python

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from tkinter.scrolledtext import ScrolledText
from pathlib import Path
import threading
import queue
import csv
from datetime import datetime
import os
import time
from ..core.scanner import find_source_files
from ..config.languages import LANGUAGE_EXTENSIONS
from tkinter_logger import TkinterLogger, get_logger
import logging
from .topbar import TopBar
from .file_viewer import FileViewer
from ..core.worker import WorkerManager
from ..core.differ import BaselineManager, Differ
from ..config import settings as app_settings
from .action_handlers import ActionHandlers
from .progress_handlers import ProgressHandlers
from .export_handler import ExportHandler
# Build the version label shown in the window title. When the generated
# ``pyucc._version`` module cannot be imported (e.g. running from a source
# checkout), fall back to a fixed development label.
try:
    from pyucc import _version as wrapper_version
except ImportError:
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
else:
    WRAPPER_APP_VERSION_STRING = (
        f"{wrapper_version.__version__} "
        f"({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    )
class App(tk.Tk):
    """Main PyUcc window.

    Layout, top to bottom: the shared ``TopBar``, an "Actions" button row,
    a progress row, a single "Results" ``Treeview`` reused by every action,
    a "Log" panel and a status bar (operation phase on the left, resource
    monitor text on the right).

    Action, progress and export logic is delegated to ``ActionHandlers``,
    ``ProgressHandlers`` and ``ExportHandler``; background work goes through
    ``WorkerManager``, whose UI queue is polled from the Tk main loop.
    """

    # Delay between two polls of the worker UI queue, in milliseconds.
    _POLL_INTERVAL_MS = 50
    # Hover time before a column-header tooltip is shown, in milliseconds.
    _TOOLTIP_DELAY_MS = 1000

    def __init__(self):
        """Build all widgets, wire logging/monitoring and start queue polling."""
        super().__init__()
        self.title(f"PyUcc- {WRAPPER_APP_VERSION_STRING}")
        self.geometry("1024x700")

        # --- plain state (initialised up front so every method can rely on it)
        self._sort_column = None
        self._sort_reverse = False
        self._column_tooltip = None
        self._tooltip_after_id = None
        self._column_tooltips = {}
        # Current results mode: "scan", "countings", "metrics" or "differ".
        self._current_mode = None
        # Root folders used to resolve the relative paths shown in differ mode.
        self._differ_baseline_root = None
        self._differ_current_root = None
        self._current_task_id = None
        # task_id -> human readable task name, used only for log messages.
        self._task_names = {}
        self._poll_after_id = None

        # --- widgets (pack order matters: it defines the vertical layout)
        # Shared top bar (profile selection) placed above the actions.
        self.topbar = TopBar(self)
        self.topbar.pack(fill="x", side="top")
        self._build_actions_bar()
        self._build_progress_bar()
        self._build_results_view()
        # Kept instantiated for compatibility; intentionally never packed.
        self.notebook = ttk.Notebook(self)
        self._build_status_bar()
        self._build_log_panel()

        # Logging must be ready before anything that may want to log.
        self._init_logging()
        self._start_resource_monitor()

        # Worker manager (background task runner) and delegated handlers.
        self.worker = WorkerManager()
        self.action_handlers = ActionHandlers(self)
        self.progress_handlers = ProgressHandlers(self)
        self.export_handler = ExportHandler(self)

        # Poll the worker UI queue and dispatch callbacks in the main thread.
        self._poll_after_id = self.after(
            self._POLL_INTERVAL_MS, self._poll_worker_ui_queue
        )

    # ------------------------------------------------------------------
    # UI construction helpers
    # ------------------------------------------------------------------
    def _build_actions_bar(self):
        """Create the "Actions" frame with one button per operation."""
        actions_frame = ttk.LabelFrame(self, text="Actions")
        actions_frame.pack(fill="x", side="top", padx=8, pady=6)

        # (attribute name, label, callback, horizontal padding, initial state)
        button_specs = (
            ("scan_btn", "🔍 Scan", self._on_action_scan, (6, 4), "normal"),
            ("count_btn", "🔢 Countings", self._on_action_countings, (4, 4), "normal"),
            ("metrics_btn", "📊 Metrics", self._on_action_metrics, (4, 4), "normal"),
            ("differ_btn", "🔀 Differing", self._on_action_differ, (4, 4), "normal"),
            ("cancel_btn", "⛔ Cancel", self._on_action_cancel, (20, 4), "disabled"),
            ("export_btn", "💾 Export CSV", self._on_action_export, (4, 4), "disabled"),
        )
        for column, (attr, label, callback, padx, state) in enumerate(button_specs):
            button = ttk.Button(
                actions_frame, text=label, command=callback, state=state
            )
            button.grid(row=0, column=column, padx=padx)
            setattr(self, attr, button)

    def _build_progress_bar(self):
        """Create the progress bar and the "Files: n/m" counter."""
        progress_frame = ttk.Frame(self)
        progress_frame.pack(fill="x", side="top", padx=8, pady=(0, 6))
        ttk.Label(progress_frame, text="Progress:").pack(side="left", padx=(6, 6))
        self.progress = ttk.Progressbar(progress_frame, mode="determinate")
        self.progress.pack(side="left", fill="x", expand=True, padx=(0, 12))
        self._lbl_files = ttk.Label(progress_frame, text="Files: 0/0")
        self._lbl_files.pack(side="right", padx=(6, 6))

    def _build_results_view(self):
        """Create the shared results Treeview with scrollbars and bindings."""
        results_frame = ttk.LabelFrame(self, text="Results")
        results_frame.pack(fill="both", expand=True, padx=8, pady=(0, 6))

        # Columns are reconfigured per action via _set_results_columns().
        self.results_columns = ("name", "path")
        self.results_tree = ttk.Treeview(
            results_frame, columns=self.results_columns, show="headings"
        )
        self._set_results_columns(self.results_columns)
        self.results_tree.grid(row=0, column=0, sticky="nsew")

        vsb_r = ttk.Scrollbar(
            results_frame, orient="vertical", command=self.results_tree.yview
        )
        vsb_r.grid(row=0, column=1, sticky="ns")
        hsb_r = ttk.Scrollbar(
            results_frame, orient="horizontal", command=self.results_tree.xview
        )
        hsb_r.grid(row=1, column=0, columnspan=1, sticky="ew")
        self.results_tree.configure(
            yscrollcommand=vsb_r.set, xscrollcommand=hsb_r.set
        )
        results_frame.rowconfigure(0, weight=1)
        results_frame.columnconfigure(0, weight=1)

        # Double-click opens the file viewer or the diff viewer (see handler).
        self.results_tree.bind("<Double-Button-1>", self._on_results_double_click)
        # Column-header tooltips.
        self.results_tree.bind("<Motion>", self._on_tree_motion)
        self.results_tree.bind("<Leave>", self._hide_column_tooltip)

    def _build_status_bar(self):
        """Create the bottom status bar (phase text left, resources right)."""
        status_frame = ttk.Frame(self, relief=tk.SUNKEN, borderwidth=1)
        status_frame.pack(fill="x", side="bottom", padx=0, pady=0)

        self.phase_var = tk.StringVar(value="Ready")
        self._lbl_phase = ttk.Label(
            status_frame, textvariable=self.phase_var, anchor="w"
        )
        self._lbl_phase.pack(side="left", padx=(6, 12))

        self.resource_var = tk.StringVar(value="CPU: 0% | RAM: 0 MB | Threads: 0")
        self._lbl_resources = ttk.Label(
            status_frame, textvariable=self.resource_var, anchor="e"
        )
        self._lbl_resources.pack(side="right", padx=(12, 6))

    def _build_log_panel(self):
        """Create the read-only log panel that sits just above the status bar."""
        log_frame = ttk.LabelFrame(self, text="Log")
        log_frame.pack(fill="x", side="bottom", padx=6, pady=(0, 6))
        self.log_text = ScrolledText(
            log_frame, height=8, wrap="word", state="disabled"
        )
        self.log_text.pack(fill="both", expand=True, padx=6, pady=6)

    def _init_logging(self):
        """Attach the tkinter_logger system to the log panel (best effort)."""
        try:
            self.logger_system = TkinterLogger(self)
            self.logger_system.setup(enable_console=False)
            color_map = {
                logging.INFO: "black",
                logging.WARNING: "#d87f0a",
                logging.ERROR: "#d62728",
            }
            self.logger_system.add_tkinter_handler(
                self.log_text, level_colors=color_map
            )
        except Exception:
            # The GUI stays usable without the log panel; report the failure
            # through the standard logging module instead of hiding it.
            logging.getLogger(__name__).warning(
                "GUI log panel could not be initialised", exc_info=True
            )

    def _start_resource_monitor(self):
        """Start the optional resource monitor feeding ``self.resource_var``."""
        try:
            # Imported here so that a missing module is reported as
            # "not available" instead of aborting window construction.
            from resource_monitor import TkinterResourceMonitor

            self.resource_monitor = TkinterResourceMonitor(
                tk_widget=self,
                string_var=self.resource_var,
                poll_interval=1.0,  # Update every second
            )
            self.resource_monitor.start()
        except Exception as e:
            self.log(f"Resource monitor not available: {e}", level="WARNING")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def log(self, msg: str, level: str = "INFO"):
        """Forward *msg* to the "pyucc" logger at the given level name.

        Levels at or above ERROR go to ``error``, at or above WARNING to
        ``warning``, everything else (including unknown names) to ``info``.
        If logging itself fails the message is printed instead.
        """
        lvl = getattr(logging, level.upper(), logging.INFO)
        if not isinstance(lvl, int):
            # e.g. level="basicConfig" resolves to a function, not a level.
            lvl = logging.INFO
        try:
            lg = get_logger("pyucc")
            if lvl >= logging.ERROR:
                lg.error(msg)
            elif lvl >= logging.WARNING:
                lg.warning(msg)
            else:
                lg.info(msg)
        except Exception:
            try:
                print(f"[{level}] {msg}")
            except Exception:
                pass

    def _set_phase(self, text: str):
        """Show a human-readable operation phase in the status bar."""
        try:
            self.phase_var.set(text)
        except Exception:
            # Best effort: the variable may be gone while the window closes.
            pass

    def _disable_action_buttons(self):
        """Disable all action buttons except Cancel when an operation is running."""
        try:
            for button in (
                self.scan_btn,
                self.count_btn,
                self.metrics_btn,
                self.differ_btn,
                self.export_btn,
            ):
                button.config(state="disabled")
            self.cancel_btn.config(state="normal")
        except Exception:
            # Best effort: widgets may already be destroyed.
            pass

    def _enable_action_buttons(self):
        """Re-enable all action buttons when operation is complete."""
        try:
            for button in (
                self.scan_btn,
                self.count_btn,
                self.metrics_btn,
                self.differ_btn,
            ):
                button.config(state="normal")
            # Export only makes sense when there is something to export.
            has_rows = bool(self.results_tree.get_children())
            self.export_btn.config(state="normal" if has_rows else "disabled")
            self.cancel_btn.config(state="disabled")
        except Exception:
            # Best effort: widgets may already be destroyed.
            pass

    # ------------------------------------------------------------------
    # Results table: columns, tooltips, sorting
    # ------------------------------------------------------------------
    def _set_results_columns(self, cols, tooltips=None):
        """Reconfigure the shared results tree to use given columns.

        All existing rows are removed. ``cols`` is an iterable of column id
        strings; ``tooltips`` is an optional dict mapping a column id to the
        tooltip text shown when hovering its header.
        """
        for row_id in self.results_tree.get_children(""):
            self.results_tree.delete(row_id)

        self.results_tree.config(columns=cols)
        self._column_tooltips = tooltips or {}
        # New columns start unsorted; otherwise the first click on a column
        # sharing an id with the previous layout would sort descending.
        self._sort_column = None
        self._sort_reverse = False

        for cid in cols:
            # Heading text plus click handler for sorting.
            self.results_tree.heading(
                cid, text=cid.title(), command=lambda c=cid: self._on_heading_click(c)
            )
            self.results_tree.column(cid, width=120, anchor="w")

    def _on_tree_motion(self, event):
        """Show tooltip when hovering over column headers."""
        if self.results_tree.identify_region(event.x, event.y) != "heading":
            self._hide_column_tooltip()
            return

        column = self.results_tree.identify_column(event.x)  # e.g. "#2"
        try:
            col_index = int(column.replace("#", "")) - 1
        except ValueError:
            self._hide_column_tooltip()
            return

        cols = self.results_tree["columns"]
        tooltip_text = None
        if 0 <= col_index < len(cols):
            tooltip_text = self._column_tooltips.get(cols[col_index])

        if not tooltip_text:
            # Also drops a timer armed for a neighbouring column, so a stale
            # tooltip never pops up over a header that has none.
            self._hide_column_tooltip()
            return

        if self._tooltip_after_id:
            self.after_cancel(self._tooltip_after_id)
        self._tooltip_after_id = self.after(
            self._TOOLTIP_DELAY_MS,
            lambda: self._show_column_tooltip(event, tooltip_text),
        )

    def _show_column_tooltip(self, event, text):
        """Display tooltip near mouse position."""
        self._hide_column_tooltip()
        x, y = event.x_root + 10, event.y_root + 10
        self._column_tooltip = tk.Toplevel(self)
        self._column_tooltip.wm_overrideredirect(True)
        self._column_tooltip.wm_geometry(f"+{x}+{y}")
        label = tk.Label(
            self._column_tooltip,
            text=text,
            justify="left",
            background="#ffffe0",
            relief="solid",
            borderwidth=1,
            font=("TkDefaultFont", 9, "normal"),
        )
        label.pack()

    def _hide_column_tooltip(self, event=None):
        """Hide the tooltip and cancel any pending show timer."""
        if self._tooltip_after_id:
            self.after_cancel(self._tooltip_after_id)
            self._tooltip_after_id = None
        if self._column_tooltip:
            self._column_tooltip.destroy()
            self._column_tooltip = None

    @staticmethod
    def _sort_key(raw):
        """Return a sort key that is comparable across numbers and text.

        Numeric cells sort before textual ones; text is compared
        case-insensitively. Keeping the two kinds in separate ranks avoids the
        ``TypeError`` raised when comparing ``int`` with ``str``.
        """
        if raw is None:
            return (1, "")
        for caster in (int, float):
            try:
                return (0, caster(raw))
            except (TypeError, ValueError):
                continue
        return (1, str(raw).lower())

    def _on_heading_click(self, col_id):
        """Handle click on a column heading: sort rows by that column.

        Click toggles between ascending and descending when repeated on same
        column.
        """
        cols = list(self.results_tree["columns"])
        if col_id not in cols:
            return

        if self._sort_column == col_id:
            self._sort_reverse = not self._sort_reverse
        else:
            self._sort_column = col_id
            self._sort_reverse = False

        keyed_rows = []
        for row_id in self.results_tree.get_children(""):
            try:
                cell = self.results_tree.set(row_id, col_id)
            except tk.TclError:
                cell = ""
            keyed_rows.append((self._sort_key(cell), row_id))
        keyed_rows.sort(key=lambda pair: pair[0], reverse=self._sort_reverse)

        for position, (_key, row_id) in enumerate(keyed_rows):
            try:
                self.results_tree.move(row_id, "", position)
            except tk.TclError:
                pass

        # Refresh heading texts (sort indicator on the active column).
        for cid in cols:
            txt = cid.title()
            if cid == self._sort_column:
                # NOTE(review): both suffixes are empty strings, so no arrow
                # is actually shown; the glyphs appear to have been lost.
                txt += "" if not self._sort_reverse else ""
            try:
                self.results_tree.heading(
                    cid, text=txt, command=lambda c=cid: self._on_heading_click(c)
                )
            except tk.TclError:
                pass

    # ------------------------------------------------------------------
    # Double-click on a result row
    # ------------------------------------------------------------------
    def _on_results_double_click(self, event):
        """Open a viewer for the double-clicked row, depending on the mode."""
        selection = self.results_tree.selection()
        if not selection:
            return
        values = self.results_tree.item(selection[0], "values")
        if not values or len(values) < 2:
            return

        if self._current_mode in ("scan", "countings", "metrics"):
            self._open_file_viewer_for_row(values)
        elif self._current_mode == "differ":
            self._open_diff_viewer_for_row(values)

    def _open_file_viewer_for_row(self, values):
        """Open ``FileViewer`` on the full path stored in the second column."""
        file_path = values[1]
        if not file_path or not os.path.exists(file_path):
            messagebox.showwarning("File Viewer", f"File not found: {file_path}")
            return
        try:
            FileViewer(self, file_path)
        except Exception as e:
            messagebox.showerror(
                "File Viewer Error", f"Failed to open file viewer: {e}"
            )
            self.log(f"File viewer error: {e}", level="ERROR")

    def _open_diff_viewer_for_row(self, values):
        """Open the diff viewer for a differ row (baseline vs current file)."""

        def resolve(root, relative):
            # Returns the joined path when it exists on disk, otherwise None.
            if not (relative and root):
                return None
            candidate = os.path.join(root, relative)
            return candidate if os.path.exists(candidate) else None

        # Column 0 holds the baseline file, column 1 the current file.
        file_a_path = resolve(self._differ_baseline_root, values[0])
        file_b_path = resolve(self._differ_current_root, values[1])

        if not file_a_path and not file_b_path:
            messagebox.showwarning(
                "Diff Viewer", "Neither baseline nor current file exists."
            )
            return

        try:
            from .diff_viewer import show_diff_viewer

            show_diff_viewer(
                self,
                file_a_path,
                file_b_path,
                title_a="Baseline",
                title_b="Current",
            )
        except Exception as e:
            messagebox.showerror(
                "Diff Viewer Error", f"Failed to open diff viewer: {e}"
            )
            self.log(f"Diff viewer error: {e}", level="ERROR")

    # ------------------------------------------------------------------
    # Action button callbacks
    # ------------------------------------------------------------------
    def _on_action_scan(self):
        """Delegate scan action to ActionHandlers."""
        self.action_handlers.handle_scan()

    def _on_action_countings(self):
        """Delegate countings action to ActionHandlers."""
        self.action_handlers.handle_countings()

    def _on_action_metrics(self):
        """Delegate metrics action to ActionHandlers."""
        self.action_handlers.handle_metrics()

    def _on_action_differ(self):
        """Delegate differ action to ActionHandlers."""
        self.action_handlers.handle_differ()

    def _on_action_cancel(self):
        """Cancel the running task (if any) and return the UI to idle."""
        self.log("Action: Cancel requested", level="WARNING")
        task_id = getattr(self, "_current_task_id", None)
        if task_id:
            try:
                self.worker.cancel(task_id)
            except Exception as e:
                # Still reset the UI, but leave a trace of the failure.
                self.log(f"Cancel failed: {e}", level="ERROR")
            self._current_task_id = None
        self._set_phase("Idle")
        self._enable_action_buttons()

    def _on_action_export(self):
        """Delegate export action to ExportHandler."""
        self.export_handler.export_to_csv()

    def _resolve_profile_and_filters(self):
        """Return (paths, allowed_exts, ignore_patterns, profile) for current selection.

        ``allowed_exts`` is a set of extensions or ``None`` when the profile
        yields none; ``ignore_patterns`` is ``None`` when no profile is
        selected.
        """
        pr = getattr(self.topbar, "current_profile", None)
        if not pr:
            pname = getattr(self.topbar, "profile_var", None)
            if pname:
                from ..config import profiles as profiles_cfg

                profile_name = pname.get() if hasattr(pname, "get") else str(pname)
                pr = profiles_cfg.find_profile(profile_name)

        paths = []
        if pr:
            paths = pr.get("paths") or []
        else:
            # No profile: fall back to the single path typed in the top bar.
            p = getattr(self.topbar, "path_var", None)
            if p:
                val = p.get().strip() if hasattr(p, "get") else str(p).strip()
                if val:
                    paths = [val]

        # Build allowed extensions from the profile languages (if any).
        allowed_exts = None
        if pr:
            exts = []
            for ln in pr.get("languages", []) or []:
                if ln in LANGUAGE_EXTENSIONS:
                    exts.extend(LANGUAGE_EXTENSIONS[ln])
                    continue
                val = ln.strip()
                if val.startswith("."):
                    exts.append(val.lower())
                # NOTE(review): purely alphabetic short names (e.g. "py") are
                # skipped by this test; confirm this is intended.
                elif len(val) <= 5 and not val.isalpha():
                    exts.append(f".{val.lower()}")
            if exts:
                allowed_exts = set(exts)

        ignore_patterns = pr.get("ignore", []) if pr else None
        return paths, allowed_exts, ignore_patterns, pr

    # ------------------------------------------------------------------
    # Worker callbacks
    # ------------------------------------------------------------------
    def _scan_done(self, results):
        """Fill the results table with scanned files, then return to idle.

        ``results`` is expected to be a list of ``Path`` objects (or ``None``);
        each becomes a ``(name, full path)`` row. Entries that cannot be
        inserted are skipped and reported once in the log.
        """
        try:
            for row_id in self.results_tree.get_children(""):
                self.results_tree.delete(row_id)
            skipped = 0
            for f in results or []:
                try:
                    self.results_tree.insert("", "end", values=(f.name, str(f)))
                except Exception:
                    skipped += 1
            if skipped:
                self.log(
                    f"Scan: {skipped} result(s) could not be displayed",
                    level="WARNING",
                )
        except Exception as e:
            messagebox.showerror("Scan Error", str(e))
        finally:
            # Whatever happened, the task is over and the UI must unlock.
            self._current_task_id = None
            self._set_phase("Idle")
            self._enable_action_buttons()

    def _on_count_progress(self, res):
        """Delegate count progress to ProgressHandlers."""
        self.progress_handlers.on_count_progress(res)

    def _on_count_done(self, results):
        """Delegate count done to ProgressHandlers."""
        self.progress_handlers.on_count_done(results)

    def _on_metrics_progress(self, res):
        """Delegate metrics progress to ProgressHandlers."""
        self.progress_handlers.on_metrics_progress(res)

    def _on_metrics_done(self, results):
        """Delegate metrics done to ProgressHandlers."""
        self.progress_handlers.on_metrics_done(results)

    def _poll_worker_ui_queue(self):
        """Drain the worker UI queue, then schedule the next poll.

        Each message is logged via ``_handle_worker_msg`` and then passed to
        ``worker.dispatch_message``. A failure in either step is logged and
        never prevents the other step or the following messages from running.
        """
        try:
            while True:
                try:
                    msg = self.worker.ui_queue.get_nowait()
                except queue.Empty:
                    break
                except Exception:
                    # Unexpected queue failure: stop draining, retry next tick.
                    break

                try:
                    self._handle_worker_msg(msg)
                except Exception as e:
                    self.log(f"Worker message handling failed: {e}", level="ERROR")

                try:
                    self.worker.dispatch_message(msg)
                except Exception as e:
                    self.log(f"Worker callback failed: {e}", level="ERROR")
        finally:
            try:
                self._poll_after_id = self.after(
                    self._POLL_INTERVAL_MS, self._poll_worker_ui_queue
                )
            except tk.TclError:
                # The window is being destroyed; stop polling.
                self._poll_after_id = None

    def _handle_worker_msg(self, msg: tuple):
        """Log worker messages to the application's log so there's a single
        place (the Log panel) to follow task lifecycle and results.

        ``msg`` is a ``(type, task_id, payload)`` tuple. Progress messages are
        deliberately not logged because they can be very frequent.
        """
        typ, task_id, payload = msg

        if typ == "started":
            name = payload.get("func") if isinstance(payload, dict) else str(payload)
            # Remember the name so the terminal message can mention it.
            self._task_names[task_id] = name
            self.log(f"Task {task_id[:8]} started: {name}", level="INFO")
        elif typ in ("done", "error", "cancelled"):
            # Terminal states: the stored name is no longer needed afterwards.
            name = self._task_names.pop(task_id, "unknown")
            if typ == "done":
                self.log(f"Task {task_id[:8]} {name} done.", level="INFO")
            elif typ == "error":
                # payload is typically a traceback string or exception info
                self.log(f"Task {task_id[:8]} {name} error: {payload}", level="ERROR")
            else:
                self.log(f"Task {task_id[:8]} {name} cancelled", level="WARNING")

    def on_closing(self):
        """Cleanup when closing the application."""
        # Stop the queue polling and any pending tooltip timer first so no
        # callback fires on a half-destroyed window.
        try:
            if self._poll_after_id is not None:
                self.after_cancel(self._poll_after_id)
                self._poll_after_id = None
            self._hide_column_tooltip()
        except Exception:
            pass
        try:
            if hasattr(self, "resource_monitor"):
                self.resource_monitor.stop()
        except Exception:
            # Shutdown must proceed even if the monitor misbehaves.
            pass
        try:
            if hasattr(self, "logger_system"):
                self.logger_system.shutdown()
        except Exception:
            # Nothing left to log to at this point.
            pass
        self.destroy()
def run_app():
    """Create the main window, route the close button to ``on_closing`` and
    enter the Tk main loop (blocks until the window is closed)."""
    window = App()
    # Closing via the window manager goes through the cleanup routine.
    window.protocol("WM_DELETE_WINDOW", window.on_closing)
    window.mainloop()


# Allow launching the GUI by executing this module directly.
if __name__ == "__main__":
    run_app()