SXXXXXXX_PyUCC/pyucc/core/countings_impl.py

248 lines
9.3 KiB
Python

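"""`countings` implementation built on `pygount`, with a plain fallback.

Per-file analysis goes through the pygount Python API when the package can be
imported and falls back to a basic line count otherwise; a helper for mapping
pygount JSON records is also kept.
This module provides `analyze_file_counts` and `analyze_paths`.
"""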
from pathlib import Path
from typing import Dict, Any, Iterable, List
import json
import subprocess
import logging
import threading
import hashlib
try:
import pygount # type: ignore
_HAS_PYGOUNT = True
except Exception:
_HAS_PYGOUNT = False
_LOG = logging.getLogger(__name__)
# Process-wide memo of counting results, keyed by content hash ONLY.
# Key: MD5 hex digest of the file's normalized bytes (computed in
# analyze_file_counts); value: the counting result dict for that content.
# The path is deliberately not part of the key, so identical content found in
# different directories (e.g., baseline snapshots vs current code) shares one
# entry and always yields the same numbers.
# This relies on the counter being deterministic for a given content; the
# original author notes that pygount is.
# NOTE(review): nothing in the visible module evicts entries, so the cache
# grows with the number of distinct file contents analyzed.
_COUNTING_CACHE: Dict[str, Dict[str, Any]] = {}
# Held around every read and write of _COUNTING_CACHE in this module.
_CACHE_LOCK = threading.Lock()
def _map_pygount_json_item(item: Dict[str, Any]) -> Dict[str, Any]:
# Support multiple pygount JSON shapes and key names
# Priority order: pygount 3.x format first, then older formats
# CRITICAL: Use 'is not None' instead of 'or' to handle 0 values correctly
physical = (
item.get("lineCount") # pygount 3.x
if item.get("lineCount") is not None else
item.get("raw_total_lines")
if item.get("raw_total_lines") is not None else
item.get("n_lines")
if item.get("n_lines") is not None else
item.get("lines")
if item.get("lines") is not None else
item.get("raw_lines")
if item.get("raw_lines") is not None else
item.get("line_count")
if item.get("line_count") is not None else
0
)
# For code: use sourceCount (includes code + string lines)
# Note: In pygount 3.x, stringCount is NOT provided per-file, only in aggregates
# sourceCount = codeCount + stringCount (where strings are multi-line strings)
# Using sourceCount ensures: sourceCount + documentationCount + emptyCount = lineCount
code = (
item.get("sourceCount") # pygount 3.x (code + strings)
if item.get("sourceCount") is not None else
item.get("codeCount") # pygount 3.x (code only, excludes strings)
if item.get("codeCount") is not None else
item.get("code")
if item.get("code") is not None else
item.get("n_code")
if item.get("n_code") is not None else
item.get("n_code_lines")
if item.get("n_code_lines") is not None else
item.get("code_lines")
if item.get("code_lines") is not None else
0
)
comment = (
item.get("documentationCount") # pygount 3.x
if item.get("documentationCount") is not None else
item.get("comment")
if item.get("comment") is not None else
item.get("n_comment")
if item.get("n_comment") is not None else
item.get("n_comment_lines")
if item.get("n_comment_lines") is not None else
item.get("comment_lines")
if item.get("comment_lines") is not None else
0
)
blank = (
item.get("emptyCount") # pygount 3.x
if item.get("emptyCount") is not None else
item.get("blank")
if item.get("blank") is not None else
item.get("n_blank")
if item.get("n_blank") is not None else
item.get("blank_lines")
if item.get("blank_lines") is not None else
item.get("empty_count")
if item.get("empty_count") is not None else
0
)
language = item.get("language") or item.get("languageName") or item.get("lang") or "unknown"
file_path = (
item.get("filename")
or item.get("file")
or item.get("path")
or item.get("name")
or ""
)
result_dict = {
"file": file_path,
"physical_lines": int(physical),
"code_lines": int(code),
"comment_lines": int(comment),
"blank_lines": int(blank),
"language": language,
}
# Note: pygount 3.x counts separately: sourceCount (code+strings), documentation, empty
# When using sourceCount: sourceCount + documentationCount + emptyCount = lineCount
# We map: sourceCount→code_lines, documentationCount→comment_lines, emptyCount→blank_lines
# Validate the sum matches physical lines (should be exact when using sourceCount)
sum_categorized = int(code) + int(comment) + int(blank)
if int(physical) > 0 and sum_categorized != int(physical):
# Only warn if difference is significant (more than rounding error)
diff = abs(int(physical) - sum_categorized)
if diff > 2:
_LOG.warning(
f"Counting mismatch for {file_path}: "
f"physical={physical}, code={code}, comment={comment}, blank={blank}, "
f"sum={sum_categorized} (diff={diff})"
)
return result_dict
def analyze_file_counts(path: Path) -> Dict[str, Any]:
    """Count physical, code, comment and blank lines of a single file.

    Results are memoized in ``_COUNTING_CACHE`` under the MD5 of the file's
    normalized bytes (UTF-8 BOM removed, CRLF and lone CR turned into LF), so
    identical content is only analyzed once regardless of where it lives.
    When pygount is importable its Python API is used; if it is missing or
    raises, a basic fallback counts non-blank lines as code and reports no
    comments.

    Args:
        path: File to analyze.

    Returns:
        A dict with the keys ``file`` (``str(path)``), ``physical_lines``,
        ``code_lines``, ``comment_lines``, ``blank_lines`` and ``language``.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        OSError: If the fallback counter cannot open the file.
    """
    if not path.exists():
        raise FileNotFoundError(f"File non trovato: {path}")

    def _normalize_bytes(b: bytes) -> bytes:
        # Strip a UTF-8 BOM and unify line endings so that files differing
        # only in BOM or EOL style hash to the same cache key.
        if b.startswith(b"\xef\xbb\xbf"):
            b = b[3:]
        b = b.replace(b"\r\n", b"\n")
        b = b.replace(b"\r", b"\n")
        return b

    # Hashing is best effort: if the bytes cannot be read, caching is skipped
    # and the analysis below decides whether the file is usable at all.
    try:
        with open(path, "rb") as fh:
            raw = fh.read()
    except Exception:
        _LOG.debug("Could not read %s for hashing; cache disabled", path, exc_info=True)
        raw = None

    content_hash = (
        hashlib.md5(_normalize_bytes(raw)).hexdigest() if raw is not None else None
    )
    hash_tag = content_hash[:8] if content_hash else 'none'

    # Check cache first (keyed by normalized hash only, not by path).
    # NOTE(review): the cached counts and language come from whichever path
    # first produced this content; if the analysis depends on the file name or
    # extension, hash-only keying can return another file type's numbers.
    if content_hash:
        with _CACHE_LOCK:
            cached_entry = _COUNTING_CACHE.get(content_hash)
            cached_result = cached_entry.copy() if cached_entry is not None else None
        if cached_result is not None:
            # The entry was stored by the first path with this content; report
            # the path that was actually requested, not the one that filled
            # the cache.
            cached_result["file"] = str(path)
            _LOG.info("CACHE HIT for %s (hash: %s) → code=%d, comment=%d, blank=%d",
                      path.name, hash_tag,
                      cached_result.get('code_lines', 0),
                      cached_result.get('comment_lines', 0),
                      cached_result.get('blank_lines', 0))
            return cached_result

    result: Dict[str, Any] = {
        "file": str(path),
        "physical_lines": 0,
        "code_lines": 0,
        "comment_lines": 0,
        "blank_lines": 0,
        "language": "unknown",
    }

    if _HAS_PYGOUNT:
        try:
            # Use the pygount Python API directly rather than the CLI.
            _LOG.info("CACHE MISS for %s (hash: %s) - Running pygount API...",
                      path.name, hash_tag)
            # Local import to avoid top-level dependency issues.
            from pygount import analysis as _pg_analysis
            sa = _pg_analysis.SourceAnalysis.from_file(str(path), group=str(path.parent))
            phys = int(sa.line_count or 0)
            # Prefer source_count (code + strings) when available.
            code_val = int(getattr(sa, 'source_count', None) or getattr(sa, 'code_count', 0) or 0)
            comment_val = int(getattr(sa, 'documentation_count', 0) or 0)
            blank_val = int(getattr(sa, 'empty_count', 0) or 0)
            result.update({
                'file': str(path),
                'physical_lines': phys,
                'code_lines': code_val,
                'comment_lines': comment_val,
                'blank_lines': blank_val,
                'language': getattr(sa, 'language', 'unknown') or 'unknown',
            })
            # Cache the result (using only the normalized hash as key).
            if content_hash:
                with _CACHE_LOCK:
                    _COUNTING_CACHE[content_hash] = result.copy()
            _LOG.info("PYGOUNT API RESULT for %s (hash: %s) → code=%d, comment=%d, blank=%d [CACHED]",
                      path.name, hash_tag,
                      result.get('code_lines', 0),
                      result.get('comment_lines', 0),
                      result.get('blank_lines', 0))
            return result
        except Exception as e:
            # Any pygount failure is logged and degrades to the basic counter.
            _LOG.warning("pygount failed for %s, using fallback counting. Error: %s", path, str(e))
            _LOG.debug("pygount exception", exc_info=True)

    # Fallback: basic counting (NOTE: cannot distinguish code from comments,
    # so every non-blank line is reported as code).
    physical = 0
    blanks = 0
    with path.open("r", errors="ignore") as fh:
        for line in fh:
            physical += 1
            if not line.strip():
                blanks += 1
    result.update({
        "physical_lines": physical,
        "code_lines": physical - blanks,
        "comment_lines": 0,
        "blank_lines": blanks,
        "language": "unknown",
    })
    # Cache the fallback result too (using only the hash as key).
    if content_hash:
        with _CACHE_LOCK:
            _COUNTING_CACHE[content_hash] = result.copy()
    return result
def analyze_paths(paths: Iterable[Path]) -> List[Dict[str, Any]]:
    """Run `analyze_file_counts` on every entry of *paths*, in order.

    Each entry is coerced to `Path` first. A file whose analysis raises does
    not abort the run: it is reported as ``{"file": ..., "error": ...}`` in
    place of its counts.

    Args:
        paths: Files to analyze.

    Returns:
        One dict per input entry, in input order.
    """

    def _analyze_one(entry: Any) -> Dict[str, Any]:
        # The Path coercion is kept outside the try so that only failures of
        # the analysis itself are converted into an error record.
        target = Path(entry)
        try:
            return analyze_file_counts(target)
        except Exception as exc:
            return {"file": str(target), "error": str(exc)}

    return [_analyze_one(entry) for entry in paths]