Chore: Stop tracking files based on .gitignore update.

Untracked files matching the following rules:
- Rule "*.zip": 1 file
This commit is contained in:
VALLONGOL 2025-11-24 10:15:59 +01:00
parent 5f7b623641
commit 4fdd646d60
26 changed files with 2631 additions and 12 deletions

7
.envrc Normal file
View File

@ -0,0 +1,7 @@
# Auto-activate local virtualenv when entering project directory
# Requires: https://direnv.net/ (recommended)
# If you prefer not to use direnv, simply run: `source .venv/bin/activate`
if [ -f .venv/bin/activate ]; then
source .venv/bin/activate
fi

2
.gitignore vendored
View File

@ -148,3 +148,5 @@ dmypy.json
# Temporary files
*.swp
*~
.DS_Store
*.zip

19
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,19 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "PyUcc: Launch GUI",
"type": "python",
"request": "launch",
"module": "pyucc",
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"justMyCode": false,
"env": {
"PYTHONPATH": "${workspaceFolder}",
"VIRTUAL_ENV": "${workspaceFolder}/.venv",
"PATH": "${workspaceFolder}/.venv/bin:${env:PATH}"
}
}
]
}

4
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,4 @@
{
"python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python",
"python.terminal.activateEnvironment": true
}

View File

@ -1,17 +1,77 @@
# pyucc/__main__.py
# ucc_py/__main__.py
# Example import assuming your main logic is in a 'main' function
# within a 'app' module in your 'pyucc.core' package.
# from pyucc.core.app import main as start_application
#
# Or, if you have a function in pyucc.core.core:
# from pyucc.core.core import main_function
import argparse
import sys
from pathlib import Path
# Placeholder for future core logic imports
# from .core import scanner, counter, metrics, differ, outputs
def main():
print(f"Running PyUcc...")
# Placeholder: Replace with your application's entry point
# Example: start_application()
print("To customize, edit 'pyucc/__main__.py' and your core modules.")
"""
Main entry point for the ucc-py CLI application.
"""
parser = argparse.ArgumentParser(
description="A Python tool to replicate and enhance UCC functionalities."
)
parser.add_argument(
"baseline_dirs",
metavar="DIR",
nargs="*",
type=Path,
help="One (for analysis) or two (for diff) directories to process."
)
parser.add_argument(
"--gui",
action="store_true",
help="Launch the graphical user interface (Tkinter)."
)
parser.add_argument(
"--outdir",
type=Path,
default=Path("./ucc_py_output"),
help="Directory to store output reports."
)
args = parser.parse_args()
# If user asked for GUI, or no positional directories were provided, launch GUI and exit
if args.gui or len(args.baseline_dirs) == 0:
try:
from .gui.gui import run_app
except Exception as e:
print(f"Errore avviando la GUI: {e}", file=sys.stderr)
sys.exit(1)
run_app()
return
# --- Argument Validation for CLI mode ---
if len(args.baseline_dirs) > 2:
print("Error: Maximum of two baseline directories are supported.", file=sys.stderr)
sys.exit(1)
for directory in args.baseline_dirs:
if not directory.is_dir():
print(f"Error: Path '{directory}' is not a valid directory.", file=sys.stderr)
sys.exit(1)
# --- Execution Logic (Placeholder) ---
print(f"Starting analysis for: {[str(d) for d in args.baseline_dirs]}")
print(f"Output will be saved to: {args.outdir}")
# Ensure output directory exists
args.outdir.mkdir(parents=True, exist_ok=True)
# In the next steps, we will call the core modules from here.
# For example:
# file_list_a = scanner.scan_directory(args.baseline_dirs[0])
# if len(args.baseline_dirs) > 1:
# file_list_b = scanner.scan_directory(args.baseline_dirs[1])
# # ... and so on
if __name__ == "__main__":
main()
main()

13
pyucc/config/languages.py Normal file
View File

@ -0,0 +1,13 @@
# Mapping from human language names to common file extensions
LANGUAGE_EXTENSIONS = {
"Python": [".py", ".pyw"],
"C": [".c", ".h"],
"C++": [".cpp", ".cc", ".cxx", ".hpp", ".hh", ".inl"],
"Java": [".java"],
"JavaScript": [".js", ".mjs", ".cjs", ".jsx"],
"HTML": [".html", ".htm"],
"Shell": [".sh"],
"TypeScript": [".ts", ".tsx"],
"Go": [".go"],
"Rust": [".rs"],
}

77
pyucc/config/profiles.py Normal file
View File

@ -0,0 +1,77 @@
"""Profiles persistence for PyUcc.
Stores user profiles as JSON in the user's home directory
(`~/.pyucc_profiles.json`). Each profile is a dict with keys:
- name: str
- path: str
- languages: list[str]
- ignore: list[str]
This module exposes simple load/save/manage helpers.
"""
from pathlib import Path
import json
from typing import List, Dict, Optional
_DEFAULT_PATH = Path.home() / ".pyucc_profiles.json"
def _read_file(path: Path) -> List[Dict]:
if not path.exists():
return []
try:
with path.open("r", encoding="utf-8") as fh:
data = json.load(fh)
if isinstance(data, list):
return data
except Exception:
return []
return []
def _write_file(path: Path, profiles: List[Dict]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as fh:
json.dump(profiles, fh, indent=2, ensure_ascii=False)
def load_profiles(path: Optional[Path] = None) -> List[Dict]:
"""Return list of profiles from storage."""
p = path or _DEFAULT_PATH
return _read_file(p)
def save_profiles(profiles: List[Dict], path: Optional[Path] = None) -> None:
"""Persist profiles to storage."""
p = path or _DEFAULT_PATH
_write_file(p, profiles)
def find_profile(name: str, path: Optional[Path] = None) -> Optional[Dict]:
"""Return profile dict by name or None if not found."""
profiles = load_profiles(path)
for pr in profiles:
if pr.get("name") == name:
return pr
return None
def add_or_update_profile(profile: Dict, path: Optional[Path] = None) -> None:
"""Add new profile or update existing one by name."""
p = path or _DEFAULT_PATH
profiles = load_profiles(p)
for i, pr in enumerate(profiles):
if pr.get("name") == profile.get("name"):
profiles[i] = profile
save_profiles(profiles, p)
return
profiles.append(profile)
save_profiles(profiles, p)
def delete_profile(name: str, path: Optional[Path] = None) -> None:
"""Delete profile by name if exists."""
p = path or _DEFAULT_PATH
profiles = load_profiles(p)
profiles = [pr for pr in profiles if pr.get("name") != name]
save_profiles(profiles, p)

87
pyucc/core/countings.py Normal file
View File

@ -0,0 +1,87 @@
"""Modulo `countings` — funzioni per il conteggio SLOC e metadati per file.
Questo file contiene funzioni wrapper che incapsulano l'uso di `pygount`
o implementazioni di fallback per ottenere i conteggi richiesti dal
requisito "Counting" descritto in `doc/Sviluppo.md`.
Per ora fornisce uno skeleton minimale con API chiaramente documentata.
"""
from pathlib import Path
from typing import Dict, Any
try:
from pygount import analysis
_HAS_PYGOUNT = True
except Exception:
_HAS_PYGOUNT = False
def analyze_file_counts(path: Path) -> Dict[str, Any]:
"""Analizza un singolo file e ritorna un dizionario con i conteggi.
Restituisce almeno le chiavi:
- physical_lines
- code_lines
- comment_lines
- blank_lines
- language
Nota: la funzione usa `pygount` se disponibile; altrimenti ritorna
valori di fallback (zeros/unknown).
"""
result: Dict[str, Any] = {
"file": str(path),
"physical_lines": 0,
"code_lines": 0,
"comment_lines": 0,
"blank_lines": 0,
"language": "unknown",
}
if not path.exists():
raise FileNotFoundError(f"File non trovato: {path}")
if _HAS_PYGOUNT:
# Esempio di uso minimale di pygount: per produzione si dovranno
# adattare le opzioni e il parsing del risultato.
try:
stats = analysis.FileAnalyzer(str(path)).get_summary()
# Nota: pygount API può variare; qui usiamo campi comuni se presenti
result.update({
"physical_lines": getattr(stats, "raw_total_lines", 0),
"code_lines": getattr(stats, "code", 0),
"comment_lines": getattr(stats, "comment", 0),
"blank_lines": getattr(stats, "blank", 0),
"language": getattr(stats, "language", "unknown"),
})
except Exception:
# In caso di problemi con pygount, manteniamo i fallback
pass
else:
# Fallback molto semplice: leggi file e conta righe
with path.open("r", errors="ignore") as fh:
lines = fh.readlines()
result["physical_lines"] = len(lines)
# Semplice approssimazione: righe vuote vs non vuote
blanks = sum(1 for l in lines if l.strip() == "")
result["blank_lines"] = blanks
result["code_lines"] = len(lines) - blanks
return result
def analyze_paths(paths):
"""Analizza più paths (file) e ritorna una lista di risultati.
`paths` può essere un iterabile di `Path` o stringhe; la funzione
normalizza e invoca `analyze_file_counts` per ciascuno.
"""
results = []
for p in paths:
path = Path(p)
try:
results.append(analyze_file_counts(path))
except Exception as e:
results.append({"file": str(path), "error": str(e)})
return results

View File

@ -0,0 +1,149 @@
"""Implementazione `countings` usando il CLI `pygount` (JSON) con fallback.
Questo modulo fornisce `analyze_file_counts` e `analyze_paths`.
"""
from pathlib import Path
from typing import Dict, Any, Iterable, List
import json
import subprocess
import logging
try:
import pygount # type: ignore
_HAS_PYGOUNT = True
except Exception:
_HAS_PYGOUNT = False
_LOG = logging.getLogger(__name__)
def _map_pygount_json_item(item: Dict[str, Any]) -> Dict[str, Any]:
# Support multiple pygount JSON shapes and key names
physical = (
item.get("raw_total_lines")
or item.get("n_lines")
or item.get("lines")
or item.get("raw_lines")
or item.get("lineCount")
or item.get("line_count")
or 0
)
code = (
item.get("code")
or item.get("n_code")
or item.get("n_code_lines")
or item.get("code_lines")
or item.get("codeCount")
or item.get("sourceCount")
or 0
)
comment = (
item.get("comment")
or item.get("n_comment")
or item.get("n_comment_lines")
or item.get("comment_lines")
or item.get("documentationCount")
or 0
)
blank = (
item.get("blank")
or item.get("n_blank")
or item.get("blank_lines")
or item.get("emptyCount")
or item.get("empty_count")
or 0
)
language = item.get("language") or item.get("lang") or item.get("languageName") or "unknown"
file_path = (
item.get("filename")
or item.get("file")
or item.get("path")
or item.get("name")
or ""
)
return {
"file": file_path,
"physical_lines": int(physical),
"code_lines": int(code),
"comment_lines": int(comment),
"blank_lines": int(blank),
"language": language,
}
def analyze_file_counts(path: Path) -> Dict[str, Any]:
if not path.exists():
raise FileNotFoundError(f"File non trovato: {path}")
result: Dict[str, Any] = {
"file": str(path),
"physical_lines": 0,
"code_lines": 0,
"comment_lines": 0,
"blank_lines": 0,
"language": "unknown",
}
if _HAS_PYGOUNT:
try:
proc = subprocess.run(["pygount", "--format", "json", str(path)], check=True, capture_output=True, text=True)
parsed = json.loads(proc.stdout)
# Support expected JSON shapes from pygount: list or dict with 'files'
if isinstance(parsed, list) and parsed:
item = parsed[0]
result.update(_map_pygount_json_item(item))
return result
if isinstance(parsed, dict):
files = parsed.get("files")
if files and isinstance(files, list) and files:
item = files[0]
result.update(_map_pygount_json_item(item))
return result
# If pygount ran but returned no usable data, log stdout/stderr at DEBUG
_LOG.debug("pygount returned empty or unexpected JSON for %s", path)
_LOG.debug("pygount stdout:\n%s", proc.stdout)
_LOG.debug("pygount stderr:\n%s", proc.stderr)
# force fallback to simple counting
raise RuntimeError("pygount returned no data")
except Exception:
# Log exception and stderr if available, then fall back to simple counting
try:
# If proc exists, include its stderr for diagnostics
if 'proc' in locals():
_LOG.exception("pygount invocation failed for %s; stderr:\n%s", path, getattr(proc, 'stderr', None))
else:
_LOG.exception("pygount invocation failed for %s", path)
except Exception:
# ensure we don't break on logging
pass
# fall back to simple counting
pass
# Fallback: basic counting
with path.open("r", errors="ignore") as fh:
lines = fh.readlines()
physical = len(lines)
blanks = sum(1 for l in lines if l.strip() == "")
code_lines = physical - blanks
result.update({
"physical_lines": physical,
"code_lines": code_lines,
"comment_lines": 0,
"blank_lines": blanks,
"language": "unknown",
})
return result
def analyze_paths(paths: Iterable[Path]) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
for p in paths:
path = Path(p)
try:
results.append(analyze_file_counts(path))
except Exception as e:
results.append({"file": str(path), "error": str(e)})
return results

99
pyucc/core/scanner.py Normal file
View File

@ -0,0 +1,99 @@
# ucc_py/core/scanner.py
from pathlib import Path
from typing import List, Iterable, Optional
import fnmatch
def _normalize_extensions(exts: Iterable[str]) -> set:
out = set()
for ext in exts:
e = ext.strip().lower()
if not e:
continue
if not e.startswith('.'):
e = f'.{e}'
out.add(e)
return out
def find_source_files(
directory: Path,
allowed_extensions: Optional[Iterable[str]] = None,
ignore_patterns: Optional[Iterable[str]] = None,
) -> List[Path]:
"""
Recursively finds files in a directory.
If `allowed_extensions` is provided (an iterable of extensions like ['.py', '.cpp']),
only files with those suffixes (case-insensitive) are returned.
If `ignore_patterns` is provided, any file or directory matching any of the
patterns (fnmatch-style) will be skipped. Patterns are matched case-insensitively
against each path component (file and directory names).
Args:
directory: The root directory to start scanning from.
allowed_extensions: Optional iterable of extensions to include.
ignore_patterns: Optional iterable of fnmatch patterns to exclude.
Returns:
A list of Path objects for each source file found.
"""
# A very basic set of extensions to ignore when no allowed_extensions is provided.
ignored_extensions = {
".exe", ".dll", ".so", ".o", ".a", ".lib",
".jpg", ".jpeg", ".png", ".gif", ".bmp",
".zip", ".tar", ".gz", ".rar", ".7z",
".pdf", ".doc", ".docx", ".xls", ".xlsx",
".db", ".sqlite",
}
allowed = None
if allowed_extensions:
allowed = _normalize_extensions(allowed_extensions)
ignores = None
if ignore_patterns:
# Accept either a single comma-separated string or an iterable of strings
if isinstance(ignore_patterns, str):
parts = [p.strip() for p in ignore_patterns.split(",")]
else:
parts = list(ignore_patterns)
# normalize patterns to lower-case and strip
ignores = [p.strip().lower() for p in parts if p and p.strip()]
source_files: List[Path] = []
for path in directory.rglob("*"):
if not path.is_file():
continue
name_lower = path.name.lower()
# skip if any ignore pattern matches the file name or any parent directory name
skip = False
if ignores is not None:
for pat in ignores:
if fnmatch.fnmatch(name_lower, pat):
skip = True
break
# check parents
for parent in path.parents:
if fnmatch.fnmatch(parent.name.lower(), pat):
skip = True
break
if skip:
break
if skip:
continue
suffix = path.suffix.lower()
if allowed is not None:
if suffix in allowed:
source_files.append(path)
else:
if suffix not in ignored_extensions:
source_files.append(path)
return source_files

172
pyucc/core/worker.py Normal file
View File

@ -0,0 +1,172 @@
import uuid
import queue
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import Callable, Any, Iterable
class WorkerManager:
"""Simple worker manager with UI queue dispatch.
- Uses ThreadPoolExecutor for IO-bound tasks by default.
- Can also use ProcessPoolExecutor for CPU-bound tasks when `kind='process'`.
- Exposes a `ui_queue` (thread-safe `queue.Queue`) that the GUI should poll.
- Supports `submit` (single task) and `map_iterable` (parallel map with per-item progress).
"""
def __init__(self, max_threads: int = None, max_processes: int = None):
self._threads = ThreadPoolExecutor(max_workers=max_threads)
self._processes = ProcessPoolExecutor(max_workers=max_processes)
self.ui_queue = queue.Queue()
self._tasks = {} # task_id -> metadata (callbacks, futures)
self._lock = threading.Lock()
def _new_task_id(self) -> str:
return uuid.uuid4().hex
def submit(self, func: Callable, *args, kind: str = "thread", on_done: Callable[[Any], None] | None = None,
on_error: Callable[[Exception], None] | None = None) -> str:
"""Submit a callable to the chosen executor. Returns a task_id."""
task_id = self._new_task_id()
executor = self._threads if kind == "thread" else self._processes
def _wrap():
try:
res = func(*args)
self.ui_queue.put(("done", task_id, res))
except Exception as e:
tb = traceback.format_exc()
self.ui_queue.put(("error", task_id, tb))
# announce started
self.ui_queue.put(("started", task_id, {"func": getattr(func, "__name__", str(func)), "args": len(args)}))
fut = executor.submit(_wrap)
with self._lock:
self._tasks[task_id] = {"futures": [fut], "on_done": on_done, "on_error": on_error}
return task_id
def map_iterable(self, func: Callable, items: Iterable, kind: str = "thread", on_progress: Callable[[Any], None] | None = None,
on_done: Callable[[list], None] | None = None) -> str:
"""Map func over items in parallel and emit per-item progress messages to ui_queue.
The GUI should poll `ui_queue` and call `dispatch_message` to run callbacks in main thread.
"""
task_id = self._new_task_id()
executor = self._threads if kind == "thread" else self._processes
items = list(items)
if not items:
# Immediately finish
self.ui_queue.put(("done", task_id, []))
return task_id
futures = []
def _submit_item(it):
def _call():
return func(it)
return executor.submit(_call)
for it in items:
fut = _submit_item(it)
futures.append((it, fut))
with self._lock:
self._tasks[task_id] = {"futures": [f for _, f in futures], "on_progress": on_progress, "on_done": on_done}
# announce started (with estimated total)
self.ui_queue.put(("started", task_id, {"func": getattr(func, "__name__", str(func)), "total": len(items)}))
# Start a watcher thread that collects results and pushes to ui_queue
def _watcher():
results = []
for it, fut in futures:
try:
res = fut.result()
results.append(res)
# emit progress message
self.ui_queue.put(("progress", task_id, res))
except Exception:
tb = traceback.format_exc()
self.ui_queue.put(("error", task_id, tb))
# all done
self.ui_queue.put(("done", task_id, results))
threading.Thread(target=_watcher, daemon=True).start()
return task_id
def dispatch_message(self, msg: tuple):
"""Dispatch a single message (called from GUI/main thread)."""
typ, task_id, payload = msg
meta = None
with self._lock:
meta = self._tasks.get(task_id)
if not meta:
return
if typ == "progress":
cb = meta.get("on_progress")
if cb:
try:
cb(payload)
except Exception:
# swallow exceptions from UI callbacks
pass
elif typ == "done":
cb = meta.get("on_done")
if cb:
try:
cb(payload)
except Exception:
pass
# cleanup
with self._lock:
self._tasks.pop(task_id, None)
elif typ == "started":
# no-op for dispatch — GUI may handle started notifications separately
return
elif typ == "error":
cb = meta.get("on_error") or meta.get("on_done")
if cb:
try:
cb(payload)
except Exception:
pass
with self._lock:
self._tasks.pop(task_id, None)
def cancel(self, task_id: str) -> bool:
"""Attempt to cancel a task. Returns True if cancellation was requested.
This will call `cancel()` on futures where supported. For processes this
may not immediately terminate the work.
"""
with self._lock:
meta = self._tasks.get(task_id)
if not meta:
return False
futs = meta.get("futures", [])
cancelled_any = False
for f in futs:
try:
ok = f.cancel()
cancelled_any = cancelled_any or ok
except Exception:
pass
# inform UI
self.ui_queue.put(("cancelled", task_id, None))
with self._lock:
self._tasks.pop(task_id, None)
return cancelled_any
def shutdown(self):
try:
self._threads.shutdown(wait=False)
except Exception:
pass
try:
self._processes.shutdown(wait=False)
except Exception:
pass

354
pyucc/gui/countings_tab.py Normal file
View File

@ -0,0 +1,354 @@
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import csv
from pathlib import Path
import threading
import queue
from ..core.countings_impl import analyze_paths, analyze_file_counts
from ..core.scanner import find_source_files
from ..config.languages import LANGUAGE_EXTENSIONS
class CountingsTab(ttk.Frame):
"""Tab that executes counting (SLOC) and displays results.
The CountingsTab relies on a shared TopBar for folder selection and
uses `analyze_paths` from the core implementation.
"""
def __init__(self, parent, topbar, app=None, *args, **kwargs):
"""Initialize the CountingsTab.
Args:
parent: The notebook widget containing this tab.
topbar: Shared TopBar instance exposing `path_var`.
"""
super().__init__(parent, *args, **kwargs)
self.topbar = topbar
self.app = app
self.worker = None
controls = ttk.Frame(self)
controls.grid(row=0, column=0, sticky="ew", padx=8, pady=6)
self.run_btn = ttk.Button(controls, text="Run Countings", command=self.start_countings)
self.run_btn.grid(row=0, column=0, sticky="w")
self.cancel_btn = ttk.Button(controls, text="Cancel", command=self.cancel, state="disabled")
self.cancel_btn.grid(row=0, column=1, padx=(8, 0))
self.export_btn = ttk.Button(controls, text="Export CSV", command=self.export_csv, state="disabled")
self.export_btn.grid(row=0, column=2, padx=(8, 0))
# Progress bar: will be set to determinate and updated per-file
self.progress = ttk.Progressbar(self, mode="determinate")
self.progress.grid(row=1, column=0, sticky="ew", padx=8, pady=(6, 0))
# Counters placed under the progress bar
counters = ttk.Frame(self)
counters.grid(row=2, column=0, sticky="ew", padx=8, pady=(6, 0))
self._lbl_physical = ttk.Label(counters, text="Physical: 0")
self._lbl_code = ttk.Label(counters, text="Code: 0")
self._lbl_comment = ttk.Label(counters, text="Comments: 0")
self._lbl_blank = ttk.Label(counters, text="Blank: 0")
self._lbl_files = ttk.Label(counters, text="Files: 0/0")
self._lbl_physical.pack(side="left", padx=(0,10))
self._lbl_code.pack(side="left", padx=(0,10))
self._lbl_comment.pack(side="left", padx=(0,10))
self._lbl_blank.pack(side="left", padx=(0,10))
self._lbl_files.pack(side="right", padx=(0,10))
# Adjust tree positioning (moved down one row)
# Treeview for tabular display (name, path, numeric columns)
columns = ("name", "path", "code", "comment", "blank", "total", "lang")
self.tree = ttk.Treeview(self, columns=columns, show="headings")
self.tree.heading("name", text="File", command=lambda: self._sort_by("name", False))
self.tree.heading("path", text="Path", command=lambda: self._sort_by("path", False))
self.tree.heading("code", text="Code", command=lambda: self._sort_by("code", False))
self.tree.heading("comment", text="Comment", command=lambda: self._sort_by("comment", False))
self.tree.heading("blank", text="Blank", command=lambda: self._sort_by("blank", False))
self.tree.heading("total", text="Total", command=lambda: self._sort_by("total", False))
self.tree.heading("lang", text="Language", command=lambda: self._sort_by("lang", False))
# column sizing: numeric columns smaller and right-aligned
self.tree.column("name", width=300, anchor="w", stretch=True)
self.tree.column("path", width=400, anchor="w", stretch=True)
self.tree.column("code", width=80, anchor="e", stretch=False)
self.tree.column("comment", width=80, anchor="e", stretch=False)
self.tree.column("blank", width=60, anchor="e", stretch=False)
self.tree.column("total", width=90, anchor="e", stretch=False)
self.tree.column("lang", width=120, anchor="w", stretch=False)
# Layout tree and scrollbars (both vertical and horizontal)
self.tree.grid(row=3, column=0, sticky="nsew", padx=8, pady=8)
vsb = ttk.Scrollbar(self, orient="vertical", command=self.tree.yview)
hsb = ttk.Scrollbar(self, orient="horizontal", command=self.tree.xview)
self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
vsb.grid(row=2, column=1, sticky="ns")
hsb.grid(row=3, column=0, columnspan=1, sticky="ew", padx=8)
# make tree expand to fill available space
self.rowconfigure(3, weight=1)
self.columnconfigure(0, weight=1)
# bind double-click to open file viewer
self.tree.bind("<Double-1>", self._on_double_click)
# internal state for counts/progress
self._cumulative_counts = {"physical": 0, "code": 0, "comment": 0, "blank": 0}
self._total_files = 0
self._processed_files = 0
def start_countings(self):
"""Start counting on the folder/file selected in the shared TopBar."""
path = self.topbar.path_var.get().strip()
if not path:
messagebox.showwarning("Missing path", "Please select a folder or file first.")
return
# log start
try:
if getattr(self, 'app', None):
self.app.log(f"Countings started on: {path}", "INFO")
except Exception:
pass
p = Path(path)
targets = []
if p.is_dir():
# If a profile is selected, build allowed extensions from it
allowed_exts = None
pr = getattr(self.topbar, "current_profile", None)
# fallback: if current_profile not set, try to resolve from combobox selection
if not pr:
pname = getattr(self.topbar, "profile_var", None)
if pname:
from ..config import profiles as profiles_cfg
pr = profiles_cfg.find_profile(pname.get()) if hasattr(pname, "get") else profiles_cfg.find_profile(str(pname))
if pr:
langs = pr.get("languages", []) or []
exts = []
for ln in langs:
if ln in LANGUAGE_EXTENSIONS:
exts.extend(LANGUAGE_EXTENSIONS[ln])
else:
# treat custom entries like ".ext" or "ext"
val = ln.strip()
if val.startswith('.'):
exts.append(val.lower())
elif len(val) <= 5 and not val.isalpha():
# probably extension-ish like 'py'
exts.append(f".{val.lower()}")
else:
# could be a language name unknown; skip
pass
if exts:
allowed_exts = set(exts)
ignore_patterns = pr.get("ignore", []) if pr else None
targets = find_source_files(p, allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns)
elif p.is_file():
targets = [p]
else:
messagebox.showerror("Error", f"The path '{path}' is not valid.")
return
self.run_btn.config(state="disabled")
self.cancel_btn.config(state="normal")
self.export_btn.config(state="disabled")
# clear tree
for item in self.tree.get_children():
self.tree.delete(item)
# prepare progress and counters using scanner results
self._total_files = len(targets)
self._processed_files = 0
self._cumulative_counts = {"physical": 0, "code": 0, "comment": 0, "blank": 0}
try:
self.progress['maximum'] = max(1, self._total_files)
self.progress['value'] = 0
except Exception:
pass
try:
self._lbl_files.config(text=f"Files: {self._processed_files}/{self._total_files}")
self._lbl_physical.config(text="Physical: 0")
self._lbl_code.config(text="Code: 0")
self._lbl_comment.config(text="Comments: 0")
self._lbl_blank.config(text="Blank: 0")
except Exception:
pass
# Submit per-file work via the central WorkerManager so the GUI stays responsive.
try:
if not getattr(self, 'app', None) or not getattr(self.app, 'worker', None):
# fallback to previous behavior
self.worker = threading.Thread(target=self._worker_countings, args=(targets,), daemon=True)
self.worker.start()
self.after(200, self._poll_queue)
return
# use the worker manager to map analyze_file_counts over targets
# on_progress will be called for each file result; on_done when all finished
# Map per-file counting so we can show progress incrementally.
self._task_id = self.app.worker.map_iterable(
func=analyze_file_counts,
items=targets,
kind='thread',
on_progress=self._on_file_result,
on_done=self._on_all_done,
)
except Exception:
# if worker API not available or mapping failed, fallback
self.worker = threading.Thread(target=self._worker_countings, args=(targets,), daemon=True)
self.worker.start()
self.after(200, self._poll_queue)
def cancel(self):
"""Request cancellation (informational only)."""
# Cancellation not yet supported for task submitted via WorkerManager
messagebox.showinfo("Cancel", "Cancellation requested but not supported yet.")
try:
if getattr(self, 'app', None):
self.app.log("Countings cancelled by user", "WARNING")
except Exception:
pass
self._finish()
def _worker_countings(self, targets):
"""Legacy fallback worker that runs analyze_paths on the whole list."""
try:
results = analyze_paths(targets)
# Directly populate via main thread scheduling
self.after(0, lambda: self._on_all_done(results))
except Exception as e:
self.after(0, lambda: messagebox.showerror("Error", f"Error during countings: {e}"))
# New callbacks used by WorkerManager
def _on_file_result(self, res):
"""Called in main thread for each file result emitted by WorkerManager."""
try:
if isinstance(res, dict):
r = res
else:
r = res
if "error" in r:
file = r.get("file")
name = Path(file).name if file else ""
self.tree.insert("", "end", values=(name, file, 0, 0, 0, 0, r.get("error")))
else:
file = r.get("file")
name = Path(file).name if file else ""
code = r.get("code_lines") or 0
comment = r.get("comment_lines") or 0
blank = r.get("blank_lines") or 0
total = r.get("physical_lines") or 0
lang = r.get("language") or "unknown"
self.tree.insert("", "end", values=(name, file, int(code), int(comment), int(blank), int(total), lang))
# update cumulative counters and progress
try:
self._cumulative_counts['physical'] += int(total)
self._cumulative_counts['code'] += int(code)
self._cumulative_counts['comment'] += int(comment)
self._cumulative_counts['blank'] += int(blank)
self._processed_files += 1
# update labels
self._lbl_physical.config(text=f"Physical: {self._cumulative_counts['physical']}")
self._lbl_code.config(text=f"Code: {self._cumulative_counts['code']}")
self._lbl_comment.config(text=f"Comments: {self._cumulative_counts['comment']}")
self._lbl_blank.config(text=f"Blank: {self._cumulative_counts['blank']}")
self._lbl_files.config(text=f"Files: {self._processed_files}/{self._total_files}")
try:
self.progress['value'] = self._processed_files
except Exception:
pass
except Exception:
pass
except Exception:
pass
def _on_all_done(self, results):
"""Called in main thread when all results are available."""
try:
written = len(self.tree.get_children())
self.export_btn.config(state="normal")
try:
if getattr(self, 'app', None):
self.app.log(f"Countings finished: {written} items", "INFO")
except Exception:
pass
finally:
self._finish()
# ensure progress reflects completion
try:
self._processed_files = self._total_files
self._lbl_files.config(text=f"Files: {self._processed_files}/{self._total_files}")
self.progress['value'] = self._processed_files
except Exception:
pass
def _finish(self):
"""Finalize UI state after countings finished or cancelled."""
self.progress.stop()
self.run_btn.config(state="normal")
self.cancel_btn.config(state="disabled")
def _sort_by(self, col, descending):
"""Sort tree contents by given column. Toggle ascending/descending.
Args:
col: column key (file, code, comment, blank, total, lang)
descending: bool whether to sort descending
"""
col_map = {"name": 0, "path": 1, "code": 2, "comment": 3, "blank": 4, "total": 5, "lang": 6}
idx = col_map.get(col, 0)
children = list(self.tree.get_children(""))
def _key(item):
val = self.tree.item(item, "values")[idx]
try:
return int(val)
except Exception:
return str(val).lower()
children.sort(key=_key, reverse=descending)
for index, item in enumerate(children):
self.tree.move(item, "", index)
# update heading to reverse sort next time
self.tree.heading(col, command=lambda c=col: self._sort_by(c, not descending))
def export_csv(self):
"""Export current table contents to CSV file chosen by the user."""
path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV files", "*.csv"), ("All files", "*")])
if not path:
return
headers = ["name", "path", "code", "comment", "blank", "total", "language"]
from ..utils.csv_exporter import export_rows_to_csv
try:
rows = (self.tree.item(child, "values") for child in self.tree.get_children())
written = export_rows_to_csv(path, headers, rows)
messagebox.showinfo("Export", f"Exported {written} rows to {path}")
try:
if getattr(self, 'app', None):
self.app.log(f"Exported {written} rows to {path}", "INFO")
except Exception:
pass
except Exception as e:
messagebox.showerror("Export Error", str(e))
try:
if getattr(self, 'app', None):
self.app.log(f"Export error: {e}", "ERROR")
except Exception:
pass
def _on_double_click(self, _evt=None):
sel = self.tree.selection()
if not sel:
return
item = sel[0]
vals = self.tree.item(item, "values")
# values: (name, path, ...)
if len(vals) >= 2:
path = vals[1]
from .file_viewer import FileViewer
FileViewer(self, path)

216
pyucc/gui/file_viewer.py Normal file
View File

@ -0,0 +1,216 @@
import tkinter as tk
from tkinter import ttk
from tkinter.scrolledtext import ScrolledText
from pathlib import Path
from typing import Optional
try:
from pygments import lex
from pygments.lexers import get_lexer_for_filename, TextLexer
from pygments.token import Token
_HAS_PYGMENTS = True
except Exception:
_HAS_PYGMENTS = False
class FileViewer(tk.Toplevel):
"""Toplevel window to show file contents with optional syntax highlighting
and a simple minimap indicating code/comment/blank lines.
"""
COLORS = {
"code": "#1f77b4",
"comment": "#2ca02c",
"blank": "#d3d3d3",
}
def __init__(self, parent, path: str):
super().__init__(parent)
self.title(f"Viewer - {Path(path).name}")
self.geometry("900x600")
self.path = Path(path)
# Main panes: Text on the left, minimap on the right
main = ttk.Frame(self)
main.pack(fill="both", expand=True)
text_frame = ttk.Frame(main)
text_frame.pack(side="left", fill="both", expand=True)
self.text = ScrolledText(text_frame, wrap="none", undo=True)
self.text.pack(fill="both", expand=True)
self.text.config(state="disabled")
minimap_frame = ttk.Frame(main, width=120)
minimap_frame.pack(side="right", fill="y")
self.canvas = tk.Canvas(minimap_frame, width=120, bg="#ffffff")
self.canvas.pack(fill="y", expand=True)
# Legend under the minimap showing color mapping
legend_frame = ttk.Frame(minimap_frame)
legend_frame.pack(side="bottom", fill="x", padx=4, pady=4)
# Ordered legend entries
for key, label_text in (('code', 'Code'), ('comment', 'Comment'), ('blank', 'Blank')):
color = self.COLORS.get(key, '#cccccc')
sw = tk.Label(legend_frame, bg=color, width=2, relief='ridge')
sw.pack(side='left', padx=(2, 4))
lbl = ttk.Label(legend_frame, text=label_text)
lbl.pack(side='left', padx=(0, 8))
self._load_file()
def _load_file(self):
try:
text = self.path.read_text(errors="ignore")
except Exception as e:
self._set_text(f"Error opening file: {e}")
return
# Insert text
self._set_text(text)
# Apply highlighting
if _HAS_PYGMENTS:
self._apply_pygments_highlighting(text)
else:
self._apply_simple_highlighting()
# Build minimap
self._build_minimap(text.splitlines())
# start periodic viewport updater
self._viewport_job = None
self._schedule_viewport_update()
# cancel the job when window is closed
self.bind("<Destroy>", lambda e: self._cancel_viewport_update())
def _set_text(self, text: str):
self.text.config(state="normal")
self.text.delete("1.0", "end")
self.text.insert("1.0", text)
self.text.config(state="disabled")
def _apply_pygments_highlighting(self, text: str):
for tag in list(self.text.tag_names()):
self.text.tag_delete(tag)
try:
lexer = get_lexer_for_filename(str(self.path))
except Exception:
lexer = TextLexer()
token_map = {
Token.Comment: "comment",
Token.String: "string",
Token.Keyword: "keyword",
Token.Name: "name",
}
self.text.tag_configure("comment", foreground="#2ca02c")
self.text.tag_configure("keyword", foreground="#d62728")
self.text.tag_configure("string", foreground="#9467bd")
pos = 0
for ttype, value in lex(text, lexer):
length = len(value)
if length == 0:
continue
start_idx = self._index_from_pos(pos)
end_idx = self._index_from_pos(pos + length)
tag = None
for key, tname in token_map.items():
if ttype in key:
tag = tname
break
if tag:
try:
self.text.tag_add(tag, start_idx, end_idx)
except Exception:
pass
pos += length
def _index_from_pos(self, pos: int) -> str:
return f"1.0 + {pos} chars"
def _apply_simple_highlighting(self):
self.text.tag_configure("comment", foreground=self.COLORS["comment"])
self.text.config(state="normal")
for i, line in enumerate(self.text.get("1.0", "end").splitlines(), start=1):
s = line.lstrip()
if not s:
continue
if s.startswith("#") or s.startswith("//") or s.startswith("/*"):
start = f"{i}.0"
end = f"{i}.end"
self.text.tag_add("comment", start, end)
self.text.config(state="disabled")
def _build_minimap(self, lines):
self.canvas.delete("all")
h = max(1, len(lines))
width = int(self.canvas.winfo_reqwidth()) or 120
# determine per-line rectangle height based on canvas height and number of lines
canvas_h = max(1, self.canvas.winfo_height() or 600)
rect_h = max(1, int(max(2, min(8, canvas_h / h))))
y = 0
# store for viewport computations
self._minimap_line_height = rect_h
self._minimap_total_lines = h
for line in lines:
typ = "code"
s = line.strip()
if not s:
typ = "blank"
elif s.startswith("#") or s.startswith("//") or s.startswith("/*"):
typ = "comment"
color = self.COLORS.get(typ, "#cccccc")
self.canvas.create_rectangle(2, y, width-2, y+rect_h, fill=color, outline=color)
y += rect_h + 1
# create viewport rectangle overlay (on top)
# remove any existing viewport and create a fresh one
try:
self.canvas.delete("viewport")
except Exception:
pass
self.canvas.create_rectangle(0, 0, 0, 0, outline="#ff0000", width=1, tags=("viewport",))
# if canvas size changes, rebuild minimap
self.canvas.bind("<Configure>", lambda e: self._build_minimap(lines))
def _schedule_viewport_update(self, interval_ms: int = 200):
# schedule periodic viewport updates
def _job():
try:
self._update_viewport_rect()
finally:
self._viewport_job = self.after(interval_ms, _job)
# cancel any existing job then start
self._cancel_viewport_update()
self._viewport_job = self.after(interval_ms, _job)
def _cancel_viewport_update(self):
if getattr(self, "_viewport_job", None):
try:
self.after_cancel(self._viewport_job)
except Exception:
pass
self._viewport_job = None
def _update_viewport_rect(self):
# get top/bottom fractions from text widget
try:
top, bottom = self.text.yview()
except Exception:
return
c_h = self.canvas.winfo_height() or 1
y1 = int(top * c_h)
y2 = int(bottom * c_h)
# ensure reasonable bounds
y1 = max(0, min(y1, c_h))
y2 = max(0, min(y2, c_h))
# update the viewport rectangle
try:
self.canvas.coords("viewport", 1, y1, self.canvas.winfo_width()-1, y2)
except Exception:
pass

View File

@ -0,0 +1,232 @@
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from tkinter.scrolledtext import ScrolledText
from pathlib import Path
import threading
import queue
import csv
from datetime import datetime
from ..core.scanner import find_source_files
from ..config.languages import LANGUAGE_EXTENSIONS
from ..utils import logger as app_logger
import logging
from .countings_tab import CountingsTab
from .topbar import TopBar
from .file_viewer import FileViewer
from .scanner_tab import ScannerTab
from ..core.worker import WorkerManager
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("PyUcc - Interfaccia Grafica")
self.geometry("800x600")
# Shared top bar (folder selection) placed above the notebook
self.topbar = TopBar(self)
self.topbar.pack(fill="x", side="top")
# Status frame: cumulative counters and a scanning progress bar
status_frame = ttk.Frame(self)
status_frame.pack(fill="x", side="top", padx=6, pady=(4,4))
self._lbl_physical = ttk.Label(status_frame, text="Physical: 0")
self._lbl_code = ttk.Label(status_frame, text="Code: 0")
self._lbl_comment = ttk.Label(status_frame, text="Comments: 0")
self._lbl_blank = ttk.Label(status_frame, text="Blank: 0")
self._lbl_physical.pack(side="left", padx=(0,10))
self._lbl_code.pack(side="left", padx=(0,10))
self._lbl_comment.pack(side="left", padx=(0,10))
self._lbl_blank.pack(side="left", padx=(0,10))
# Progress bar showing progress for the current scanning task
self._scan_progress = ttk.Progressbar(status_frame, orient='horizontal', mode='determinate', length=200)
self._scan_progress.pack(side="right", padx=(0,6))
# Aggregation state
self._cumulative_counts = {"physical": 0, "code": 0, "comment": 0, "blank": 0}
# per-task progress tracking: task_id -> {total, processed}
self._task_progress = {}
self._running_tasks = set()
self._current_progress_task_id = None
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill="both", expand=True)
# Application log panel below the notebook (grouped in a labeled frame)
log_frame = ttk.LabelFrame(self, text="Log")
log_frame.pack(fill="x", side="bottom", padx=6, pady=6)
# ScrolledText for logs (read-only)
self.log_text = ScrolledText(log_frame, height=8, wrap="word", state="disabled")
self.log_text.pack(fill="both", expand=True, padx=6, pady=6)
# Initialize centralized logging system from utils.logger and attach
# the module-provided Tkinter handler to the ScrolledText widget.
try:
app_logger.setup_basic_logging(self)
except Exception:
pass
try:
color_map = {
logging.INFO: 'black',
logging.WARNING: '#d87f0a',
logging.ERROR: '#d62728',
}
app_logger.add_tkinter_handler(self.log_text, {"colors": color_map})
except Exception:
pass
# small helper: expose a convenient log method that forwards to
# the standard logging system so messages flow through the queue.
def log(self, msg: str, level: str = "INFO"):
lg = app_logger.get_logger("pyucc")
lvl = getattr(logging, level.upper(), logging.INFO)
try:
if lvl >= logging.ERROR:
lg.error(msg)
elif lvl >= logging.WARNING:
lg.warning(msg)
else:
lg.info(msg)
except Exception:
try:
print(f"[{level}] {msg}")
except Exception:
pass
self.log = log.__get__(self)
# Worker manager (background task runner)
self.worker = WorkerManager()
# Create and add application tabs
self.scanner_tab = ScannerTab(self.notebook, self.topbar, app=self)
self.notebook.add(self.scanner_tab, text="Scanner")
# Add Countings tab
self.countings_tab = CountingsTab(self.notebook, self.topbar, app=self)
self.notebook.add(self.countings_tab, text="Countings")
# poll the worker UI queue and dispatch callbacks in the main thread
self.after(100, self._poll_worker_ui_queue)
def _poll_worker_ui_queue(self):
try:
while True:
msg = self.worker.ui_queue.get_nowait()
# allow GUI to update task list and status
self._handle_worker_msg(msg)
# dispatch registered callbacks
try:
self.worker.dispatch_message(msg)
except Exception:
pass
except Exception:
# queue empty or other; schedule next poll
pass
self.after(100, self._poll_worker_ui_queue)
def _handle_worker_msg(self, msg: tuple):
"""Log worker messages to the application's log so there's a single
place (the Log panel) to follow task lifecycle and results.
"""
typ, task_id, payload = msg
if typ == "started":
name = payload.get("func") if isinstance(payload, dict) else str(payload)
total = payload.get("total") if isinstance(payload, dict) else None
if total:
self.log(f"Task {task_id[:8]} started: {name} (total={total})", level="INFO")
# initialize per-task progress tracking and mark as running
self._task_progress[task_id] = {"total": total, "processed": 0}
self._running_tasks.add(task_id)
# show this task in the progress bar
self._current_progress_task_id = task_id
try:
self._scan_progress['maximum'] = total
self._scan_progress['value'] = 0
except Exception:
pass
else:
self.log(f"Task {task_id[:8]} started: {name}", level="INFO")
elif typ == "progress":
# payload is expected to be a partial result or status dictionary
self.log(f"Task {task_id[:8]} progress: {payload}", level="INFO")
# update cumulative counters if we have counts
if isinstance(payload, dict):
phys = payload.get('physical_lines') or payload.get('physical') or 0
code = payload.get('code_lines') or payload.get('code') or 0
comm = payload.get('comment_lines') or payload.get('comment') or 0
blank = payload.get('blank_lines') or payload.get('blank') or 0
# increment global cumulative counters
self._cumulative_counts['physical'] += phys
self._cumulative_counts['code'] += code
self._cumulative_counts['comment'] += comm
self._cumulative_counts['blank'] += blank
# update labels
try:
self._lbl_physical.config(text=f"Physical: {self._cumulative_counts['physical']}")
self._lbl_code.config(text=f"Code: {self._cumulative_counts['code']}")
self._lbl_comment.config(text=f"Comments: {self._cumulative_counts['comment']}")
self._lbl_blank.config(text=f"Blank: {self._cumulative_counts['blank']}")
except Exception:
pass
# update per-task processed count and progress bar (if tracking)
meta = self._task_progress.get(task_id)
if meta is not None:
meta['processed'] += 1
if self._current_progress_task_id == task_id:
try:
self._scan_progress['value'] = meta['processed']
except Exception:
pass
elif typ == "done":
# payload may contain final results
self.log(f"Task {task_id[:8]} done. Result: {payload}", level="INFO")
# mark task finished
if task_id in self._running_tasks:
self._running_tasks.remove(task_id)
# ensure per-task processed equals total
meta = self._task_progress.get(task_id)
if meta is not None:
try:
self._scan_progress['value'] = meta['total']
except Exception:
pass
# if this was the current progress task, pick another running task if present
if self._current_progress_task_id == task_id:
self._current_progress_task_id = next(iter(self._running_tasks), None)
if self._current_progress_task_id:
meta2 = self._task_progress.get(self._current_progress_task_id)
if meta2:
try:
self._scan_progress['maximum'] = meta2['total']
self._scan_progress['value'] = meta2['processed']
except Exception:
pass
elif typ == "error":
# payload is typically a traceback string or exception info
self.log(f"Task {task_id[:8]} error: {payload}", level="ERROR")
# mark task finished
if task_id in self._running_tasks:
self._running_tasks.remove(task_id)
if task_id in self._task_progress:
self._task_progress.pop(task_id, None)
elif typ == "cancelled":
self.log(f"Task {task_id[:8]} cancelled", level="WARNING")
if task_id in self._running_tasks:
self._running_tasks.remove(task_id)
if task_id in self._task_progress:
self._task_progress.pop(task_id, None)
def run_app():
app = App()
app.mainloop()
if __name__ == "__main__":
run_app()

View File

@ -0,0 +1,217 @@
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from pathlib import Path
from typing import List
from ..config import profiles as profiles_cfg
from ..config.languages import LANGUAGE_EXTENSIONS
class ProfileManager(tk.Toplevel):
"""Dialog window to create/edit/delete scanning profiles.
Usage: pm = ProfileManager(root); pm.wait_window()
"""
COMMON_LANGS = ["Python", "C", "C++", "Java", "JavaScript", "HTML", "Shell"]
# sensible defaults for ignore patterns covering common build/artifact dirs and files
DEFAULT_IGNORES = [
"__pycache__",
"*.pyc",
"*.pyo",
"*.pyd",
".Python",
"env",
"venv",
".venv",
"build",
"dist",
"*.egg-info",
".eggs",
"node_modules",
".git",
".hg",
".svn",
".idea",
".vscode",
".DS_Store",
"*.class",
"*.o",
"*.so",
"*.dylib",
".pytest_cache",
".mypy_cache",
".cache",
"coverage",
".tox",
"pip-wheel-metadata",
]
def __init__(self, parent, on_change=None):
super().__init__(parent)
self.title("Profile Manager")
self.geometry("900x600")
self.on_change = on_change
self.profiles = profiles_cfg.load_profiles()
# Left: list of profiles
left = ttk.Frame(self)
left.pack(side="left", fill="y", padx=8, pady=8)
# Make listbox taller so names are easily visible
self.listbox = tk.Listbox(left, width=30, height=20)
self.listbox.pack(fill="y", expand=True)
for p in self.profiles:
self.listbox.insert("end", p.get("name"))
self.listbox.bind("<<ListboxSelect>>", self._on_select)
# Right: edit form
right = ttk.Frame(self)
right.pack(side="left", fill="both", expand=True, padx=8, pady=8)
ttk.Label(right, text="Profile name:").grid(row=0, column=0, sticky="w", pady=4)
self.name_var = tk.StringVar()
ttk.Entry(right, textvariable=self.name_var, width=48).grid(row=0, column=1, columnspan=2, sticky="ew", pady=4)
# Path: label above large entry spanning full right area
ttk.Label(right, text="Path:").grid(row=1, column=0, sticky="w", pady=(8, 2))
self.path_var = tk.StringVar()
ttk.Entry(right, textvariable=self.path_var, width=80).grid(row=2, column=0, columnspan=2, sticky="ew", pady=2)
ttk.Button(right, text="Browse...", command=self._browse_path).grid(row=2, column=2, sticky="w", padx=4)
ttk.Label(right, text="Languages:").grid(row=3, column=0, sticky="nw", pady=(8, 2))
# Scrollable frame for languages so checkboxes remain readable
langs_container = ttk.Frame(right)
langs_container.grid(row=3, column=1, columnspan=2, sticky="ew", pady=4)
canvas = tk.Canvas(langs_container, height=140)
vsb = ttk.Scrollbar(langs_container, orient="vertical", command=canvas.yview)
inner = ttk.Frame(canvas)
inner.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=inner, anchor="nw")
canvas.configure(yscrollcommand=vsb.set)
canvas.grid(row=0, column=0, sticky="nsew")
vsb.grid(row=0, column=1, sticky="ns")
langs_container.columnconfigure(0, weight=1)
self.lang_vars = {}
for i, ln in enumerate(self.COMMON_LANGS):
var = tk.BooleanVar()
exts = LANGUAGE_EXTENSIONS.get(ln, [])
exts_label = f" ({', '.join(exts)})" if exts else ""
cb = ttk.Checkbutton(inner, text=ln + exts_label, variable=var)
cb.grid(row=i, column=0, sticky="w", padx=2, pady=2)
self.lang_vars[ln] = var
# Custom languages: label above large text box spanning full width
ttk.Label(right, text="Custom languages (comma-separated):").grid(row=4, column=0, sticky="w", pady=(8, 2))
self.custom_text = tk.Text(right, height=3, wrap="none")
self.custom_text.grid(row=5, column=0, columnspan=3, sticky="nsew", pady=2)
# Ignore patterns: label above large text box with scrollbars
ttk.Label(right, text="Ignore patterns (comma-separated):").grid(row=6, column=0, sticky="w", pady=(8, 2))
ignore_container = ttk.Frame(right)
ignore_container.grid(row=7, column=0, columnspan=3, sticky="nsew", pady=2)
# Text widget with both vertical and horizontal scrollbars so long patterns are readable
self.ignore_text = tk.Text(ignore_container, height=4, wrap="none")
self.ignore_text.grid(row=0, column=0, sticky="nsew")
vsb_ignore = ttk.Scrollbar(ignore_container, orient="vertical", command=self.ignore_text.yview)
vsb_ignore.grid(row=0, column=1, sticky="ns")
hsb_ignore = ttk.Scrollbar(ignore_container, orient="horizontal", command=self.ignore_text.xview)
hsb_ignore.grid(row=1, column=0, columnspan=2, sticky="ew")
self.ignore_text.configure(yscrollcommand=vsb_ignore.set, xscrollcommand=hsb_ignore.set)
ignore_container.columnconfigure(0, weight=1)
ignore_container.rowconfigure(0, weight=1)
# Buttons (place below the large edit boxes)
btn_frame = ttk.Frame(right)
btn_frame.grid(row=8, column=0, columnspan=3, pady=(12, 0))
ttk.Button(btn_frame, text="New", command=self._new).grid(row=0, column=0, padx=4)
ttk.Button(btn_frame, text="Save", command=self._save).grid(row=0, column=1, padx=4)
ttk.Button(btn_frame, text="Delete", command=self._delete).grid(row=0, column=2, padx=4)
ttk.Button(btn_frame, text="Close", command=self.destroy).grid(row=0, column=3, padx=4)
right.columnconfigure(1, weight=1)
# If no profiles exist, prefill form with sensible defaults (widgets are ready)
if not self.profiles:
self._new()
def _browse_path(self):
d = filedialog.askdirectory()
if d:
self.path_var.set(str(Path(d)))
def _on_select(self, _evt=None):
sel = self.listbox.curselection()
if not sel:
return
idx = sel[0]
pr = self.profiles[idx]
self._load_profile(pr)
def _load_profile(self, pr):
self.name_var.set(pr.get("name", ""))
self.path_var.set(pr.get("path", ""))
langs = pr.get("languages", []) or []
for ln, var in self.lang_vars.items():
var.set(ln in langs)
custom = ",".join([l for l in langs if l not in self.COMMON_LANGS])
# populate custom_text (large edit box)
self.custom_text.delete("1.0", "end")
if custom:
self.custom_text.insert("1.0", custom)
# populate ignore_text
self.ignore_text.delete("1.0", "end")
ignores = ",".join(pr.get("ignore", []))
if ignores:
self.ignore_text.insert("1.0", ignores)
def _new(self):
self.name_var.set("")
self.path_var.set("")
for var in self.lang_vars.values():
var.set(False)
self.custom_text.delete("1.0", "end")
self.ignore_text.delete("1.0", "end")
# insert the DEFAULT_IGNORES joined by commas
self.ignore_text.insert("1.0", ",".join(self.DEFAULT_IGNORES))
def _save(self):
name = self.name_var.get().strip()
if not name:
messagebox.showwarning("Validation", "Profile must have a name.")
return
path = self.path_var.get().strip()
langs: List[str] = [ln for ln, var in self.lang_vars.items() if var.get()]
raw_custom = self.custom_text.get("1.0", "end").strip()
raw_custom = raw_custom.replace("\n", ",")
custom = [c.strip() for c in raw_custom.split(",") if c.strip()]
langs.extend([c for c in custom if c])
raw_ignore = self.ignore_text.get("1.0", "end").strip()
raw_ignore = raw_ignore.replace("\n", ",")
ignore = [s.strip() for s in raw_ignore.split(",") if s.strip()]
profile = {"name": name, "path": path, "languages": langs, "ignore": ignore}
profiles_cfg.add_or_update_profile(profile)
self.profiles = profiles_cfg.load_profiles()
# refresh listbox
self.listbox.delete(0, "end")
for p in self.profiles:
self.listbox.insert("end", p.get("name"))
messagebox.showinfo("Saved", f"Profile '{name}' saved.")
if self.on_change:
self.on_change()
def _delete(self):
name = self.name_var.get().strip()
if not name:
return
if not messagebox.askyesno("Delete", f"Delete profile '{name}'?"):
return
profiles_cfg.delete_profile(name)
self.profiles = profiles_cfg.load_profiles()
self.listbox.delete(0, "end")
for p in self.profiles:
self.listbox.insert("end", p.get("name"))
self._new()
if self.on_change:
self.on_change()

269
pyucc/gui/scanner_tab.py Normal file
View File

@ -0,0 +1,269 @@
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from pathlib import Path
import threading
import queue
import csv
from datetime import datetime
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from pathlib import Path
import threading
import queue
import csv
from datetime import datetime
from ..core.scanner import find_source_files
from ..config.languages import LANGUAGE_EXTENSIONS
from ..utils import logger as app_logger
import logging
from .file_viewer import FileViewer
class ScannerTab(ttk.Frame):
"""Tab that runs the source files scanner and shows results.
The ScannerTab relies on a shared `topbar` instance for folder
selection. This reduces duplication between tabs.
"""
def __init__(self, parent, topbar, app=None, *args, **kwargs):
"""Initialize the ScannerTab.
Args:
parent: Notebook widget that contains this tab.
topbar: Instance of `TopBar` exposing `path_var`.
"""
super().__init__(parent, *args, **kwargs)
self.topbar = topbar
self.app = app
self.queue = queue.Queue()
self.worker = None
# Controls
controls = ttk.Frame(self)
controls.grid(row=0, column=0, sticky="ew", padx=8, pady=6)
self.scan_btn = ttk.Button(controls, text="Start Scan", command=self.start_scan)
self.scan_btn.grid(row=0, column=0, sticky="w")
self.cancel_btn = ttk.Button(controls, text="Cancel", command=self.cancel_scan, state="disabled")
self.cancel_btn.grid(row=0, column=1, padx=(8, 0))
# Export button next to scanner controls
self.export_btn = ttk.Button(controls, text="Export CSV", command=self._export_csv, state="disabled")
self.export_btn.grid(row=0, column=2, padx=(8, 0))
# Progress and output
self.progress = ttk.Progressbar(self, mode="indeterminate")
self.progress.grid(row=1, column=0, sticky="ew", padx=8, pady=(6, 0))
# Treeview for scanner results: filename, path, size, created, modified
columns = ("name", "path", "size", "created", "modified")
self.tree = ttk.Treeview(self, columns=columns, show="headings")
self.tree.heading("name", text="File", command=lambda: self._sort_by("name", False))
self.tree.heading("path", text="Path", command=lambda: self._sort_by("path", False))
self.tree.heading("size", text="Size (bytes)", command=lambda: self._sort_by("size", False))
self.tree.heading("created", text="Created", command=lambda: self._sort_by("created", False))
self.tree.heading("modified", text="Modified", command=lambda: self._sort_by("modified", False))
# column sizing: make numeric columns narrower and right-aligned
self.tree.column("name", width=300, anchor="w", stretch=True)
self.tree.column("path", width=400, anchor="w", stretch=True)
self.tree.column("size", width=90, anchor="e", stretch=False)
self.tree.column("created", width=160, anchor="w", stretch=False)
self.tree.column("modified", width=160, anchor="w", stretch=False)
self.tree.grid(row=2, column=0, sticky="nsew", padx=8, pady=8)
vsb = ttk.Scrollbar(self, orient="vertical", command=self.tree.yview)
hs = ttk.Scrollbar(self, orient="horizontal", command=self.tree.xview)
self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hs.set)
vsb.grid(row=2, column=1, sticky="ns")
hs.grid(row=3, column=0, columnspan=1, sticky="ew", padx=8)
self.rowconfigure(2, weight=1)
self.columnconfigure(0, weight=1)
# bind double-click to open file viewer
self.tree.bind("<Double-1>", self._on_double_click)
def start_scan(self):
"""Start scanning the folder configured in the shared TopBar."""
path = self.topbar.path_var.get().strip()
if not path:
messagebox.showwarning("Missing folder", "Please select a folder to analyze first.")
return
directory = Path(path)
if not directory.is_dir():
messagebox.showerror("Error", f"The path '{path}' is not a valid folder.")
return
self.scan_btn.config(state="disabled")
# log start
try:
if getattr(self, 'app', None):
self.app.log(f"Scan started on: {path}", "INFO")
except Exception:
pass
self.cancel_btn.config(state="normal")
# clear tree
for item in self.tree.get_children():
self.tree.delete(item)
self.progress.start(50)
self.worker = threading.Thread(target=self._worker_scan, args=(directory,), daemon=True)
self.worker.start()
self.after(200, self._poll_queue)
def cancel_scan(self):
"""Request cancellation (informational only — scanner must support cancel)."""
if self.worker and self.worker.is_alive():
messagebox.showinfo("Cancel", "Cancellation requested but not supported by the worker.")
try:
if getattr(self, 'app', None):
self.app.log("Scan cancelled by user", "WARNING")
except Exception:
pass
self._finish_scan()
def _worker_scan(self, directory: Path):
"""Worker thread: call `find_source_files` and push results to queue.
If a profile is selected in the TopBar, build an allowed extensions set
from the profile languages and pass it to the scanner so only
relevant files are returned.
"""
try:
allowed_exts = None
pr = getattr(self.topbar, "current_profile", None)
# fallback: if current_profile not set, try to resolve from combobox selection
if not pr:
pname = getattr(self.topbar, "profile_var", None)
if pname:
from ..config import profiles as profiles_cfg
pr = profiles_cfg.find_profile(pname.get()) if hasattr(pname, "get") else profiles_cfg.find_profile(str(pname))
if pr:
langs = pr.get("languages", []) or []
exts = []
for ln in langs:
if ln in LANGUAGE_EXTENSIONS:
exts.extend(LANGUAGE_EXTENSIONS[ln])
else:
val = ln.strip()
if val.startswith('.'):
exts.append(val.lower())
elif len(val) <= 5 and val.isalnum():
# simple heuristic: treat short alnum tokens as extension-like
exts.append(f".{val.lower()}")
else:
pass
if exts:
allowed_exts = set(exts)
ignore_patterns = pr.get("ignore", []) if pr else None
files = find_source_files(directory, allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns)
self.queue.put(("done", files))
except Exception as e:
self.queue.put(("error", str(e)))
def _poll_queue(self):
"""Poll the internal queue and update UI when results arrive."""
try:
while True:
item = self.queue.get_nowait()
tag, payload = item
if tag == "done":
files = payload
# populate tree with file metadata
for p in files:
try:
stat = p.stat()
size = stat.st_size
created = datetime.fromtimestamp(stat.st_ctime).isoformat(sep=' ', timespec='seconds')
modified = datetime.fromtimestamp(stat.st_mtime).isoformat(sep=' ', timespec='seconds')
except Exception:
size = 0
created = ""
modified = ""
self.tree.insert("", "end", values=(p.name, str(p), size, created, modified))
self.export_btn.config(state="normal")
try:
if getattr(self, 'app', None):
self.app.log(f"Scan completed: {len(files)} files found", "INFO")
except Exception:
pass
self._finish_scan()
elif tag == "error":
messagebox.showerror("Error", f"Error during scanning: {payload}")
try:
if getattr(self, 'app', None):
self.app.log(f"Error during scanning: {payload}", "ERROR")
except Exception:
pass
self._finish_scan()
except queue.Empty:
if self.worker and self.worker.is_alive():
self.after(200, self._poll_queue)
def _on_double_click(self, _evt=None):
sel = self.tree.selection()
if not sel:
return
item = sel[0]
vals = self.tree.item(item, "values")
# values: (name, path, size, created, modified)
if len(vals) >= 2:
path = vals[1]
FileViewer(self, path)
def _finish_scan(self):
"""Finalize UI state after scan finished or cancelled."""
self.progress.stop()
self.scan_btn.config(state="normal")
self.cancel_btn.config(state="disabled")
def _sort_by(self, col, descending):
col_map = {"name": 0, "path": 1, "size": 2, "created": 3, "modified": 4}
idx = col_map.get(col, 0)
children = list(self.tree.get_children(""))
def _key(item):
val = self.tree.item(item, "values")[idx]
# numeric for size
if col == "size":
try:
return int(val)
except Exception:
return 0
# otherwise string
try:
return str(val).lower()
except Exception:
return ""
children.sort(key=_key, reverse=descending)
for index, item in enumerate(children):
self.tree.move(item, "", index)
self.tree.heading(col, command=lambda c=col: self._sort_by(c, not descending))
def _export_csv(self):
path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV files", "*.csv"), ("All files", "*")])
if not path:
return
headers = ["name", "path", "size", "created", "modified"]
from ..utils.csv_exporter import export_rows_to_csv
try:
rows = (self.tree.item(child, "values") for child in self.tree.get_children())
written = export_rows_to_csv(path, headers, rows)
messagebox.showinfo("Export", f"Exported {written} rows to {path}")
try:
if getattr(self, 'app', None):
self.app.log(f"Exported {written} rows to {path}", "INFO")
except Exception:
pass
except Exception as e:
messagebox.showerror("Export Error", str(e))
try:
if getattr(self, 'app', None):
self.app.log(f"Export error: {e}", "ERROR")
except Exception:
pass

95
pyucc/gui/topbar.py Normal file
View File

@ -0,0 +1,95 @@
import tkinter as tk
from tkinter import ttk, filedialog
from pathlib import Path
from .profile_manager import ProfileManager
from ..config import profiles as profiles_cfg
class TopBar(ttk.Frame):
"""Shared top bar containing profile selection and folder selection.
The TopBar exposes `path_var` (StringVar) and `current_profile` dict
that other tabs can read to apply profile-specific settings.
"""
def __init__(self, parent, *args, **kwargs):
"""Initialize the TopBar.
Args:
parent: The parent Tk widget where the top bar will be placed.
"""
super().__init__(parent, *args, **kwargs)
self.path_var = tk.StringVar()
self.current_profile = None
# Profiles combobox
ttk.Label(self, text="Profile:").grid(row=0, column=0, sticky="w", padx=(8, 4), pady=8)
self.profile_var = tk.StringVar()
self.profile_cb = ttk.Combobox(self, textvariable=self.profile_var, state="readonly")
self._load_profiles()
self.profile_cb.grid(row=0, column=1, sticky="ew", padx=(0, 6))
self.profile_cb.bind("<<ComboboxSelected>>", self._on_profile_selected)
manage_btn = ttk.Button(self, text="Manage...", command=self._open_manager)
manage_btn.grid(row=0, column=2, sticky="e", padx=(0, 4))
# Info area: project type above folder label (read-only, driven by profile)
info = ttk.Frame(self)
info.grid(row=0, column=3, columnspan=3, sticky="ew", padx=(8, 8))
ttk.Label(info, text="Type:").grid(row=0, column=0, sticky="w")
self.project_type_var = tk.StringVar(value="-")
ttk.Label(info, textvariable=self.project_type_var).grid(row=0, column=1, sticky="w", padx=(6, 0))
ttk.Label(info, text="Folder:").grid(row=1, column=0, sticky="w")
# the folder label still mirrors `path_var` for backward compatibility
ttk.Label(info, textvariable=self.path_var).grid(row=1, column=1, sticky="w", padx=(6, 0))
self.columnconfigure(1, weight=0)
self.columnconfigure(3, weight=1)
def _load_profiles(self):
profs = profiles_cfg.load_profiles()
names = [p.get("name") for p in profs]
self.profile_cb["values"] = names
def _on_profile_selected(self, _evt=None):
name = self.profile_var.get()
if not name:
return
pr = profiles_cfg.find_profile(name)
if not pr:
return
self.current_profile = pr
# Set folder and optionally other UI hints
self.path_var.set(pr.get("path", ""))
# determine a simple project type hint from profile languages
langs = pr.get("languages", []) or []
ptype = ""
if "Python" in langs:
ptype = "Python"
elif "C++" in langs or "C" in langs:
ptype = "C/C++"
elif "Java" in langs:
ptype = "Java"
elif len(langs) == 1:
ptype = langs[0]
elif langs:
ptype = ",".join(langs)
else:
ptype = "Unknown"
self.project_type_var.set(ptype)
def _open_manager(self):
def _refresh():
self._load_profiles()
pm = ProfileManager(self.master, on_change=_refresh)
pm.grab_set()
def browse(self) -> None:
"""Open a directory selection dialog and update `path_var`.
The selected path is stored as an absolute string in `path_var`.
"""
directory = filedialog.askdirectory()
if directory:
self.path_var.set(str(Path(directory)))

0
pyucc/utils/__init__.py Normal file
View File

View File

@ -0,0 +1,25 @@
from typing import Iterable, Sequence
import csv
from pathlib import Path
def export_rows_to_csv(path: str | Path, headers: Sequence[str], rows: Iterable[Sequence]) -> int:
"""Export rows to a CSV file.
Args:
path: destination file path (string or Path)
headers: list/sequence of header strings
rows: iterable of row sequences (each row is a sequence of values)
Returns:
int: number of rows written (excluding header)
"""
p = Path(path)
written = 0
with p.open("w", newline="", encoding="utf-8") as fh:
writer = csv.writer(fh)
writer.writerow(list(headers))
for row in rows:
writer.writerow(list(row))
written += 1
return written

462
pyucc/utils/logger.py Normal file
View File

@ -0,0 +1,462 @@
"""Centralized logging helpers used by the GUI and core.
This module implements a queue-based logging system that safely forwards
LogRecord objects from background threads into GUI-updated handlers. It
provides a small Tkinter-friendly handler, queue integration helpers and
convenience utilities for applying saved logger levels.
Public API highlights:
- :func:`setup_basic_logging` configure the global queue-based system.
- :func:`add_tkinter_handler` attach a Tkinter-based log view.
- :func:`get_logger` convenience wrapper around :mod:`logging`.
"""
import logging
import logging.handlers # For RotatingFileHandler
import tkinter as tk
from tkinter.scrolledtext import ScrolledText
from queue import Queue, Empty as QueueEmpty
from typing import Optional, Dict, Any
from contextlib import contextmanager
from logging import Logger
try:
from target_simulator.utils.config_manager import ConfigManager
except Exception:
# Fallback minimal ConfigManager for environments where the
# external dependency is not available. The real project provides
# richer behavior; this stub supplies only the surface used by
# apply_saved_logger_levels and avoids import-time failures.
class ConfigManager:
def __init__(self):
self.filepath = None
def get_general_settings(self):
return {}
import os
import json
# Module-level logger for utils.logging helpers
logger = logging.getLogger(__name__)
# --- Module-level globals for the centralized logging queue system ---
_global_log_queue: Optional[Queue[logging.LogRecord]] = None
_actual_console_handler: Optional[logging.StreamHandler] = None
_actual_file_handler: Optional[logging.handlers.RotatingFileHandler] = None
_actual_tkinter_handler: Optional["TkinterTextHandler"] = None
_log_processor_after_id: Optional[str] = None
_logging_system_active: bool = False
_tk_root_instance_for_processing: Optional[tk.Tk] = None
_base_formatter: Optional[logging.Formatter] = None
# Ottimizzazioni: polling adattivo e batching
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS = 200 # Ridotto a 200ms (5Hz invece di 10Hz)
LOG_BATCH_SIZE = 50 # Processa max 50 log per ciclo per evitare blocchi GUI
_last_log_time = 0.0 # Per polling adattivo
class TkinterTextHandler(logging.Handler):
"""
A logging handler that directs log messages to a Tkinter Text widget.
This handler is called directly from the GUI thread's processing loop.
Optimizations:
- Batches multiple log entries to reduce Tkinter widget operations
- Limits total widget size to prevent memory bloat
- Only scrolls to end if user hasn't scrolled up manually
"""
def __init__(
self, text_widget: tk.Text, level_colors: Dict[int, str], max_lines: int = 1000
):
super().__init__()
self.text_widget = text_widget
self.level_colors = level_colors
self.max_lines = max_lines
self._pending_records = [] # Buffer per batching
self._last_yview = None # Track user scroll position
self._configure_tags()
def _configure_tags(self):
for level, color_value in self.level_colors.items():
level_name = logging.getLevelName(level)
if color_value:
try:
self.text_widget.tag_config(level_name, foreground=color_value)
except tk.TclError:
pass # Widget might not be ready
def emit(self, record: logging.LogRecord):
"""Buffer the record for batch processing."""
try:
if not self.text_widget.winfo_exists():
return
self._pending_records.append(record)
except Exception as e:
print(f"Error in TkinterTextHandler.emit: {e}", flush=True)
def flush_pending(self):
"""Flush all pending log records to the widget in a single operation."""
if not self._pending_records:
return
try:
if not self.text_widget.winfo_exists():
self._pending_records.clear()
return
# Check if user has scrolled away from bottom
yview = self.text_widget.yview()
user_at_bottom = yview[1] >= 0.98 # Within 2% of bottom
# Single state change for all inserts
self.text_widget.configure(state=tk.NORMAL)
# Batch insert all pending records
for record in self._pending_records:
msg = self.format(record)
level_name = record.levelname
self.text_widget.insert(tk.END, msg + "\n", (level_name,))
# Trim old lines if exceeded max
line_count = int(self.text_widget.index("end-1c").split(".")[0])
if line_count > self.max_lines:
excess = line_count - self.max_lines
self.text_widget.delete("1.0", f"{excess}.0")
self.text_widget.configure(state=tk.DISABLED)
# Only auto-scroll if user was at bottom
if user_at_bottom:
self.text_widget.see(tk.END)
self._pending_records.clear()
except Exception as e:
print(f"Error in TkinterTextHandler.flush_pending: {e}", flush=True)
self._pending_records.clear()
class QueuePuttingHandler(logging.Handler):
"""
A simple handler that puts any received LogRecord into a global queue.
"""
def __init__(self, handler_queue: Queue[logging.LogRecord]):
super().__init__()
self.handler_queue = handler_queue
def emit(self, record: logging.LogRecord):
self.handler_queue.put_nowait(record)
def _process_global_log_queue():
"""
GUI Thread: Periodically processes LogRecords from the _global_log_queue
and dispatches them to the actual configured handlers.
Optimizations:
- Processes logs in batches (max LOG_BATCH_SIZE per cycle)
- Adaptive polling: faster when logs are active, slower when idle
- Single flush operation for Tkinter handler (batched writes)
"""
global _logging_system_active, _log_processor_after_id, _last_log_time
import time
if (
not _logging_system_active
or not _tk_root_instance_for_processing
or not _tk_root_instance_for_processing.winfo_exists()
):
return
processed_count = 0
try:
# Process up to LOG_BATCH_SIZE records per cycle to avoid GUI freezes
while (
_global_log_queue
and not _global_log_queue.empty()
and processed_count < LOG_BATCH_SIZE
):
record = _global_log_queue.get_nowait()
# Console and file handlers write immediately (fast, non-blocking)
if _actual_console_handler:
_actual_console_handler.handle(record)
if _actual_file_handler:
_actual_file_handler.handle(record)
# Tkinter handler buffers the record (no widget operations yet)
if _actual_tkinter_handler:
_actual_tkinter_handler.handle(record)
_global_log_queue.task_done()
processed_count += 1
_last_log_time = time.time()
except QueueEmpty:
pass
except Exception as e:
print(f"Error in log processing queue: {e}", flush=True)
# Flush all pending Tkinter records in a single batch operation
try:
if _actual_tkinter_handler and hasattr(
_actual_tkinter_handler, "flush_pending"
):
_actual_tkinter_handler.flush_pending()
except Exception as e:
print(f"Error flushing Tkinter logs: {e}", flush=True)
# Adaptive polling: faster interval if logs are recent, slower when idle
try:
time_since_last_log = time.time() - _last_log_time
if time_since_last_log < 2.0 or processed_count >= LOG_BATCH_SIZE:
# Recent activity or queue backlog: poll faster
next_interval = GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS
elif time_since_last_log < 10.0:
# Moderate activity: normal polling
next_interval = GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS * 2
else:
# Idle: slow polling to reduce CPU
next_interval = GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS * 5
except Exception:
next_interval = GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS
# Schedule next processing cycle
if _logging_system_active:
_log_processor_after_id = _tk_root_instance_for_processing.after(
int(next_interval), _process_global_log_queue
)
def setup_basic_logging(
root_tk_instance_for_processor: tk.Tk,
logging_config_dict: Optional[Dict[str, Any]] = None,
):
"""Configure the global, queue-based logging system.
This sets up a small logging queue and a background processor that is
polled from the provided Tk root. The function also attaches a console
handler immediately so logs are visible before the GUI polling loop
begins.
Args:
root_tk_instance_for_processor: Tk root used to schedule the queue
processing callback via :meth:`tkinter.Tk.after`.
logging_config_dict: Optional mapping controlling format, levels and
enabled handlers.
"""
global _global_log_queue, _actual_console_handler, _actual_file_handler, _logging_system_active
global _tk_root_instance_for_processing, _log_processor_after_id, _base_formatter
if _logging_system_active:
return
if logging_config_dict is None:
logging_config_dict = {}
log_format_str = logging_config_dict.get(
"format", "%(asctime)s [%(levelname)-8s] %(name)-25s : %(message)s"
)
log_date_format_str = logging_config_dict.get("date_format", "%Y-%m-%d %H:%M:%S")
_base_formatter = logging.Formatter(log_format_str, datefmt=log_date_format_str)
_global_log_queue = Queue()
_tk_root_instance_for_processing = root_tk_instance_for_processor
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
root_logger.setLevel(logging_config_dict.get("default_root_level", logging.INFO))
if logging_config_dict.get("enable_console", True):
_actual_console_handler = logging.StreamHandler()
_actual_console_handler.setFormatter(_base_formatter)
_actual_console_handler.setLevel(logging.DEBUG)
# DO NOT attach console handler directly to root logger - it will be
# processed through the queue system to avoid duplicate output
queue_putter = QueuePuttingHandler(handler_queue=_global_log_queue)
queue_putter.setLevel(logging.DEBUG)
root_logger.addHandler(queue_putter)
# Emit a small startup message so users running from console see logging is active
try:
root_logger.debug("Logging system initialized (queue-based).")
except Exception:
pass
_logging_system_active = True
_log_processor_after_id = _tk_root_instance_for_processing.after(
GLOBAL_LOG_QUEUE_POLL_INTERVAL_MS, _process_global_log_queue
)
def add_tkinter_handler(gui_log_widget: tk.Text, logging_config_dict: Dict[str, Any]):
global _actual_tkinter_handler, _base_formatter
if not _logging_system_active or not _base_formatter:
return
if _actual_tkinter_handler:
_actual_tkinter_handler.close()
if (
isinstance(gui_log_widget, (tk.Text, ScrolledText))
and gui_log_widget.winfo_exists()
):
level_colors = logging_config_dict.get("colors", {})
_actual_tkinter_handler = TkinterTextHandler(
text_widget=gui_log_widget, level_colors=level_colors
)
_actual_tkinter_handler.setFormatter(_base_formatter)
_actual_tkinter_handler.setLevel(logging.DEBUG)
logger.info("Tkinter log handler added successfully.")
else:
print(
"ERROR: GUI log widget invalid, cannot add TkinterTextHandler.", flush=True
)
def get_logger(name: str) -> logging.Logger:
"""Return a :class:`logging.Logger` instance for the given name.
This is a thin wrapper over :func:`logging.getLogger` kept for callers in
the project for clarity and possible future extension.
"""
return logging.getLogger(name)
def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)
@contextmanager
def temporary_log_level(logger: Logger, level: int):
"""Context manager to temporarily set a logger's level.
Usage:
with temporary_log_level(logging.getLogger('some.name'), logging.DEBUG):
# inside this block the logger will be DEBUG
...
"""
old_level = logger.level
logger.setLevel(level)
try:
yield
finally:
logger.setLevel(old_level)
def shutdown_logging_system():
global _logging_system_active, _log_processor_after_id
if not _logging_system_active:
return
_logging_system_active = False
if (
_log_processor_after_id
and _tk_root_instance_for_processing
and _tk_root_instance_for_processing.winfo_exists()
):
_tk_root_instance_for_processing.after_cancel(_log_processor_after_id)
# Final flush of the queue
_process_global_log_queue()
logging.shutdown()
def apply_saved_logger_levels():
"""Apply saved logger levels read from configuration at startup.
Loads preferences from a `logger_prefs.json` next to the settings file or
falls back to ``general.logger_panel.saved_levels`` inside settings.
Each configured logger name will be set to the configured level if valid.
"""
try:
cfg = ConfigManager()
except Exception:
return
try:
# Prefer a dedicated logger_prefs.json next to the main settings file
prefs_path = None
cfg_path = getattr(cfg, "filepath", None)
if cfg_path:
prefs_path = os.path.join(os.path.dirname(cfg_path), "logger_prefs.json")
saved = {}
if prefs_path and os.path.exists(prefs_path):
try:
with open(prefs_path, "r", encoding="utf-8") as f:
jp = json.load(f)
if isinstance(jp, dict):
saved = jp.get("saved_levels", {}) or {}
except Exception:
saved = {}
else:
# Fallback to settings.json general.logger_panel
try:
gen = cfg.get_general_settings()
lp = gen.get("logger_panel", {}) if isinstance(gen, dict) else {}
saved = lp.get("saved_levels", {}) if isinstance(lp, dict) else {}
except Exception:
saved = {}
for name, lvl_name in (saved or {}).items():
try:
lvl_val = logging.getLevelName(lvl_name)
if isinstance(lvl_val, int):
logging.getLogger(name).setLevel(lvl_val)
except Exception:
pass
except Exception:
pass
def apply_saved_logger_levels():
"""Apply saved logger levels from ConfigManager at startup.
Reads `general.logger_panel.saved_levels` from settings.json and sets
each configured logger to the saved level name.
"""
try:
cfg = ConfigManager()
except Exception:
return
try:
# Prefer a dedicated logger_prefs.json next to the main settings file
prefs_path = None
cfg_path = getattr(cfg, "filepath", None)
if cfg_path:
prefs_path = os.path.join(os.path.dirname(cfg_path), "logger_prefs.json")
saved = {}
if prefs_path and os.path.exists(prefs_path):
try:
with open(prefs_path, "r", encoding="utf-8") as f:
jp = json.load(f)
if isinstance(jp, dict):
saved = jp.get("saved_levels", {}) or {}
except Exception:
saved = {}
else:
# Fallback to settings.json general.logger_panel
try:
gen = cfg.get_general_settings()
lp = gen.get("logger_panel", {}) if isinstance(gen, dict) else {}
saved = lp.get("saved_levels", {}) if isinstance(lp, dict) else {}
except Exception:
saved = {}
for name, lvl_name in (saved or {}).items():
try:
lvl_val = logging.getLevelName(lvl_name)
if isinstance(lvl_val, int):
logging.getLogger(name).setLevel(lvl_val)
except Exception:
pass
except Exception:
pass

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
pygount
lizard
Pygments

17
run Normal file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -euo pipefail
# Launcher script to run the project using the local .venv Python
# Usage: ./run [args...]
PROJECT_ROOT="$(cd "$(dirname "$0")" && pwd)"
VENV="$PROJECT_ROOT/.venv"
PYTHON="$VENV/bin/python"
if [ ! -x "$PYTHON" ]; then
echo "Virtualenv not found or incomplete at $VENV." >&2
echo "Create it with: python3 -m venv .venv && .venv/bin/python -m pip install -r requirements.txt" >&2
exit 1
fi
exec "$PYTHON" -m pyucc "$@"

1
test.txt Normal file
View File

@ -0,0 +1 @@
python3 -m pyucc . --outdir ./output_test

34
tests/test_countings.py Normal file
View File

@ -0,0 +1,34 @@
import unittest
import tempfile
from pathlib import Path
from pyucc.core.countings_impl import analyze_file_counts
class TestCountings(unittest.TestCase):
def test_analyze_file_counts_basic(self):
content = """# comment line
print('hello')
# another comment
"""
with tempfile.NamedTemporaryFile("w", delete=False, suffix=".py") as tf:
tf.write(content)
tmpname = tf.name
p = Path(tmpname)
res = analyze_file_counts(p)
# Basic structure checks
self.assertIn("file", res)
self.assertIn("physical_lines", res)
self.assertIn("code_lines", res)
self.assertIn("blank_lines", res)
self.assertIn("language", res)
# Types
self.assertIsInstance(res["physical_lines"], int)
self.assertIsInstance(res["code_lines"], int)
self.assertIsInstance(res["blank_lines"], int)
if __name__ == '__main__':
unittest.main()

5
todo.md Normal file
View File

@ -0,0 +1,5 @@
-[ ] Numeri da inviare per kpi
-[ ] linee logiche codice totali
-[ ] quelle cancellate
-[ ] quelle modificate
-[ ] nuove