"""Implementazione `countings` usando il CLI `pygount` (JSON) con fallback.
|
|
|
|
Questo modulo fornisce `analyze_file_counts` e `analyze_paths`.
|
|
"""

from pathlib import Path
from typing import Dict, Any, Iterable, List

import json
import subprocess
import logging
import threading
import hashlib

try:
    import pygount  # type: ignore

    _HAS_PYGOUNT = True
except Exception:
    _HAS_PYGOUNT = False

_LOG = logging.getLogger(__name__)

# Cache of counting results keyed by file-content hash ONLY.
# Key: content_hash (MD5 of normalized bytes); value: counting-results dict.
# Using only the hash (not the path) ensures that identical content always returns
# the same result, even when the file lives in different directories
# (e.g. baseline snapshots vs. current code).
# NOTE: pygount IS deterministic - same file content = same numeric results.
_COUNTING_CACHE: Dict[str, Dict[str, Any]] = {}
_CACHE_LOCK = threading.Lock()
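
# Example cache entry (illustrative values only):
#     _COUNTING_CACHE["<md5 of normalized bytes>"] = {
#         "file": "src/foo.py", "physical_lines": 10, "code_lines": 7,
#         "comment_lines": 1, "blank_lines": 2, "language": "Python", ...
#     }
# Byte-identical files in different directories produce the same key and therefore
# share a single entry.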


def _first_not_none(item: Dict[str, Any], keys: List[str], default: Any = 0) -> Any:
    """Return the value of the first key in `keys` whose value is not None.

    Checks `is not None` rather than truthiness so that explicit 0 counts are kept.
    """
    for key in keys:
        value = item.get(key)
        if value is not None:
            return value
    return default


def _map_pygount_json_item(item: Dict[str, Any]) -> Dict[str, Any]:
    # Support multiple pygount JSON shapes and key names.
    # Priority order: pygount 3.x keys first, then older formats.
    physical = _first_not_none(
        item,
        ["lineCount", "raw_total_lines", "n_lines", "lines", "raw_lines", "line_count"],
    )

    # For code: use sourceCount (code + string lines).
    # Note: in pygount 3.x, stringCount is NOT provided per file, only in aggregates.
    # sourceCount = codeCount + stringCount (where strings are multi-line strings).
    # Using sourceCount ensures: sourceCount + documentationCount + emptyCount = lineCount.
    code = _first_not_none(
        item,
        ["sourceCount", "codeCount", "code", "n_code", "n_code_lines", "code_lines"],
    )

    comment = _first_not_none(
        item,
        ["documentationCount", "comment", "n_comment", "n_comment_lines", "comment_lines"],
    )

    blank = _first_not_none(
        item,
        ["emptyCount", "blank", "n_blank", "blank_lines", "empty_count"],
    )

    language = (
        item.get("language")
        or item.get("languageName")
        or item.get("lang")
        or "unknown"
    )

    file_path = (
        item.get("filename")
        or item.get("file")
        or item.get("path")
        or item.get("name")
        or ""
    )

    result_dict = {
        "file": file_path,
        "physical_lines": int(physical),
        "code_lines": int(code),
        "comment_lines": int(comment),
        "blank_lines": int(blank),
        "language": language,
    }

    # pygount 3.x counts sourceCount (code + strings), documentationCount and emptyCount
    # separately, and sourceCount + documentationCount + emptyCount = lineCount.
    # We map: sourceCount -> code_lines, documentationCount -> comment_lines,
    # emptyCount -> blank_lines, so the categorized sum should match the physical count.
    sum_categorized = int(code) + int(comment) + int(blank)
    if int(physical) > 0 and sum_categorized != int(physical):
        # Only warn if the difference is significant (more than a rounding error).
        diff = abs(int(physical) - sum_categorized)
        if diff > 2:
            _LOG.warning(
                "Counting mismatch for %s: physical=%s, code=%s, comment=%s, blank=%s, sum=%s (diff=%s)",
                file_path,
                physical,
                code,
                comment,
                blank,
                sum_categorized,
                diff,
            )

    return result_dict
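
# Illustrative example of the mapping above (the exact JSON keys emitted by the
# pygount CLI depend on its version; the values here are assumed):
#
#     _map_pygount_json_item({
#         "filename": "pkg/mod.py", "language": "Python",
#         "lineCount": 12, "sourceCount": 7, "documentationCount": 3, "emptyCount": 2,
#     })
#     # -> {"file": "pkg/mod.py", "physical_lines": 12, "code_lines": 7,
#     #     "comment_lines": 3, "blank_lines": 2, "language": "Python"}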


def analyze_file_counts(path: Path) -> Dict[str, Any]:
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    # Read raw bytes and compute a normalized content hash for caching.
    # Normalization removes a UTF-8 BOM and converts CRLF and lone CR to LF.
    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except Exception:
        raw = None

    def _normalize_bytes(b: bytes) -> bytes:
        if b.startswith(b"\xef\xbb\xbf"):
            b = b[3:]
        b = b.replace(b"\r\n", b"\n")
        b = b.replace(b"\r", b"\n")
        return b
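
    # Illustrative: normalization drops a UTF-8 BOM and converts CRLF / lone CR to LF,
    # so the cache hash below does not depend on BOM or line-ending style, e.g.
    #     _normalize_bytes(b"\xef\xbb\xbfa\r\nb\r") == b"a\nb\n"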

    if raw is not None:
        norm = _normalize_bytes(raw)
        content_hash = hashlib.md5(norm).hexdigest()
    else:
        content_hash = None

    # Check the cache first (keyed only by the normalized content hash, not the path).
    if content_hash:
        with _CACHE_LOCK:
            if content_hash in _COUNTING_CACHE:
                cached_result = _COUNTING_CACHE[content_hash].copy()
                _LOG.info(
                    "CACHE HIT for %s (hash: %s) → code=%d, comment=%d, blank=%d",
                    path.name,
                    content_hash[:8],
                    cached_result.get("code_lines", 0),
                    cached_result.get("comment_lines", 0),
                    cached_result.get("blank_lines", 0),
                )
                return cached_result

    result: Dict[str, Any] = {
        "file": str(path),
        "physical_lines": 0,
        "code_lines": 0,
        "comment_lines": 0,
        "blank_lines": 0,
        "language": "unknown",
        # UCC extended metrics (initialized to 0)
        "comment_whole": 0,
        "comment_embedded": 0,
        "compiler_directives": 0,
        "data_declarations": 0,
        "exec_instructions": 0,
        "logical_sloc": 0,
        "physical_sloc": 0,
    }

    if _HAS_PYGOUNT:
        try:
            # Use the pygount Python API directly (more reliable than invoking the
            # CLI via subprocess).
            _LOG.info(
                "CACHE MISS for %s (hash: %s) - Running pygount API...",
                path.name,
                content_hash[:8] if content_hash else "none",
            )
            # SourceAnalysis.from_file returns per-file counts.
            from pygount import (
                analysis as _pg_analysis,
            )  # local import to avoid top-level dependency issues

            sa = _pg_analysis.SourceAnalysis.from_file(
                str(path), group=str(path.parent)
            )

            phys = int(sa.line_count or 0)
            # Prefer source_count (code + strings) when available.
            code_val = int(
                getattr(sa, "source_count", None) or getattr(sa, "code_count", 0) or 0
            )
            comment_val = int(getattr(sa, "documentation_count", 0) or 0)
            blank_val = int(getattr(sa, "empty_count", 0) or 0)

            result.update(
                {
                    "file": str(path),
                    "physical_lines": phys,
                    "code_lines": code_val,
                    "comment_lines": comment_val,
                    "blank_lines": blank_val,
                    "language": getattr(sa, "language", "unknown") or "unknown",
                }
            )

            # Sanity check: if pygount reports zero physical lines but the file
            # actually contains bytes, treat this as an error and fall back to the
            # simple reader. This guards against encoding/pygount failures that
            # return bogus zeroed results.
            norm_len = len(norm) if raw is not None else 0
            if norm_len > 0 and int(result.get("physical_lines", 0)) == 0:
                # Raising here jumps to the outer `except`, which runs the fallback.
                raise RuntimeError("pygount produced zero lines for non-empty file")

            # Add UCC extended metrics for languages with a dedicated counter.
            language = result.get("language", "unknown").lower()
            if language in ["c", "c++", "cpp"]:
                try:
                    from .ucc_complete_counter import UCCCompleteCounter

                    ucc_counter = UCCCompleteCounter(language="C")
                    ucc_result = ucc_counter.analyze_file(path)

                    result.update({
                        "comment_whole": ucc_result.get("comment_whole", 0),
                        "comment_embedded": ucc_result.get("comment_embedded", 0),
                        "compiler_directives": ucc_result.get("compiler_directives", 0),
                        "data_declarations": ucc_result.get("data_declarations", 0),
                        "exec_instructions": ucc_result.get("exec_instructions", 0),
                        "logical_sloc": ucc_result.get("logical_sloc", 0),
                        "physical_sloc": ucc_result.get("physical_sloc", 0),
                    })
                    _LOG.info(
                        "UCC C/C++ extended metrics for %s: logical=%d, data=%d, exec=%d",
                        path.name,
                        ucc_result.get("logical_sloc", 0),
                        ucc_result.get("data_declarations", 0),
                        ucc_result.get("exec_instructions", 0),
                    )
                except Exception as e:
                    _LOG.warning("Failed to get UCC C/C++ metrics for %s: %s", path.name, e)
            elif language == "python":
                try:
                    from .ucc_python_counter import UCCPythonCounter

                    ucc_counter = UCCPythonCounter()
                    ucc_result = ucc_counter.analyze_file(path)

                    result.update({
                        "comment_whole": ucc_result.get("comment_whole", 0),
                        "comment_embedded": ucc_result.get("comment_embedded", 0),
                        "compiler_directives": ucc_result.get("compiler_directives", 0),
                        "data_declarations": ucc_result.get("data_declarations", 0),  # Always 0 for Python
                        "exec_instructions": ucc_result.get("exec_instructions", 0),
                        "logical_sloc": ucc_result.get("logical_sloc", 0),
                        "physical_sloc": ucc_result.get("physical_sloc", 0),
                    })
                    _LOG.info(
                        "UCC Python extended metrics for %s: logical=%d, exec=%d, directives=%d",
                        path.name,
                        ucc_result.get("logical_sloc", 0),
                        ucc_result.get("exec_instructions", 0),
                        ucc_result.get("compiler_directives", 0),
                    )
                except Exception as e:
                    _LOG.warning("Failed to get UCC Python metrics for %s: %s", path.name, e)
            elif language == "java":
                try:
                    from .ucc_java_counter import UCCJavaCounter

                    ucc_counter = UCCJavaCounter()
                    ucc_result = ucc_counter.analyze_file(path)

                    result.update({
                        "comment_whole": ucc_result.get("comment_whole", 0),
                        "comment_embedded": ucc_result.get("comment_embedded", 0),
                        "compiler_directives": ucc_result.get("compiler_directives", 0),
                        "data_declarations": ucc_result.get("data_declarations", 0),
                        "exec_instructions": ucc_result.get("exec_instructions", 0),
                        "logical_sloc": ucc_result.get("logical_sloc", 0),
                        "physical_sloc": ucc_result.get("physical_sloc", 0),
                    })
                    _LOG.info(
                        "UCC Java extended metrics for %s: logical=%d, data=%d, exec=%d",
                        path.name,
                        ucc_result.get("logical_sloc", 0),
                        ucc_result.get("data_declarations", 0),
                        ucc_result.get("exec_instructions", 0),
                    )
                except Exception as e:
                    _LOG.warning("Failed to get UCC Java metrics for %s: %s", path.name, e)
            elif language in ["assembly", "asm"]:
                try:
                    from .ucc_assembly_counter import UCCAssemblyCounter

                    ucc_counter = UCCAssemblyCounter()
                    ucc_result = ucc_counter.analyze_file(path)

                    result.update({
                        "comment_whole": ucc_result.get("comment_whole", 0),
                        "comment_embedded": ucc_result.get("comment_embedded", 0),
                        "compiler_directives": ucc_result.get("compiler_directives", 0),
                        "data_declarations": ucc_result.get("data_declarations", 0),
                        "exec_instructions": ucc_result.get("exec_instructions", 0),
                        "logical_sloc": ucc_result.get("logical_sloc", 0),
                        "physical_sloc": ucc_result.get("physical_sloc", 0),
                    })
                    _LOG.info(
                        "UCC Assembly extended metrics for %s: logical=%d, data=%d, exec=%d",
                        path.name,
                        ucc_result.get("logical_sloc", 0),
                        ucc_result.get("data_declarations", 0),
                        ucc_result.get("exec_instructions", 0),
                    )
                except Exception as e:
                    _LOG.warning("Failed to get UCC Assembly metrics for %s: %s", path.name, e)

            # Cache the result (keyed only by the normalized content hash).
            if content_hash:
                with _CACHE_LOCK:
                    _COUNTING_CACHE[content_hash] = result.copy()
            _LOG.info(
                "PYGOUNT API RESULT for %s (hash: %s) → code=%d, comment=%d, blank=%d [CACHED]",
                path.name,
                content_hash[:8] if content_hash else "none",
                result.get("code_lines", 0),
                result.get("comment_lines", 0),
                result.get("blank_lines", 0),
            )
            return result
        except Exception as e:
            # Log the exception, then fall back to simple counting below.
            _LOG.warning(
                "pygount failed for %s, using fallback counting. Error: %s",
                path,
                str(e),
            )
            _LOG.debug("pygount exception", exc_info=True)

    # Fallback: basic counting (NOTE: cannot distinguish code from comments).
    with path.open("r", errors="ignore") as fh:
        lines = fh.readlines()
    physical = len(lines)
    blanks = sum(1 for l in lines if l.strip() == "")
    code_lines = physical - blanks

    result.update(
        {
            "physical_lines": physical,
            "code_lines": code_lines,
            "comment_lines": 0,
            "blank_lines": blanks,
            "language": "unknown",
        }
    )

    # Cache the fallback result too (keyed only by the hash).
    if content_hash:
        with _CACHE_LOCK:
            _COUNTING_CACHE[content_hash] = result.copy()

    return result


def analyze_paths(paths: Iterable[Path]) -> List[Dict[str, Any]]:
    results: List[Dict[str, Any]] = []
    for p in paths:
        path = Path(p)
        try:
            results.append(analyze_file_counts(path))
        except Exception as e:
            results.append({"file": str(path), "error": str(e)})
    return results
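

# Minimal usage sketch (illustrative; not part of the original module's public
# surface): run this file directly to print counts for the paths given on the
# command line. It assumes only the functions defined above.
if __name__ == "__main__":
    import sys

    for entry in analyze_paths(Path(arg) for arg in sys.argv[1:]):
        print(json.dumps(entry, indent=2))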