SXXXXXXX_PyUCC/pyucc/gui/gui.py

1031 lines
37 KiB
Python

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from tkinter.scrolledtext import ScrolledText
from pathlib import Path
import threading
import queue
import csv
from datetime import datetime
import os
import time
from ..core.scanner import find_source_files
from ..config.languages import LANGUAGE_EXTENSIONS
from tkinter_logger import TkinterLogger, get_logger
import logging
from .topbar import TopBar
from .file_viewer import FileViewer
from ..core.worker import WorkerManager
from ..core.differ import BaselineManager, Differ
from ..config import settings as app_settings
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("PyUcc - Interfaccia Grafica")
self.geometry("1024x700")
# Shared top bar (profile selection) placed above the actions
self.topbar = TopBar(self)
self.topbar.pack(fill="x", side="top")
# Centralized Actions frame: single place with buttons for Scan / Countings / Metrics
actions_frame = ttk.LabelFrame(self, text="Actions")
actions_frame.pack(fill="x", side="top", padx=8, pady=6)
# Buttons for the various functions. Handlers are implemented below.
self.scan_btn = ttk.Button(actions_frame, text="Scan", command=self._on_action_scan)
self.scan_btn.grid(row=0, column=0, padx=(6,4))
self.count_btn = ttk.Button(actions_frame, text="Countings", command=self._on_action_countings)
self.count_btn.grid(row=0, column=1, padx=(4,4))
self.metrics_btn = ttk.Button(actions_frame, text="Metrics", command=self._on_action_metrics)
self.metrics_btn.grid(row=0, column=2, padx=(4,4))
self.differ_btn = ttk.Button(actions_frame, text="Differing", command=self._on_action_differ)
self.differ_btn.grid(row=0, column=3, padx=(4,4))
self.cancel_btn = ttk.Button(actions_frame, text="Cancel", command=self._on_action_cancel, state="disabled")
self.cancel_btn.grid(row=0, column=4, padx=(20,4))
self.export_btn = ttk.Button(actions_frame, text="Export CSV", command=self._on_action_export, state="disabled")
self.export_btn.grid(row=0, column=5, padx=(4,4))
# Progress frame: progress bar and file counter below Actions
progress_frame = ttk.Frame(self)
progress_frame.pack(fill="x", side="top", padx=8, pady=(0,6))
ttk.Label(progress_frame, text="Progress:").pack(side="left", padx=(6,6))
self.progress = ttk.Progressbar(progress_frame, mode="determinate")
self.progress.pack(side="left", fill="x", expand=True, padx=(0,12))
self._lbl_files = ttk.Label(progress_frame, text="Files: 0/0")
self._lbl_files.pack(side="right", padx=(6,6))
# Results frame contains a single Treeview that will be reused by all actions
results_frame = ttk.LabelFrame(self, text="Results")
results_frame.pack(fill="both", expand=True, padx=8, pady=(0,6))
# Treeview for results: columns will be reconfigured per-action
self.results_columns = ("name", "path")
self.results_tree = ttk.Treeview(results_frame, columns=self.results_columns, show="headings")
self.results_tree.heading("name", text="File")
self.results_tree.heading("path", text="Path")
self.results_tree.column("name", width=400, anchor="w")
self.results_tree.column("path", width=600, anchor="w")
self.results_tree.grid(row=0, column=0, sticky="nsew")
vsb_r = ttk.Scrollbar(results_frame, orient="vertical", command=self.results_tree.yview)
vsb_r.grid(row=0, column=1, sticky="ns")
hsb_r = ttk.Scrollbar(results_frame, orient="horizontal", command=self.results_tree.xview)
hsb_r.grid(row=1, column=0, columnspan=1, sticky="ew")
self.results_tree.configure(yscrollcommand=vsb_r.set, xscrollcommand=hsb_r.set)
results_frame.rowconfigure(0, weight=1)
results_frame.columnconfigure(0, weight=1)
# Keep the notebook and individual tabs instantiated for compatibility (not packed)
self.notebook = ttk.Notebook(self)
# Status bar at the bottom: operation status (left) and resource monitor (right)
status_frame = ttk.Frame(self, relief=tk.SUNKEN, borderwidth=1)
status_frame.pack(fill="x", side="bottom", padx=0, pady=0)
# Left side: operation status
self.phase_var = tk.StringVar(value="Ready")
self._lbl_phase = ttk.Label(status_frame, textvariable=self.phase_var, anchor="w")
self._lbl_phase.pack(side="left", padx=(6,12))
# Right side: resource monitor
from resource_monitor import TkinterResourceMonitor
self.resource_var = tk.StringVar(value="CPU: 0% | RAM: 0 MB | Threads: 0")
self._lbl_resources = ttk.Label(status_frame, textvariable=self.resource_var, anchor="e")
self._lbl_resources.pack(side="right", padx=(12,6))
# Initialize and start resource monitor
try:
self.resource_monitor = TkinterResourceMonitor(
tk_widget=self,
string_var=self.resource_var,
poll_interval=1.0 # Update every second
)
self.resource_monitor.start()
except Exception as e:
self.log(f"Resource monitor not available: {e}", level="WARNING")
# Log frame above the status bar
log_frame = ttk.LabelFrame(self, text="Log")
log_frame.pack(fill="x", side="bottom", padx=6, pady=(0,6))
# ScrolledText for logs (read-only)
self.log_text = ScrolledText(log_frame, height=8, wrap="word", state="disabled")
self.log_text.pack(fill="both", expand=True, padx=6, pady=6)
# Initialize centralized logging system using tkinter_logger submodule
# Setup the logger system and attach to the ScrolledText widget
try:
self.logger_system = TkinterLogger(self)
self.logger_system.setup(enable_console=False)
# Add Tkinter handler with custom colors
color_map = {
logging.INFO: 'black',
logging.WARNING: '#d87f0a',
logging.ERROR: '#d62728',
}
self.logger_system.add_tkinter_handler(self.log_text, level_colors=color_map)
except Exception:
pass
# small helper: expose a convenient log method that forwards to
# the standard logging system so messages flow through the queue.
def log(self, msg: str, level: str = "INFO"):
lg = get_logger("pyucc")
lvl = getattr(logging, level.upper(), logging.INFO)
try:
if lvl >= logging.ERROR:
lg.error(msg)
elif lvl >= logging.WARNING:
lg.warning(msg)
else:
lg.info(msg)
except Exception:
try:
print(f"[{level}] {msg}")
except Exception:
pass
self.log = log.__get__(self)
# Worker manager (background task runner)
self.worker = WorkerManager()
# small helper to set human-readable phase in the status bar
def _set_phase(self, text: str):
try:
self.phase_var.set(text)
except Exception:
pass
self._set_phase = _set_phase.__get__(self)
# Tabs removed — functionality is provided by the unified Actions/Results UI
# poll the worker UI queue and dispatch callbacks in the main thread
self.after(100, self._poll_worker_ui_queue)
def _set_results_columns(self, cols):
"""Reconfigure the shared results tree to use given columns.
Cols is an iterable of (col_id, ...) where col_id are string keys.
"""
# Clear existing
for c in self.results_tree.get_children(""):
self.results_tree.delete(c)
# reconfigure columns
self.results_tree.config(columns=cols)
for cid in cols:
self.results_tree.heading(cid, text=cid.title())
self.results_tree.column(cid, width=120, anchor='w')
def _on_action_scan(self):
"""Placeholder: run scan across profile paths and populate results tree.
Actual worker invocation will be implemented in subsequent steps.
"""
self.log("Action: Scan started", level='INFO')
self._set_phase("Scanning...")
# reset counters/progress
self._processed_files = 0
self._total_files = 0
try:
self.progress['maximum'] = 1
self.progress['value'] = 0
self._lbl_files.config(text="Files: 0/0")
except Exception:
pass
self._set_results_columns(("name", "path"))
for c in self.results_tree.get_children(""):
self.results_tree.delete(c)
self.export_btn.config(state='disabled')
# force GUI update before starting heavy work
self.update_idletasks()
# resolve profile/paths and filters
paths, allowed_exts, ignore_patterns, pr = self._resolve_profile_and_filters()
if not paths:
messagebox.showwarning("Missing path", "Please select a folder or profile to analyze first.")
return
# submit scan task
def _scan_task(paths):
files = []
from pathlib import Path
for p in paths:
pth = Path(p)
try:
if pth.is_dir():
files.extend(find_source_files(pth, allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns))
elif pth.is_file():
files.append(pth)
except Exception:
continue
# deduplicate
seen = set()
unique = []
for f in files:
s = str(f)
if s not in seen:
seen.add(s)
unique.append(f)
return unique
self._current_task_id = self.worker.submit(_scan_task, paths, kind='thread', on_done=self._scan_done)
self.cancel_btn.config(state='normal')
def _on_action_countings(self):
self.log("Action: Countings started", level='INFO')
self._set_phase("Counting...")
# reset counters/progress
self._processed_files = 0
self._total_files = 0
try:
self.progress['maximum'] = 1
self.progress['value'] = 0
self._lbl_files.config(text="Files: 0/0")
except Exception:
pass
self._set_results_columns(("name", "path", "code", "comment", "blank", "total", "language"))
for c in self.results_tree.get_children(""):
self.results_tree.delete(c)
self.export_btn.config(state='disabled')
# force GUI update before starting heavy work
self.update_idletasks()
paths, allowed_exts, ignore_patterns, pr = self._resolve_profile_and_filters()
if not paths:
messagebox.showwarning("Missing path", "Please select a folder or profile to analyze first.")
return
# build targets by scanning paths synchronously in a background submit then map counts
def _gather_files(paths):
files = []
from pathlib import Path
for p in paths:
pth = Path(p)
try:
if pth.is_dir():
files.extend(find_source_files(pth, allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns))
elif pth.is_file():
files.append(pth)
except Exception:
continue
# deduplicate
seen = set()
unique = []
for f in files:
s = str(f)
if s not in seen:
seen.add(s)
unique.append(f)
return unique
# submit gather as blocking task and then map analyze_file_counts
self._set_phase("Gathering files...")
self.update_idletasks()
def _on_gather_done(results):
files = results or []
if not files:
messagebox.showinfo("Countings", "No files found to analyze.")
self.cancel_btn.config(state='disabled')
self._set_phase('Idle')
return
# import analyzer
from ..core.countings_impl import analyze_file_counts
# prepare progress handlers
self._total_files = len(files)
self._processed_files = 0
try:
self.progress['maximum'] = max(1, self._total_files)
self.progress['value'] = 0
self._lbl_files.config(text=f"Files: 0/{self._total_files}")
except Exception:
pass
self._set_phase(f"Analyzing {self._total_files} files...")
self.update_idletasks()
# map iterate
self._current_task_id = self.worker.map_iterable(func=analyze_file_counts, items=files, kind='thread', on_progress=self._on_count_progress, on_done=self._on_count_done)
# submit gather
self._current_task_id = self.worker.submit(_gather_files, paths, kind='thread', on_done=_on_gather_done)
self.cancel_btn.config(state='normal')
def _on_action_metrics(self):
self.log("Action: Metrics started", level='INFO')
self._set_phase("Computing metrics...")
# reset counters/progress
self._processed_files = 0
self._total_files = 0
try:
self.progress['maximum'] = 1
self.progress['value'] = 0
self._lbl_files.config(text="Files: 0/0")
except Exception:
pass
self._set_results_columns(("name", "path", "avg_cc", "max_cc", "func_count", "mi", "language"))
for c in self.results_tree.get_children(""):
self.results_tree.delete(c)
self.export_btn.config(state='disabled')
# force GUI update before starting heavy work
self.update_idletasks()
paths, allowed_exts, ignore_patterns, pr = self._resolve_profile_and_filters()
if not paths:
messagebox.showwarning("Missing path", "Please select a folder or profile to analyze first.")
return
# gather files then map metrics analyzer
def _gather_files(paths):
files = []
from pathlib import Path
for p in paths:
pth = Path(p)
try:
if pth.is_dir():
files.extend(find_source_files(pth, allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns))
elif pth.is_file():
files.append(pth)
except Exception:
continue
# deduplicate
seen = set()
unique = []
for f in files:
s = str(f)
if s not in seen:
seen.add(s)
unique.append(f)
return unique
# submit file gathering task with status update
self._set_phase("Gathering files...")
self.update_idletasks()
def _on_gather_done(results):
files = results or []
if not files:
messagebox.showinfo("Metrics", "No files found to analyze.")
self.cancel_btn.config(state='disabled')
self._set_phase('Idle')
return
# use core metrics analyzer
try:
from ..core.metrics import analyze_file_metrics as analyzer
except Exception:
messagebox.showerror("Metrics", "Metrics analyzer not available")
self._set_phase('Idle')
return
self._total_files = len(files)
self._processed_files = 0
try:
self.progress['maximum'] = max(1, self._total_files)
self.progress['value'] = 0
self._lbl_files.config(text=f"Files: 0/{self._total_files}")
except Exception:
pass
self._set_phase(f"Computing metrics for {self._total_files} files...")
self.update_idletasks()
self._current_task_id = self.worker.map_iterable(func=analyzer, items=files, kind='thread', on_progress=self._on_metrics_progress, on_done=self._on_metrics_done)
self._current_task_id = self.worker.submit(_gather_files, paths, kind='thread', on_done=_on_gather_done)
self.cancel_btn.config(state='normal')
def _on_action_differ(self):
"""Run differ against existing baseline (or create baseline if none).
This runs the heavy work in background via WorkerManager and uses
on_done callbacks to update the GUI in main thread.
"""
self.log("Action: Differing started", level='INFO')
self._set_phase("Differing...")
self._set_results_columns(("fileA", "fileB", "added", "deleted", "modified", "unmodified", "Δ code_lines", "Δ comment_lines", "Δ blank_lines", "Δ func_count", "Δ avg_cc", "Δ mi"))
for c in self.results_tree.get_children(""):
self.results_tree.delete(c)
self.export_btn.config(state='disabled')
try:
self._lbl_files.config(text="Files: 0/0")
self.progress['maximum'] = 1
self.progress['value'] = 0
except Exception:
pass
# force GUI update before starting heavy work
self.update_idletasks()
paths, allowed_exts, ignore_patterns, pr = self._resolve_profile_and_filters()
if not paths:
messagebox.showwarning("Missing path", "Please select a folder or profile to analyze first.")
return
project = paths[0]
self._set_phase("Loading baselines...")
self.update_idletasks()
bm = BaselineManager(project)
baselines = bm.list_baselines()
profile_name = pr.get('name') if pr else None
max_keep = app_settings.get_max_keep()
# helper callbacks
def _on_create_done(baseline_id):
try:
messagebox.showinfo("Differing", f"Baseline created: {baseline_id}")
except Exception:
pass
def _on_diff_done(result):
# result is the dict produced by Differ.diff()
try:
total = result.get('total', {})
self.log(f"Differ finished: added={total.get('added',0)} deleted={total.get('deleted',0)} modified={total.get('modified',0)}", level='INFO')
# populate results tree
for pair in result.get('pairs', []):
fileA = pair.get('fileA') or ''
fileB = pair.get('fileB') or ''
counts = pair.get('counts', {})
# Extract countings and metrics deltas
countings_delta = pair.get('countings_delta')
metrics_delta = pair.get('metrics_delta')
# Format delta values: show actual values if available, empty string if not
def format_delta(value, is_float=False):
if value is None:
return ''
if is_float:
formatted = f"{value:.2f}"
if value > 0:
return f"+{formatted}"
return formatted
else:
if value > 0:
return f"+{value}"
return str(value)
delta_code = format_delta(countings_delta.get('code_lines') if countings_delta else None)
delta_comment = format_delta(countings_delta.get('comment_lines') if countings_delta else None)
delta_blank = format_delta(countings_delta.get('blank_lines') if countings_delta else None)
delta_func = format_delta(metrics_delta.get('func_count') if metrics_delta else None)
delta_cc = format_delta(metrics_delta.get('avg_cc') if metrics_delta else None, is_float=True)
delta_mi = format_delta(metrics_delta.get('mi') if metrics_delta else None, is_float=True)
self.results_tree.insert('', 'end', values=(fileA, fileB, counts.get('added',0), counts.get('deleted',0), counts.get('modified',0), counts.get('unmodified',0), delta_code, delta_comment, delta_blank, delta_func, delta_cc, delta_mi))
self.export_btn.config(state='normal')
self.cancel_btn.config(state='disabled')
self._current_task_id = None
self._set_phase('Idle')
# After a successful diff, save the current workspace as a new baseline
try:
# schedule baseline creation in background so UI remains responsive
self.worker.submit(bm.create_baseline_from_dir, project, None, True, True, ignore_patterns, profile_name, max_keep, kind='thread', on_done=_on_create_done)
except Exception:
pass
except Exception as e:
messagebox.showerror('Differ Error', str(e))
# If no baseline exists: ask to create (run create in background)
if not baselines:
create = messagebox.askyesno("Differing", "No baseline found for this project. Create baseline now?")
if not create:
self._set_phase('Idle')
return
# submit create baseline task, pass ignore_patterns and profile name from profile
self._set_phase("Creating baseline...")
self.update_idletasks()
profile_name = pr.get('name') if pr else None
self._current_task_id = self.worker.submit(bm.create_baseline_from_dir, project, None, True, True, ignore_patterns, profile_name, 5, kind='thread', on_done=_on_create_done)
self.cancel_btn.config(state='normal')
return
# If baselines exist: ask user to pick one (default: latest)
if baselines:
sorted_b = sorted(baselines)
# modal selection dialog
dlg = tk.Toplevel(self)
dlg.title("Select baseline to compare")
dlg.transient(self)
dlg.grab_set()
# center dialog over parent window
dlg.update_idletasks()
pw = self.winfo_width()
ph = self.winfo_height()
px = self.winfo_rootx()
py = self.winfo_rooty()
dw = dlg.winfo_reqwidth()
dh = dlg.winfo_reqheight()
x = px + (pw - dw) // 2
y = py + (ph - dh) // 2
dlg.geometry(f"+{x}+{y}")
ttk.Label(dlg, text="Select a baseline:").grid(row=0, column=0, padx=12, pady=(12,4), sticky='w')
listbox = tk.Listbox(dlg, height=min(10, len(sorted_b)), width=60, exportselection=False)
for item in sorted_b:
listbox.insert('end', item)
listbox.grid(row=1, column=0, padx=12, pady=(0,12))
# default select latest
listbox.select_set(len(sorted_b)-1)
selected = {'id': None}
def _on_ok():
sel = listbox.curselection()
if not sel:
messagebox.showwarning('Select baseline', 'Please select a baseline to compare')
return
selected['id'] = listbox.get(sel[0])
dlg.destroy()
def _on_cancel():
selected['id'] = None
dlg.destroy()
btn_frame = ttk.Frame(dlg)
btn_frame.grid(row=2, column=0, pady=(0,12), sticky='e', padx=12)
ok_btn = ttk.Button(btn_frame, text='OK', command=_on_ok)
ok_btn.grid(row=0, column=0, padx=(0,8))
cancel_btn = ttk.Button(btn_frame, text='Cancel', command=_on_cancel)
cancel_btn.grid(row=0, column=1)
# double-click to accept
def _on_dbl(evt):
idx = listbox.curselection()
if idx:
selected['id'] = listbox.get(idx[0])
dlg.destroy()
listbox.bind('<Double-Button-1>', _on_dbl)
# wait for dialog
self.wait_window(dlg)
chosen = selected.get('id')
if not chosen:
self._set_phase('Idle')
return
try:
meta = bm.load_metadata(chosen)
self.log(f"Loaded baseline: {chosen}", level='INFO')
except Exception as e:
messagebox.showerror("Differing", f"Failed to load baseline metadata: {e}")
self._set_phase('Idle')
return
# Build pairs and process them in parallel with per-item progress updates
differ = Differ(meta, project, ignore_patterns=ignore_patterns) # build lists (may take a bit) - done in main thread for simplicity
self._set_phase("Scanning current files...")
self.update_idletasks()
try:
current_files = differ.build_current_file_list()
except Exception:
messagebox.showerror('Differing', 'Failed to scan current project files')
self._set_phase('Idle')
return
self._set_phase("Matching files...")
self.update_idletasks()
pairs = differ.match_files(meta.files, current_files)
# per-pair processing function
def _process_pair(pair):
a, b = pair
res = {
'fileA': a.path if a is not None else None,
'fileB': b.path if b is not None else None,
'counts': None,
'file_meta': None,
'baseline_countings': None,
'current_countings': None,
'countings_delta': None,
'baseline_metrics': None,
'current_metrics': None,
'metrics_delta': None,
}
# diff counts
fa = os.path.join(meta.project_root, a.path) if a is not None else None
fb = os.path.join(project, b.path) if b is not None else None
res['counts'] = Differ._diff_file_pair(fa, fb)
# Extract baseline countings and metrics from file a
if a is not None:
if hasattr(a, 'countings') and a.countings:
res['baseline_countings'] = a.countings
if hasattr(a, 'metrics') and a.metrics:
res['baseline_metrics'] = a.metrics
# gather file metadata for the current file (b) if exists
if b is not None:
file_entry = {
'path': b.path,
'size': b.size,
'mtime': b.mtime,
'sha1': b.sha1,
}
# run countings and metrics for current file
try:
from ..core.countings_impl import analyze_file_counts
except Exception:
analyze_file_counts = None
try:
from ..core.metrics import analyze_file_metrics
except Exception:
analyze_file_metrics = None
if analyze_file_counts:
try:
from pathlib import Path as _Path
counts = analyze_file_counts(_Path(os.path.join(project, b.path)))
file_entry['countings'] = {
'physical_lines': int(counts.get('physical_lines', 0)),
'code_lines': int(counts.get('code_lines', 0)),
'comment_lines': int(counts.get('comment_lines', 0)),
'blank_lines': int(counts.get('blank_lines', 0)),
}
res['current_countings'] = file_entry['countings']
except Exception:
file_entry['countings'] = None
if analyze_file_metrics:
try:
metrics = analyze_file_metrics(os.path.join(project, b.path))
file_entry['metrics'] = {
'avg_cc': metrics.get('avg_cc', 0.0),
'max_cc': int(metrics.get('max_cc', 0)),
'func_count': int(metrics.get('func_count', 0)),
'mi': metrics.get('mi', 0.0),
}
res['current_metrics'] = file_entry['metrics']
except Exception:
file_entry['metrics'] = None
res['file_meta'] = file_entry
# Compute deltas
if res['baseline_countings'] and res['current_countings']:
res['countings_delta'] = {
'physical_lines': res['current_countings']['physical_lines'] - res['baseline_countings'].get('physical_lines', 0),
'code_lines': res['current_countings']['code_lines'] - res['baseline_countings'].get('code_lines', 0),
'comment_lines': res['current_countings']['comment_lines'] - res['baseline_countings'].get('comment_lines', 0),
'blank_lines': res['current_countings']['blank_lines'] - res['baseline_countings'].get('blank_lines', 0),
}
if res['baseline_metrics'] and res['current_metrics']:
res['metrics_delta'] = {
'func_count': res['current_metrics']['func_count'] - res['baseline_metrics'].get('func_count', 0),
'avg_cc': res['current_metrics']['avg_cc'] - res['baseline_metrics'].get('avg_cc', 0.0),
'max_cc': res['current_metrics']['max_cc'] - res['baseline_metrics'].get('max_cc', 0),
'mi': res['current_metrics']['mi'] - res['baseline_metrics'].get('mi', 0.0),
}
return res
# progress handler: called on each completed item (in main thread via dispatch)
def _on_progress(item_res):
# update progress bar and tree with incremental row
try:
fileA = item_res.get('fileA') or ''
fileB = item_res.get('fileB') or ''
counts = item_res.get('counts') or {}
# Extract countings and metrics deltas
countings_delta = item_res.get('countings_delta')
metrics_delta = item_res.get('metrics_delta')
# Format delta values
def format_delta(value, is_float=False):
if value is None:
return ''
if is_float:
formatted = f"{value:.2f}"
if value > 0:
return f"+{formatted}"
return formatted
else:
if value > 0:
return f"+{value}"
return str(value)
delta_code = format_delta(countings_delta.get('code_lines') if countings_delta else None)
delta_comment = format_delta(countings_delta.get('comment_lines') if countings_delta else None)
delta_blank = format_delta(countings_delta.get('blank_lines') if countings_delta else None)
delta_func = format_delta(metrics_delta.get('func_count') if metrics_delta else None)
delta_cc = format_delta(metrics_delta.get('avg_cc') if metrics_delta else None, is_float=True)
delta_mi = format_delta(metrics_delta.get('mi') if metrics_delta else None, is_float=True)
self.results_tree.insert('', 'end', values=(fileA, fileB, counts.get('added',0), counts.get('deleted',0), counts.get('modified',0), counts.get('unmodified',0), delta_code, delta_comment, delta_blank, delta_func, delta_cc, delta_mi))
# progress update
try:
self._processed_files = getattr(self, '_processed_files', 0) + 1
self._lbl_files.config(text=f"Files: {self._processed_files}/{len(pairs)}")
self.progress['maximum'] = max(1, len(pairs))
self.progress['value'] = self._processed_files
except Exception:
pass
except Exception:
pass
# done handler: assemble final result and call existing on_done
def _on_done(all_results):
# compute totals
total = {"added":0, "deleted":0, "modified":0, "unmodified":0}
current_files_list = []
for it in all_results:
c = it.get('counts', {})
total['added'] += c.get('added',0)
total['deleted'] += c.get('deleted',0)
total['modified'] += c.get('modified',0)
total['unmodified'] += c.get('unmodified',0)
fm = it.get('file_meta')
if fm:
current_files_list.append(fm)
result = {"baseline_id": meta.baseline_id, "compared_at": time.time(), "total": total, "pairs": all_results}
# Build summary statistics from baseline and current file metadata
try:
# Baseline summary (from meta.files which have embedded countings/metrics)
baseline_counts = {"physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0}
baseline_metrics = {"file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0}
baseline_metrics_count = 0
for fm in meta.files:
if hasattr(fm, 'countings') and fm.countings:
baseline_counts["physical_lines"] += fm.countings.get("physical_lines", 0)
baseline_counts["code_lines"] += fm.countings.get("code_lines", 0)
baseline_counts["comment_lines"] += fm.countings.get("comment_lines", 0)
baseline_counts["blank_lines"] += fm.countings.get("blank_lines", 0)
baseline_counts["file_count"] += 1
if hasattr(fm, 'metrics') and fm.metrics:
baseline_metrics["total_func_count"] += fm.metrics.get("func_count", 0)
baseline_metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0)
baseline_metrics["avg_mi"] += fm.metrics.get("mi", 0.0)
baseline_metrics_count += 1
if baseline_metrics_count > 0:
baseline_metrics["avg_avg_cc"] /= baseline_metrics_count
baseline_metrics["avg_mi"] /= baseline_metrics_count
baseline_metrics["file_count"] = baseline_metrics_count
# Current summary (from current_files_list)
current_counts = {"physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0}
current_metrics = {"file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0}
current_metrics_count = 0
for f in current_files_list:
c = f.get('countings') or {}
m = f.get('metrics') or {}
if c:
current_counts["physical_lines"] += int(c.get('physical_lines', 0))
current_counts["code_lines"] += int(c.get('code_lines', 0))
current_counts["comment_lines"] += int(c.get('comment_lines', 0))
current_counts["blank_lines"] += int(c.get('blank_lines', 0))
current_counts["file_count"] += 1
if m:
current_metrics["total_func_count"] += int(m.get('func_count', 0) or 0)
current_metrics["avg_avg_cc"] += float(m.get('avg_cc', 0.0) or 0.0)
current_metrics["avg_mi"] += float(m.get('mi', 0.0) or 0.0)
current_metrics_count += 1
if current_metrics_count > 0:
current_metrics["avg_avg_cc"] /= current_metrics_count
current_metrics["avg_mi"] /= current_metrics_count
current_metrics["file_count"] = current_metrics_count
# Compute deltas
delta_counts = {
"physical_lines": current_counts["physical_lines"] - baseline_counts["physical_lines"],
"code_lines": current_counts["code_lines"] - baseline_counts["code_lines"],
"comment_lines": current_counts["comment_lines"] - baseline_counts["comment_lines"],
"blank_lines": current_counts["blank_lines"] - baseline_counts["blank_lines"],
"file_count": current_counts["file_count"] - baseline_counts["file_count"],
}
delta_metrics = {
"total_func_count": current_metrics["total_func_count"] - baseline_metrics["total_func_count"],
"avg_avg_cc": current_metrics["avg_avg_cc"] - baseline_metrics["avg_avg_cc"],
"avg_mi": current_metrics["avg_mi"] - baseline_metrics["avg_mi"],
}
result['summary'] = {
'baseline': {'countings': baseline_counts, 'metrics': baseline_metrics},
'current': {'countings': current_counts, 'metrics': current_metrics},
'delta': {'countings': delta_counts, 'metrics': delta_metrics}
}
except Exception:
pass
# call existing on_done behavior
_on_diff_done(result)
# submit parallel per-pair tasks with progress
self._processed_files = 0
self._total_files = len(pairs)
self.progress['maximum'] = max(1, len(pairs))
self._lbl_files.config(text=f"Files: 0/{len(pairs)}")
self._set_phase(f"Analyzing {len(pairs)} file pairs...")
self.update_idletasks()
self._current_task_id = self.worker.map_iterable(_process_pair, pairs, kind='thread', on_progress=_on_progress, on_done=_on_done)
self.cancel_btn.config(state='normal')
def _on_action_cancel(self):
self.log("Action: Cancel requested", level='WARNING')
if getattr(self, '_current_task_id', None):
try:
self.worker.cancel(self._current_task_id)
except Exception:
pass
self._current_task_id = None
# reset phase to idle when finished
try:
self._set_phase("Idle")
except Exception:
pass
self.cancel_btn.config(state='disabled')
def _on_action_export(self):
from ..utils.csv_exporter import export_rows_to_csv
path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV files", "*.csv"), ("All files", "*")])
if not path:
return
headers = [c for c in self.results_tree['columns']]
rows = (self.results_tree.item(child, 'values') for child in self.results_tree.get_children())
try:
written = export_rows_to_csv(path, headers, rows)
messagebox.showinfo("Export", f"Exported {written} rows to {path}")
except Exception as e:
messagebox.showerror("Export Error", str(e))
def _resolve_profile_and_filters(self):
"""Return (paths, allowed_exts, ignore_patterns, profile) for current selection."""
pr = getattr(self.topbar, 'current_profile', None)
if not pr:
pname = getattr(self.topbar, 'profile_var', None)
if pname:
from ..config import profiles as profiles_cfg
pr = profiles_cfg.find_profile(pname.get()) if hasattr(pname, 'get') else profiles_cfg.find_profile(str(pname))
paths = []
if pr:
paths = pr.get('paths') or []
else:
p = getattr(self.topbar, 'path_var', None)
if p:
val = p.get().strip() if hasattr(p, 'get') else str(p).strip()
if val:
paths = [val]
# build allowed extensions from profile languages (if any)
allowed_exts = None
if pr:
langs = pr.get('languages', []) or []
exts = []
for ln in langs:
if ln in LANGUAGE_EXTENSIONS:
exts.extend(LANGUAGE_EXTENSIONS[ln])
else:
val = ln.strip()
if val.startswith('.'):
exts.append(val.lower())
elif len(val) <= 5 and not val.isalpha():
exts.append(f".{val.lower()}")
else:
pass
if exts:
allowed_exts = set(exts)
ignore_patterns = pr.get('ignore', []) if pr else None
return paths, allowed_exts, ignore_patterns, pr
def _scan_done(self, results):
# results: list of Path
try:
files = results or []
for p in self.results_tree.get_children(""):
self.results_tree.delete(p)
for f in files:
try:
self.results_tree.insert('', 'end', values=(f.name, str(f)))
except Exception:
pass
self.export_btn.config(state='normal' if files else 'disabled')
self.cancel_btn.config(state='disabled')
self._current_task_id = None
except Exception as e:
messagebox.showerror('Scan Error', str(e))
self.cancel_btn.config(state='disabled')
def _on_count_progress(self, res):
# res is expected to be dict from analyze_file_counts
try:
if not isinstance(res, dict):
return
file = res.get('file')
name = Path(file).name if file else ''
code = res.get('code_lines') or res.get('code') or 0
comment = res.get('comment_lines') or 0
blank = res.get('blank_lines') or 0
total = res.get('physical_lines') or 0
lang = res.get('language') or ''
self.results_tree.insert('', 'end', values=(name, file, int(code), int(comment), int(blank), int(total), lang))
# update progress
try:
self._processed_files = getattr(self, '_processed_files', 0) + 1
self._lbl_files.config(text=f"Files: {self._processed_files}/{self._total_files}")
self.progress['value'] = self._processed_files
except Exception:
pass
except Exception:
pass
def _on_count_done(self, results):
self.export_btn.config(state='normal')
self.cancel_btn.config(state='disabled')
self._current_task_id = None
try:
self._set_phase("Idle")
except Exception:
pass
def _on_metrics_progress(self, res):
try:
if not isinstance(res, dict):
return
file = res.get('file')
name = Path(file).name if file else ''
avg = res.get('avg_cc') or res.get('avg') or 0
maxcc = res.get('max_cc') or 0
func = res.get('func_count') or 0
mi = res.get('mi') or 0
lang = res.get('language') or ''
self.results_tree.insert('', 'end', values=(name, file, avg, maxcc, func, mi, lang))
self._processed_files = getattr(self, '_processed_files', 0) + 1
try:
self._lbl_files.config(text=f"Files: {self._processed_files}/{self._total_files}")
self.progress['value'] = self._processed_files
except Exception:
pass
except Exception:
pass
def _on_metrics_done(self, results):
self.export_btn.config(state='normal')
self.cancel_btn.config(state='disabled')
self._current_task_id = None
try:
self._set_phase("Idle")
except Exception:
pass
def _poll_worker_ui_queue(self):
try:
while True:
msg = self.worker.ui_queue.get_nowait()
# allow GUI to update task list and status
self._handle_worker_msg(msg)
# dispatch registered callbacks
try:
self.worker.dispatch_message(msg)
except Exception:
pass
except Exception:
# queue empty or other; schedule next poll
pass
self.after(100, self._poll_worker_ui_queue)
def _handle_worker_msg(self, msg: tuple):
"""Log worker messages to the application's log so there's a single
place (the Log panel) to follow task lifecycle and results.
"""
typ, task_id, payload = msg
if typ == "started":
name = payload.get("func") if isinstance(payload, dict) else str(payload)
self.log(f"Task {task_id[:8]} started: {name}", level="INFO")
elif typ == "progress":
# payload is expected to be a partial result or status dictionary
# Progress updates can be very frequent and slow down the GUI
# so log them at DEBUG level (hidden by default) and only
# surface warnings/errors at higher levels.
self.log(f"Task {task_id[:8]} progress: {payload}", level="DEBUG")
elif typ == "done":
# payload may contain final results
self.log(f"Task {task_id[:8]} done. Result: {payload}", level="INFO")
elif typ == "error":
# payload is typically a traceback string or exception info
self.log(f"Task {task_id[:8]} error: {payload}", level="ERROR")
elif typ == "cancelled":
self.log(f"Task {task_id[:8]} cancelled", level="WARNING")
def on_closing(self):
"""Cleanup when closing the application."""
try:
# Stop resource monitor
if hasattr(self, 'resource_monitor'):
self.resource_monitor.stop()
except Exception:
pass
try:
# Shutdown logger system
if hasattr(self, 'logger_system'):
self.logger_system.shutdown()
except Exception:
pass
self.destroy()
def run_app():
app = App()
app.protocol("WM_DELETE_WINDOW", app.on_closing)
app.mainloop()
if __name__ == "__main__":
run_app()