"""Main window of the PyUcc graphical interface.

The window offers a single "Actions" bar (Scan / Countings / Metrics /
Differing / Cancel / Export CSV), a shared results table, a progress bar,
a log panel and a status bar.  Long-running work is handed to a
``WorkerManager``; its UI queue is polled from the Tk main loop so that
all widget updates happen in the main thread.
"""

import csv
import logging
import os
import queue
import threading
import time
import tkinter as tk
from datetime import datetime
from pathlib import Path
from tkinter import ttk, messagebox, filedialog
from tkinter.scrolledtext import ScrolledText

from tkinter_logger import TkinterLogger, get_logger

from ..config import settings as app_settings
from ..config.languages import LANGUAGE_EXTENSIONS
from ..core.differ import BaselineManager, Differ
from ..core.scanner import find_source_files
from ..core.worker import WorkerManager
from .file_viewer import FileViewer
from .topbar import TopBar


# Keys of the per-file "countings" dictionaries, in display/summary order.
_COUNT_KEYS = ("physical_lines", "code_lines", "comment_lines", "blank_lines")

# Column ids of the results table used by the Differing action.
_DIFF_COLUMNS = (
    "fileA", "fileB", "added", "deleted", "modified", "unmodified",
    "Δ code_lines", "Δ comment_lines", "Δ blank_lines",
    "Δ func_count", "Δ avg_cc", "Δ mi",
)


def _format_delta(value, is_float=False):
    """Render a numeric delta for the results table.

    ``None`` becomes an empty string, positive values get an explicit
    ``+`` sign, and floats are shown with two decimals.
    """
    if value is None:
        return ''
    text = f"{value:.2f}" if is_float else str(value)
    return f"+{text}" if value > 0 else text


def _diff_row_values(item):
    """Build the results-table row tuple for one diff pair result dict."""
    counts = item.get('counts') or {}
    countings_delta = item.get('countings_delta') or {}
    metrics_delta = item.get('metrics_delta') or {}
    return (
        item.get('fileA') or '',
        item.get('fileB') or '',
        counts.get('added', 0),
        counts.get('deleted', 0),
        counts.get('modified', 0),
        counts.get('unmodified', 0),
        _format_delta(countings_delta.get('code_lines')),
        _format_delta(countings_delta.get('comment_lines')),
        _format_delta(countings_delta.get('blank_lines')),
        _format_delta(metrics_delta.get('func_count')),
        _format_delta(metrics_delta.get('avg_cc'), is_float=True),
        _format_delta(metrics_delta.get('mi'), is_float=True),
    )


def _collect_source_files(paths, allowed_exts, ignore_patterns):
    """Expand *paths* into a de-duplicated, order-preserving list of files.

    Directories are scanned with ``find_source_files``; plain files are
    taken as they are.  A path that fails to scan is skipped so that one
    bad entry does not abort the whole run.
    """
    found = []
    for raw in paths:
        candidate = Path(raw)
        try:
            if candidate.is_dir():
                found.extend(find_source_files(
                    candidate,
                    allowed_extensions=allowed_exts,
                    ignore_patterns=ignore_patterns,
                ))
            elif candidate.is_file():
                found.append(candidate)
        except Exception:
            continue
    seen = set()
    unique = []
    for entry in found:
        key = str(entry)
        if key not in seen:
            seen.add(key)
            unique.append(entry)
    return unique


def _summarize(entries):
    """Aggregate ``(countings, metrics)`` pairs into summary dictionaries.

    Returns ``(counts, metrics)`` where *counts* holds the summed line
    counts plus ``file_count`` and *metrics* holds the total function count
    and the per-file averages of ``avg_cc`` and ``mi``.  Falsy members of a
    pair are ignored.
    """
    counts = {key: 0 for key in _COUNT_KEYS}
    counts["file_count"] = 0
    metrics = {"file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0}
    for countings, file_metrics in entries:
        if countings:
            for key in _COUNT_KEYS:
                counts[key] += int(countings.get(key, 0) or 0)
            counts["file_count"] += 1
        if file_metrics:
            metrics["total_func_count"] += int(file_metrics.get("func_count", 0) or 0)
            metrics["avg_avg_cc"] += float(file_metrics.get("avg_cc", 0.0) or 0.0)
            metrics["avg_mi"] += float(file_metrics.get("mi", 0.0) or 0.0)
            metrics["file_count"] += 1
    if metrics["file_count"] > 0:
        metrics["avg_avg_cc"] /= metrics["file_count"]
        metrics["avg_mi"] /= metrics["file_count"]
    return counts, metrics


def _build_diff_summary(baseline_files, current_files):
    """Compute the baseline/current/delta summary block of a diff result.

    *baseline_files* are baseline metadata objects (read through their
    optional ``countings`` / ``metrics`` attributes); *current_files* are
    the ``file_meta`` dictionaries produced while processing the pairs.
    """
    baseline_counts, baseline_metrics = _summarize(
        (getattr(fm, 'countings', None), getattr(fm, 'metrics', None))
        for fm in baseline_files
    )
    current_counts, current_metrics = _summarize(
        (fm.get('countings'), fm.get('metrics')) for fm in current_files
    )
    delta_counts = {
        key: current_counts[key] - baseline_counts[key]
        for key in (*_COUNT_KEYS, "file_count")
    }
    delta_metrics = {
        key: current_metrics[key] - baseline_metrics[key]
        for key in ("total_func_count", "avg_avg_cc", "avg_mi")
    }
    return {
        'baseline': {'countings': baseline_counts, 'metrics': baseline_metrics},
        'current': {'countings': current_counts, 'metrics': current_metrics},
        'delta': {'countings': delta_counts, 'metrics': delta_metrics},
    }


class App(tk.Tk):
    """Top-level PyUcc window with unified Actions / Results / Log panels."""

    def __init__(self):
        super().__init__()
        self.title("PyUcc - Interfaccia Grafica")
        self.geometry("1024x700")

        # Bookkeeping for the task currently running in the worker.
        self._current_task_id = None
        self._processed_files = 0
        self._total_files = 0

        # Shared top bar (profile selection) placed above the actions.
        self.topbar = TopBar(self)
        self.topbar.pack(fill="x", side="top")

        # Centralized Actions frame: one place with all action buttons.
        actions_frame = ttk.LabelFrame(self, text="Actions")
        actions_frame.pack(fill="x", side="top", padx=8, pady=6)

        self.scan_btn = ttk.Button(actions_frame, text="Scan", command=self._on_action_scan)
        self.scan_btn.grid(row=0, column=0, padx=(6, 4))
        self.count_btn = ttk.Button(actions_frame, text="Countings", command=self._on_action_countings)
        self.count_btn.grid(row=0, column=1, padx=(4, 4))
        self.metrics_btn = ttk.Button(actions_frame, text="Metrics", command=self._on_action_metrics)
        self.metrics_btn.grid(row=0, column=2, padx=(4, 4))
        self.differ_btn = ttk.Button(actions_frame, text="Differing", command=self._on_action_differ)
        self.differ_btn.grid(row=0, column=3, padx=(4, 4))
        self.cancel_btn = ttk.Button(actions_frame, text="Cancel", command=self._on_action_cancel, state="disabled")
        self.cancel_btn.grid(row=0, column=4, padx=(20, 4))
        self.export_btn = ttk.Button(actions_frame, text="Export CSV", command=self._on_action_export, state="disabled")
        self.export_btn.grid(row=0, column=5, padx=(4, 4))

        # Progress frame: progress bar and file counter below Actions.
        progress_frame = ttk.Frame(self)
        progress_frame.pack(fill="x", side="top", padx=8, pady=(0, 6))
        ttk.Label(progress_frame, text="Progress:").pack(side="left", padx=(6, 6))
        self.progress = ttk.Progressbar(progress_frame, mode="determinate")
        self.progress.pack(side="left", fill="x", expand=True, padx=(0, 12))
        self._lbl_files = ttk.Label(progress_frame, text="Files: 0/0")
        self._lbl_files.pack(side="right", padx=(6, 6))

        # Results frame: a single Treeview reused (re-columned) by all actions.
        results_frame = ttk.LabelFrame(self, text="Results")
        results_frame.pack(fill="both", expand=True, padx=8, pady=(0, 6))

        self.results_columns = ("name", "path")
        self.results_tree = ttk.Treeview(results_frame, columns=self.results_columns, show="headings")
        self.results_tree.heading("name", text="File")
        self.results_tree.heading("path", text="Path")
        self.results_tree.column("name", width=400, anchor="w")
        self.results_tree.column("path", width=600, anchor="w")
        self.results_tree.grid(row=0, column=0, sticky="nsew")
        vsb_r = ttk.Scrollbar(results_frame, orient="vertical", command=self.results_tree.yview)
        vsb_r.grid(row=0, column=1, sticky="ns")
        hsb_r = ttk.Scrollbar(results_frame, orient="horizontal", command=self.results_tree.xview)
        hsb_r.grid(row=1, column=0, columnspan=1, sticky="ew")
        self.results_tree.configure(yscrollcommand=vsb_r.set, xscrollcommand=hsb_r.set)
        results_frame.rowconfigure(0, weight=1)
        results_frame.columnconfigure(0, weight=1)

        # Keep the notebook instantiated for compatibility (not packed).
        self.notebook = ttk.Notebook(self)

        # Status bar at the bottom: operation status (left) and resource
        # monitor (right).  It is packed before the log frame so that the
        # log panel ends up directly above it.
        status_frame = ttk.Frame(self, relief=tk.SUNKEN, borderwidth=1)
        status_frame.pack(fill="x", side="bottom", padx=0, pady=0)

        self.phase_var = tk.StringVar(value="Ready")
        self._lbl_phase = ttk.Label(status_frame, textvariable=self.phase_var, anchor="w")
        self._lbl_phase.pack(side="left", padx=(6, 12))

        self.resource_var = tk.StringVar(value="CPU: 0% | RAM: 0 MB | Threads: 0")
        self._lbl_resources = ttk.Label(status_frame, textvariable=self.resource_var, anchor="e")
        self._lbl_resources.pack(side="right", padx=(12, 6))

        # Log frame above the status bar, with a read-only ScrolledText.
        log_frame = ttk.LabelFrame(self, text="Log")
        log_frame.pack(fill="x", side="bottom", padx=6, pady=(0, 6))
        self.log_text = ScrolledText(log_frame, height=8, wrap="word", state="disabled")
        self.log_text.pack(fill="both", expand=True, padx=6, pady=6)

        # Centralized logging: attach the tkinter_logger handler to the
        # ScrolledText widget.  Best effort: the GUI still works without it.
        try:
            self.logger_system = TkinterLogger(self)
            self.logger_system.setup(enable_console=False)
            color_map = {
                logging.INFO: 'black',
                logging.WARNING: '#d87f0a',
                logging.ERROR: '#d62728',
            }
            self.logger_system.add_tkinter_handler(self.log_text, level_colors=color_map)
        except Exception:
            logging.getLogger(__name__).warning("GUI log panel not available", exc_info=True)

        # Start the resource monitor only now, so that a failure can be
        # reported through the logging system configured above.  The import
        # lives inside the try block because the module is optional.
        try:
            from resource_monitor import TkinterResourceMonitor

            self.resource_monitor = TkinterResourceMonitor(
                tk_widget=self,
                string_var=self.resource_var,
                poll_interval=1.0,  # update every second
            )
            self.resource_monitor.start()
        except Exception as e:
            self.log(f"Resource monitor not available: {e}", level="WARNING")

        # Worker manager (background task runner).
        self.worker = WorkerManager()

        # Poll the worker UI queue and dispatch callbacks in the main thread.
        self.after(100, self._poll_worker_ui_queue)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def log(self, msg: str, level: str = "INFO"):
        """Forward *msg* to the ``pyucc`` logger at the named *level*.

        Unknown level names fall back to INFO.  If the logging system
        itself fails, the message is printed instead.
        """
        lvl = getattr(logging, str(level).upper(), logging.INFO)
        if not isinstance(lvl, int):
            lvl = logging.INFO
        try:
            # Logger.log keeps DEBUG messages at DEBUG, so frequent progress
            # messages stay hidden unless debug logging is enabled.
            get_logger("pyucc").log(lvl, msg)
        except Exception:
            try:
                print(f"[{level}] {msg}")
            except Exception:
                pass

    def _set_phase(self, text: str):
        """Show a human-readable phase description in the status bar."""
        try:
            self.phase_var.set(text)
        except Exception:
            pass

    def _clear_results(self):
        """Remove every row from the shared results tree."""
        for row in self.results_tree.get_children(""):
            self.results_tree.delete(row)

    def _reset_progress(self, total=0):
        """Reset the counters, the progress bar and the ``Files: x/y`` label."""
        self._processed_files = 0
        self._total_files = total
        try:
            self.progress['maximum'] = max(1, total)
            self.progress['value'] = 0
            self._lbl_files.config(text=f"Files: 0/{total}")
        except Exception:
            pass

    def _advance_progress(self):
        """Count one more processed item and refresh bar and label."""
        self._processed_files = getattr(self, '_processed_files', 0) + 1
        try:
            self._lbl_files.config(text=f"Files: {self._processed_files}/{self._total_files}")
            self.progress['value'] = self._processed_files
        except Exception:
            pass

    def _begin_action(self, label, phase, columns):
        """Common prologue of every action: log, phase, table and progress reset."""
        self.log(f"Action: {label} started", level='INFO')
        self._set_phase(phase)
        self._reset_progress()
        self._set_results_columns(columns)
        self.export_btn.config(state='disabled')
        # Force a GUI update before starting heavy work.
        self.update_idletasks()

    def _finish_task(self, phase="Idle"):
        """Common epilogue: forget the task id, disable Cancel, reset the phase."""
        self._current_task_id = None
        try:
            self.cancel_btn.config(state='disabled')
        except Exception:
            pass
        self._set_phase(phase)

    def _resolve_or_warn(self):
        """Resolve the current selection or warn the user when no path is set.

        Returns the tuple from ``_resolve_profile_and_filters`` or ``None``
        (after showing a warning and resetting the phase) when there is
        nothing to analyze.
        """
        resolved = self._resolve_profile_and_filters()
        if not resolved[0]:
            messagebox.showwarning("Missing path", "Please select a folder or profile to analyze first.")
            self._set_phase("Idle")
            return None
        return resolved

    def _set_results_columns(self, cols):
        """Reconfigure the shared results tree to use the given columns.

        *cols* is an iterable of column id strings.  Existing rows are
        removed and every column gets a default heading and width.
        """
        self._clear_results()
        self.results_tree.config(columns=cols)
        for cid in cols:
            self.results_tree.heading(cid, text=cid.title())
            self.results_tree.column(cid, width=120, anchor='w')

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    def _on_action_scan(self):
        """Scan the selected paths in the background and list the files found."""
        self._begin_action("Scan", "Scanning...", ("name", "path"))
        resolved = self._resolve_or_warn()
        if resolved is None:
            return
        paths, allowed_exts, ignore_patterns, _profile = resolved

        def _scan_task(paths):
            return _collect_source_files(paths, allowed_exts, ignore_patterns)

        self._current_task_id = self.worker.submit(_scan_task, paths, kind='thread', on_done=self._scan_done)
        self.cancel_btn.config(state='normal')

    def _on_action_countings(self):
        """Gather the files, then count code/comment/blank lines per file."""
        self._begin_action(
            "Countings", "Counting...",
            ("name", "path", "code", "comment", "blank", "total", "language"),
        )
        resolved = self._resolve_or_warn()
        if resolved is None:
            return
        paths, allowed_exts, ignore_patterns, _profile = resolved

        def _gather_files(paths):
            return _collect_source_files(paths, allowed_exts, ignore_patterns)

        def _on_gather_done(results):
            files = results or []
            if not files:
                messagebox.showinfo("Countings", "No files found to analyze.")
                self._finish_task()
                return
            try:
                from ..core.countings_impl import analyze_file_counts
            except Exception:
                messagebox.showerror("Countings", "Countings analyzer not available")
                self._finish_task()
                return
            self._reset_progress(len(files))
            self._set_phase(f"Analyzing {self._total_files} files...")
            self.update_idletasks()
            self._current_task_id = self.worker.map_iterable(
                func=analyze_file_counts, items=files, kind='thread',
                on_progress=self._on_count_progress, on_done=self._on_count_done,
            )

        self._set_phase("Gathering files...")
        self.update_idletasks()
        self._current_task_id = self.worker.submit(_gather_files, paths, kind='thread', on_done=_on_gather_done)
        self.cancel_btn.config(state='normal')

    def _on_action_metrics(self):
        """Gather the files, then compute complexity metrics per file."""
        self._begin_action(
            "Metrics", "Computing metrics...",
            ("name", "path", "avg_cc", "max_cc", "func_count", "mi", "language"),
        )
        resolved = self._resolve_or_warn()
        if resolved is None:
            return
        paths, allowed_exts, ignore_patterns, _profile = resolved

        def _gather_files(paths):
            return _collect_source_files(paths, allowed_exts, ignore_patterns)

        def _on_gather_done(results):
            files = results or []
            if not files:
                messagebox.showinfo("Metrics", "No files found to analyze.")
                self._finish_task()
                return
            try:
                from ..core.metrics import analyze_file_metrics as analyzer
            except Exception:
                messagebox.showerror("Metrics", "Metrics analyzer not available")
                self._finish_task()
                return
            self._reset_progress(len(files))
            self._set_phase(f"Computing metrics for {self._total_files} files...")
            self.update_idletasks()
            self._current_task_id = self.worker.map_iterable(
                func=analyzer, items=files, kind='thread',
                on_progress=self._on_metrics_progress, on_done=self._on_metrics_done,
            )

        self._set_phase("Gathering files...")
        self.update_idletasks()
        self._current_task_id = self.worker.submit(_gather_files, paths, kind='thread', on_done=_on_gather_done)
        self.cancel_btn.config(state='normal')

    def _ask_baseline(self, baselines):
        """Show a modal dialog to pick one baseline; return its id or ``None``.

        The last entry of the sorted list is preselected.  ``None`` is
        returned when the user cancels or closes the dialog.
        """
        sorted_b = sorted(baselines)
        dlg = tk.Toplevel(self)
        dlg.title("Select baseline to compare")
        dlg.transient(self)
        dlg.grab_set()

        ttk.Label(dlg, text="Select a baseline:").grid(row=0, column=0, padx=12, pady=(12, 4), sticky='w')
        listbox = tk.Listbox(dlg, height=min(10, len(sorted_b)), width=60, exportselection=False)
        for item in sorted_b:
            listbox.insert('end', item)
        listbox.grid(row=1, column=0, padx=12, pady=(0, 12))
        # Preselect the last (latest) entry and make sure it is visible.
        listbox.select_set(len(sorted_b) - 1)
        listbox.see(len(sorted_b) - 1)

        selected = {'id': None}

        def _on_ok():
            sel = listbox.curselection()
            if not sel:
                messagebox.showwarning('Select baseline', 'Please select a baseline to compare')
                return
            selected['id'] = listbox.get(sel[0])
            dlg.destroy()

        def _on_cancel():
            selected['id'] = None
            dlg.destroy()

        def _on_dbl(evt):
            idx = listbox.curselection()
            if idx:
                selected['id'] = listbox.get(idx[0])
                dlg.destroy()

        btn_frame = ttk.Frame(dlg)
        btn_frame.grid(row=2, column=0, pady=(0, 12), sticky='e', padx=12)
        ttk.Button(btn_frame, text='OK', command=_on_ok).grid(row=0, column=0, padx=(0, 8))
        ttk.Button(btn_frame, text='Cancel', command=_on_cancel).grid(row=0, column=1)

        # Double-click accepts the entry under the cursor.  Tk rejects an
        # empty event sequence, so the sequence must be spelled out.
        listbox.bind('<Double-Button-1>', _on_dbl)

        # Center the dialog over the parent once its content has been laid
        # out, so the requested size reflects the actual widgets.
        dlg.update_idletasks()
        x = self.winfo_rootx() + (self.winfo_width() - dlg.winfo_reqwidth()) // 2
        y = self.winfo_rooty() + (self.winfo_height() - dlg.winfo_reqheight()) // 2
        dlg.geometry(f"+{x}+{y}")

        self.wait_window(dlg)
        return selected.get('id')

    def _on_action_differ(self):
        """Diff the project against a baseline (or create one if none exists).

        The heavy work runs in the background through ``WorkerManager``;
        ``on_progress`` / ``on_done`` callbacks update the GUI from the main
        thread.  After a successful diff the current workspace is stored as
        a new baseline.
        """
        self._begin_action("Differing", "Differing...", _DIFF_COLUMNS)
        resolved = self._resolve_or_warn()
        if resolved is None:
            return
        paths, _allowed_exts, ignore_patterns, pr = resolved

        project = paths[0]
        self._set_phase("Loading baselines...")
        self.update_idletasks()
        try:
            bm = BaselineManager(project)
            baselines = bm.list_baselines()
        except Exception as e:
            messagebox.showerror("Differing", f"Failed to load baselines: {e}")
            self._finish_task()
            return
        profile_name = pr.get('name') if pr else None
        max_keep = app_settings.get_max_keep()

        def _submit_create_baseline():
            # Positional arguments are forwarded unchanged to
            # bm.create_baseline_from_dir; max_keep comes from the settings.
            self._set_phase("Creating baseline...")
            self.update_idletasks()
            self._current_task_id = self.worker.submit(
                bm.create_baseline_from_dir, project, None, True, True,
                ignore_patterns, profile_name, max_keep,
                kind='thread', on_done=_on_create_done,
            )
            self.cancel_btn.config(state='normal')

        def _on_create_done(baseline_id):
            self._finish_task()
            try:
                messagebox.showinfo("Differing", f"Baseline created: {baseline_id}")
            except Exception:
                pass

        def _on_diff_done(result):
            # result is the dict assembled by _on_done below.
            try:
                total = result.get('total', {})
                self.log(f"Differ finished: added={total.get('added',0)} deleted={total.get('deleted',0)} modified={total.get('modified',0)}", level='INFO')
                # Rows were already added incrementally by _on_progress;
                # rebuild the table from the final result to avoid duplicates.
                self._clear_results()
                for pair in result.get('pairs', []):
                    self.results_tree.insert('', 'end', values=_diff_row_values(pair))
                self.export_btn.config(state='normal')
                # After a successful diff, save the current workspace as a
                # new baseline (in background so the UI stays responsive).
                try:
                    _submit_create_baseline()
                except Exception:
                    self._finish_task()
            except Exception as e:
                messagebox.showerror('Differ Error', str(e))
                self._finish_task()

        # No baseline yet: offer to create one and stop here.
        if not baselines:
            create = messagebox.askyesno("Differing", "No baseline found for this project. Create baseline now?")
            if not create:
                self._set_phase('Idle')
                return
            _submit_create_baseline()
            return

        chosen = self._ask_baseline(baselines)
        if not chosen:
            self._set_phase('Idle')
            return

        try:
            meta = bm.load_metadata(chosen)
            self.log(f"Loaded baseline: {chosen}", level='INFO')
        except Exception as e:
            messagebox.showerror("Differing", f"Failed to load baseline metadata: {e}")
            self._set_phase('Idle')
            return

        differ = Differ(meta, project, ignore_patterns=ignore_patterns)

        # Building the lists may take a moment; done in the main thread for
        # simplicity.
        self._set_phase("Scanning current files...")
        self.update_idletasks()
        try:
            current_files = differ.build_current_file_list()
        except Exception:
            messagebox.showerror('Differing', 'Failed to scan current project files')
            self._set_phase('Idle')
            return

        self._set_phase("Matching files...")
        self.update_idletasks()
        pairs = differ.match_files(meta.files, current_files)

        # Analyzers are optional: a failed import only disables the
        # corresponding delta columns.
        try:
            from ..core.countings_impl import analyze_file_counts
        except Exception:
            analyze_file_counts = None
        try:
            from ..core.metrics import analyze_file_metrics
        except Exception:
            analyze_file_metrics = None

        def _process_pair(pair):
            """Diff one (baseline, current) pair and compute its deltas."""
            a, b = pair
            res = {
                'fileA': a.path if a is not None else None,
                'fileB': b.path if b is not None else None,
                'counts': None,
                'file_meta': None,
                'baseline_countings': None,
                'current_countings': None,
                'countings_delta': None,
                'baseline_metrics': None,
                'current_metrics': None,
                'metrics_delta': None,
            }
            fa = os.path.join(meta.project_root, a.path) if a is not None else None
            fb = os.path.join(project, b.path) if b is not None else None
            res['counts'] = Differ._diff_file_pair(fa, fb)

            # Baseline countings/metrics are embedded in the baseline entry.
            if a is not None:
                res['baseline_countings'] = getattr(a, 'countings', None) or None
                res['baseline_metrics'] = getattr(a, 'metrics', None) or None

            # Current countings/metrics are computed from the file on disk.
            if b is not None:
                file_entry = {
                    'path': b.path,
                    'size': b.size,
                    'mtime': b.mtime,
                    'sha1': b.sha1,
                }
                if analyze_file_counts:
                    try:
                        counts = analyze_file_counts(Path(os.path.join(project, b.path)))
                        file_entry['countings'] = {
                            key: int(counts.get(key, 0)) for key in _COUNT_KEYS
                        }
                        res['current_countings'] = file_entry['countings']
                    except Exception:
                        file_entry['countings'] = None
                if analyze_file_metrics:
                    try:
                        metrics = analyze_file_metrics(os.path.join(project, b.path))
                        file_entry['metrics'] = {
                            'avg_cc': metrics.get('avg_cc', 0.0),
                            'max_cc': int(metrics.get('max_cc', 0)),
                            'func_count': int(metrics.get('func_count', 0)),
                            'mi': metrics.get('mi', 0.0),
                        }
                        res['current_metrics'] = file_entry['metrics']
                    except Exception:
                        file_entry['metrics'] = None
                res['file_meta'] = file_entry

            base_c, cur_c = res['baseline_countings'], res['current_countings']
            if base_c and cur_c:
                res['countings_delta'] = {
                    key: cur_c[key] - base_c.get(key, 0) for key in _COUNT_KEYS
                }
            base_m, cur_m = res['baseline_metrics'], res['current_metrics']
            if base_m and cur_m:
                res['metrics_delta'] = {
                    key: cur_m[key] - base_m.get(key, 0)
                    for key in ('func_count', 'avg_cc', 'max_cc', 'mi')
                }
            return res

        def _on_progress(item_res):
            # Called for each completed pair: add its row and advance the bar.
            try:
                self.results_tree.insert('', 'end', values=_diff_row_values(item_res))
                self._advance_progress()
            except Exception:
                pass

        def _on_done(all_results):
            # Assemble the final result dict and hand it to _on_diff_done.
            all_results = all_results or []
            total = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
            current_files_list = []
            for it in all_results:
                c = it.get('counts') or {}
                for key in total:
                    total[key] += c.get(key, 0)
                fm = it.get('file_meta')
                if fm:
                    current_files_list.append(fm)

            result = {
                "baseline_id": meta.baseline_id,
                "compared_at": time.time(),
                "total": total,
                "pairs": all_results,
            }
            # The summary is informational; never let it break the diff.
            try:
                result['summary'] = _build_diff_summary(meta.files, current_files_list)
            except Exception:
                pass
            _on_diff_done(result)

        self._reset_progress(len(pairs))
        self._set_phase(f"Analyzing {len(pairs)} file pairs...")
        self.update_idletasks()
        self._current_task_id = self.worker.map_iterable(
            _process_pair, pairs, kind='thread',
            on_progress=_on_progress, on_done=_on_done,
        )
        self.cancel_btn.config(state='normal')

    def _on_action_cancel(self):
        """Ask the worker to cancel the current task and return to idle."""
        self.log("Action: Cancel requested", level='WARNING')
        task_id = getattr(self, '_current_task_id', None)
        if task_id:
            try:
                self.worker.cancel(task_id)
            except Exception:
                pass
        self._finish_task()

    def _on_action_export(self):
        """Export the rows currently shown in the results table to a CSV file."""
        from ..utils.csv_exporter import export_rows_to_csv

        path = filedialog.asksaveasfilename(
            defaultextension=".csv",
            filetypes=[("CSV files", "*.csv"), ("All files", "*")],
        )
        if not path:
            return
        headers = list(self.results_tree['columns'])
        rows = (
            self.results_tree.item(child, 'values')
            for child in self.results_tree.get_children()
        )
        try:
            written = export_rows_to_csv(path, headers, rows)
            messagebox.showinfo("Export", f"Exported {written} rows to {path}")
        except Exception as e:
            messagebox.showerror("Export Error", str(e))

    def _resolve_profile_and_filters(self):
        """Return ``(paths, allowed_exts, ignore_patterns, profile)`` for the current selection.

        The profile is taken from ``topbar.current_profile`` or looked up by
        the name in ``topbar.profile_var``.  Without a profile the single
        path in ``topbar.path_var`` is used.  ``allowed_exts`` is a set of
        extensions derived from the profile languages, or ``None`` when no
        filter applies.
        """
        pr = getattr(self.topbar, 'current_profile', None)
        if not pr:
            pname = getattr(self.topbar, 'profile_var', None)
            if pname:
                from ..config import profiles as profiles_cfg

                wanted = pname.get() if hasattr(pname, 'get') else str(pname)
                pr = profiles_cfg.find_profile(wanted)

        paths = []
        if pr:
            paths = pr.get('paths') or []
        else:
            p = getattr(self.topbar, 'path_var', None)
            if p:
                val = p.get().strip() if hasattr(p, 'get') else str(p).strip()
                if val:
                    paths = [val]

        # Build the allowed extensions from the profile languages (if any).
        allowed_exts = None
        if pr:
            exts = []
            for ln in pr.get('languages', []) or []:
                if ln in LANGUAGE_EXTENSIONS:
                    exts.extend(LANGUAGE_EXTENSIONS[ln])
                    continue
                val = ln.strip()
                if val.startswith('.'):
                    exts.append(val.lower())
                elif len(val) <= 5 and not val.isalpha():
                    # NOTE(review): purely alphabetic short names (e.g. "py")
                    # are NOT treated as extensions here -- confirm intent.
                    exts.append(f".{val.lower()}")
            if exts:
                allowed_exts = set(exts)

        ignore_patterns = pr.get('ignore', []) if pr else None
        return paths, allowed_exts, ignore_patterns, pr

    # ------------------------------------------------------------------
    # Worker callbacks (run in the main thread via dispatch)
    # ------------------------------------------------------------------

    def _scan_done(self, results):
        """Fill the results table with the scanned files (list of ``Path``)."""
        try:
            files = results or []
            self._clear_results()
            for f in files:
                try:
                    self.results_tree.insert('', 'end', values=(f.name, str(f)))
                except Exception:
                    pass
            self.export_btn.config(state='normal' if files else 'disabled')
        except Exception as e:
            messagebox.showerror('Scan Error', str(e))
        finally:
            # Always leave the "Scanning..." phase, even after an error.
            self._finish_task()

    def _on_count_progress(self, res):
        """Add one countings row; *res* is the dict from ``analyze_file_counts``."""
        try:
            if not isinstance(res, dict):
                return
            file = res.get('file')
            name = Path(file).name if file else ''
            code = res.get('code_lines') or res.get('code') or 0
            comment = res.get('comment_lines') or 0
            blank = res.get('blank_lines') or 0
            total = res.get('physical_lines') or 0
            lang = res.get('language') or ''
            self.results_tree.insert(
                '', 'end',
                values=(name, file, int(code), int(comment), int(blank), int(total), lang),
            )
            self._advance_progress()
        except Exception:
            pass

    def _on_count_done(self, results):
        """Countings finished: enable export and return to idle."""
        self.export_btn.config(state='normal')
        self._finish_task()

    def _on_metrics_progress(self, res):
        """Add one metrics row; *res* is the dict from the metrics analyzer."""
        try:
            if not isinstance(res, dict):
                return
            file = res.get('file')
            name = Path(file).name if file else ''
            avg = res.get('avg_cc') or res.get('avg') or 0
            maxcc = res.get('max_cc') or 0
            func = res.get('func_count') or 0
            mi = res.get('mi') or 0
            lang = res.get('language') or ''
            self.results_tree.insert('', 'end', values=(name, file, avg, maxcc, func, mi, lang))
            self._advance_progress()
        except Exception:
            pass

    def _on_metrics_done(self, results):
        """Metrics finished: enable export and return to idle."""
        self.export_btn.config(state='normal')
        self._finish_task()

    def _poll_worker_ui_queue(self):
        """Drain the worker UI queue, then reschedule itself.

        Each message is first logged and then dispatched to the registered
        callbacks.  The two steps are isolated so that a logging problem
        can never prevent a callback from running.
        """
        while True:
            try:
                msg = self.worker.ui_queue.get_nowait()
            except Exception:
                # queue.Empty is the normal way out; any other queue
                # failure also ends this polling round.
                break
            try:
                self._handle_worker_msg(msg)
            except Exception:
                pass
            try:
                self.worker.dispatch_message(msg)
            except Exception:
                pass
        self.after(100, self._poll_worker_ui_queue)

    def _handle_worker_msg(self, msg: tuple):
        """Log a worker message so the Log panel shows the task lifecycle.

        *msg* is a ``(type, task_id, payload)`` tuple.
        """
        typ, task_id, payload = msg
        if typ == "started":
            name = payload.get("func") if isinstance(payload, dict) else str(payload)
            self.log(f"Task {task_id[:8]} started: {name}", level="INFO")
        elif typ == "progress":
            # Progress updates can be very frequent and slow down the GUI,
            # so they are logged at DEBUG level (hidden by default).
            self.log(f"Task {task_id[:8]} progress: {payload}", level="DEBUG")
        elif typ == "done":
            self.log(f"Task {task_id[:8]} done. Result: {payload}", level="INFO")
        elif typ == "error":
            self.log(f"Task {task_id[:8]} error: {payload}", level="ERROR")
        elif typ == "cancelled":
            self.log(f"Task {task_id[:8]} cancelled", level="WARNING")

    def on_closing(self):
        """Stop the resource monitor and the logger, then destroy the window."""
        try:
            if hasattr(self, 'resource_monitor'):
                self.resource_monitor.stop()
        except Exception:
            pass
        try:
            if hasattr(self, 'logger_system'):
                self.logger_system.shutdown()
        except Exception:
            pass
        self.destroy()


def run_app():
    """Create the main window, install the close handler and run the main loop."""
    app = App()
    app.protocol("WM_DELETE_WINDOW", app.on_closing)
    app.mainloop()


if __name__ == "__main__":
    run_app()