"""`countings` implementation based on `pygount`, with a plain-text fallback.

This module provides `analyze_file_counts` and `analyze_paths`.

Per-file counts are obtained through the pygount Python API when pygount can
be imported; otherwise (or when pygount fails) a basic line counter is used.
`_map_pygount_json_item` maps a pygount JSON report item onto the same
result keys.
"""

from pathlib import Path
from typing import Dict, Any, Iterable, List, Optional
import json
import subprocess
import logging
import threading
import hashlib

try:
    import pygount  # type: ignore

    _HAS_PYGOUNT = True
except Exception:
    _HAS_PYGOUNT = False

_LOG = logging.getLogger(__name__)

# Cache to store counting results by file content hash ONLY
# This ensures deterministic results for identical file contents regardless of file location
# Key: content_hash (MD5), Value: counting results dict
# Using only hash (not path) ensures same content always returns same result,
# even if file is in different directories (e.g., baseline snapshots vs current code)
# NOTE: pygount IS deterministic - same file content = same numeric results
# NOTE(review): because the key ignores the file name, two files with identical
# bytes but different extensions also share the cached "language" - confirm
# this is acceptable for the callers.
_COUNTING_CACHE: Dict[str, Dict[str, Any]] = {}
_CACHE_LOCK = threading.Lock()

# Candidate key names for each metric, in priority order (pygount 3.x names
# first, then older/alternative spellings).
_PHYSICAL_KEYS = (
    "lineCount",  # pygount 3.x
    "raw_total_lines",
    "n_lines",
    "lines",
    "raw_lines",
    "line_count",
)
# For code: prefer sourceCount (code + string lines) so that
# sourceCount + documentationCount + emptyCount = lineCount.
_CODE_KEYS = (
    "sourceCount",  # pygount 3.x (code + strings)
    "codeCount",  # pygount 3.x (code only, excludes strings)
    "code",
    "n_code",
    "n_code_lines",
    "code_lines",
)
_COMMENT_KEYS = (
    "documentationCount",  # pygount 3.x
    "comment",
    "n_comment",
    "n_comment_lines",
    "comment_lines",
)
_BLANK_KEYS = (
    "emptyCount",  # pygount 3.x
    "blank",
    "n_blank",
    "blank_lines",
    "empty_count",
)

# UCC extended metric keys copied from a UCC counter result into our result.
_UCC_METRIC_KEYS = (
    "comment_whole",
    "comment_embedded",
    "compiler_directives",
    "data_declarations",
    "exec_instructions",
    "logical_sloc",
    "physical_sloc",
)


def _first_present(item: Dict[str, Any], keys: Iterable[str], default: Any = 0) -> Any:
    """Return the value of the first key in *keys* whose value is not None.

    An explicit ``is not None`` test is used (instead of ``or``) so that a
    legitimate value of 0 is returned rather than skipped.
    """
    for key in keys:
        value = item.get(key)
        if value is not None:
            return value
    return default


def _map_pygount_json_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Map one pygount JSON item onto this module's result keys.

    Several JSON shapes / key spellings are supported; for each metric the
    first key that is present with a non-None value wins, defaulting to 0.

    Args:
        item: A dict describing one file in a pygount JSON report.

    Returns:
        A dict with the keys ``file``, ``physical_lines``, ``code_lines``,
        ``comment_lines``, ``blank_lines`` and ``language``.
    """
    physical = _first_present(item, _PHYSICAL_KEYS)
    code = _first_present(item, _CODE_KEYS)
    comment = _first_present(item, _COMMENT_KEYS)
    blank = _first_present(item, _BLANK_KEYS)

    language = (
        item.get("language")
        or item.get("languageName")
        or item.get("lang")
        or "unknown"
    )
    file_path = (
        item.get("filename")
        or item.get("file")
        or item.get("path")
        or item.get("name")
        or ""
    )

    physical_n = int(physical)
    code_n = int(code)
    comment_n = int(comment)
    blank_n = int(blank)

    result_dict = {
        "file": file_path,
        "physical_lines": physical_n,
        "code_lines": code_n,
        "comment_lines": comment_n,
        "blank_lines": blank_n,
        "language": language,
    }

    # Validate that code + comment + blank matches the physical line count
    # (should be exact when sourceCount is used). Only warn when the
    # difference is larger than a small tolerance.
    sum_categorized = code_n + comment_n + blank_n
    if physical_n > 0 and sum_categorized != physical_n:
        diff = abs(physical_n - sum_categorized)
        if diff > 2:
            _LOG.warning(
                f"Counting mismatch for {file_path}: "
                f"physical={physical}, code={code}, comment={comment}, blank={blank}, "
                f"sum={sum_categorized} (diff={diff})"
            )

    return result_dict


def _normalize_bytes(b: bytes) -> bytes:
    """Strip a UTF-8 BOM and convert CRLF and lone CR line endings to LF."""
    if b.startswith(b"\xef\xbb\xbf"):
        b = b[3:]
    b = b.replace(b"\r\n", b"\n")
    b = b.replace(b"\r", b"\n")
    return b


def _apply_ucc_metrics(path: Path, language: str, result: Dict[str, Any]) -> None:
    """Add UCC extended metrics to *result* for supported languages.

    *language* is expected to be lower-cased. For languages without a UCC
    counter nothing happens. Any failure (including a failing import of the
    counter module) is logged as a warning and leaves *result* untouched.
    """
    if language in ("c", "c++", "cpp"):
        label = "C/C++"
    elif language == "python":
        label = "Python"
    elif language == "java":
        label = "Java"
    elif language in ("assembly", "asm"):
        label = "Assembly"
    else:
        return

    try:
        # Counters are imported lazily so a missing counter module only
        # disables the extended metrics for that language.
        if label == "C/C++":
            from .ucc_complete_counter import UCCCompleteCounter

            ucc_counter = UCCCompleteCounter(language="C")
        elif label == "Python":
            from .ucc_python_counter import UCCPythonCounter

            ucc_counter = UCCPythonCounter()
        elif label == "Java":
            from .ucc_java_counter import UCCJavaCounter

            ucc_counter = UCCJavaCounter()
        else:
            from .ucc_assembly_counter import UCCAssemblyCounter

            ucc_counter = UCCAssemblyCounter()

        ucc_result = ucc_counter.analyze_file(path)
        result.update({key: ucc_result.get(key, 0) for key in _UCC_METRIC_KEYS})

        if label == "Python":
            _LOG.info(
                "UCC Python extended metrics for %s: logical=%d, exec=%d, directives=%d",
                path.name,
                ucc_result.get("logical_sloc", 0),
                ucc_result.get("exec_instructions", 0),
                ucc_result.get("compiler_directives", 0),
            )
        else:
            _LOG.info(
                "UCC %s extended metrics for %s: logical=%d, data=%d, exec=%d",
                label,
                path.name,
                ucc_result.get("logical_sloc", 0),
                ucc_result.get("data_declarations", 0),
                ucc_result.get("exec_instructions", 0),
            )
    except Exception as e:
        _LOG.warning("Failed to get UCC %s metrics for %s: %s", label, path.name, e)


def analyze_file_counts(path: Path) -> Dict[str, Any]:
    """Count physical, code, comment and blank lines of a single file.

    Results are cached by the MD5 hash of the normalized file content (BOM
    removed, line endings converted to LF). pygount is used when available;
    if it is missing, raises, or reports zero lines for a non-empty file, a
    basic counter is used that cannot tell code from comments.

    Args:
        path: The file to analyze.

    Returns:
        A new dict with ``file``, ``physical_lines``, ``code_lines``,
        ``comment_lines``, ``blank_lines``, ``language`` and the UCC
        extended metric keys (0 unless a UCC counter filled them in).
        ``file`` is always ``str(path)`` of the requested file.

    Raises:
        FileNotFoundError: If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"File non trovato: {path}")

    # Read raw bytes and calculate the normalized content hash for caching.
    # Reading is best-effort: on failure caching is simply skipped.
    norm: Optional[bytes]
    try:
        norm = _normalize_bytes(path.read_bytes())
    except Exception:
        norm = None
    content_hash = hashlib.md5(norm).hexdigest() if norm is not None else None

    # Check cache first (using only normalized hash, not path)
    if content_hash:
        with _CACHE_LOCK:
            cached = _COUNTING_CACHE.get(content_hash)
            cached_result = dict(cached) if cached is not None else None
        if cached_result is not None:
            # The cached entry was computed for whichever file was seen first
            # with this content; report the file that was actually requested.
            cached_result["file"] = str(path)
            _LOG.info(
                "CACHE HIT for %s (hash: %s) → code=%d, comment=%d, blank=%d",
                path.name,
                content_hash[:8],
                cached_result.get("code_lines", 0),
                cached_result.get("comment_lines", 0),
                cached_result.get("blank_lines", 0),
            )
            return cached_result

    result: Dict[str, Any] = {
        "file": str(path),
        "physical_lines": 0,
        "code_lines": 0,
        "comment_lines": 0,
        "blank_lines": 0,
        "language": "unknown",
        # UCC extended metrics (initialized to 0)
        "comment_whole": 0,
        "comment_embedded": 0,
        "compiler_directives": 0,
        "data_declarations": 0,
        "exec_instructions": 0,
        "logical_sloc": 0,
        "physical_sloc": 0,
    }

    if _HAS_PYGOUNT:
        try:
            # Use pygount Python API directly (more reliable than calling CLI via subprocess)
            _LOG.info(
                "CACHE MISS for %s (hash: %s) - Running pygount API...",
                path.name,
                content_hash[:8] if content_hash else "none",
            )
            # local import to avoid top-level dependency issues
            from pygount import analysis as _pg_analysis

            # SourceAnalysis.from_file returns per-file counts
            sa = _pg_analysis.SourceAnalysis.from_file(
                str(path), group=str(path.parent)
            )
            phys = int(sa.line_count or 0)
            # prefer source_count (code + strings) when available
            code_val = int(
                getattr(sa, "source_count", None) or getattr(sa, "code_count", 0) or 0
            )
            comment_val = int(getattr(sa, "documentation_count", 0) or 0)
            blank_val = int(getattr(sa, "empty_count", 0) or 0)

            result.update(
                {
                    "file": str(path),
                    "physical_lines": phys,
                    "code_lines": code_val,
                    "comment_lines": comment_val,
                    "blank_lines": blank_val,
                    "language": getattr(sa, "language", "unknown") or "unknown",
                }
            )

            # Sanity check: if pygount reports zero physical lines but the
            # file actually contains bytes, treat this as an error and fall
            # back to the simple reader (handled by the except below).
            if norm and phys == 0:
                raise RuntimeError("pygount produced zero lines for non-empty file")

            # Add UCC extended metrics for languages that have a UCC counter
            language = result.get("language", "unknown").lower()
            _apply_ucc_metrics(path, language, result)

            # Cache the result (using only normalized hash as key)
            if content_hash:
                with _CACHE_LOCK:
                    _COUNTING_CACHE[content_hash] = result.copy()

            _LOG.info(
                "PYGOUNT API RESULT for %s (hash: %s) → code=%d, comment=%d, blank=%d [CACHED]",
                path.name,
                content_hash[:8] if content_hash else "none",
                result.get("code_lines", 0),
                result.get("comment_lines", 0),
                result.get("blank_lines", 0),
            )
            return result
        except Exception as e:
            # Log exception, then fall back to simple counting
            _LOG.warning(
                "pygount failed for %s, using fallback counting. Error: %s",
                path,
                str(e),
            )
            _LOG.debug("pygount exception", exc_info=True)

    # Fallback: basic counting (NOTE: cannot distinguish code from comments).
    # An explicit encoding keeps the result independent of the platform's
    # default locale; undecodable bytes are ignored.
    with path.open("r", encoding="utf-8", errors="ignore") as fh:
        lines = fh.readlines()
    physical = len(lines)
    blanks = sum(1 for line in lines if line.strip() == "")
    result.update(
        {
            "physical_lines": physical,
            "code_lines": physical - blanks,
            "comment_lines": 0,
            "blank_lines": blanks,
            "language": "unknown",
        }
    )

    # Cache fallback result too (using only hash as key)
    if content_hash:
        with _CACHE_LOCK:
            _COUNTING_CACHE[content_hash] = result.copy()

    return result


def analyze_paths(paths: Iterable[Path]) -> List[Dict[str, Any]]:
    """Analyze several files, collecting one result dict per input path.

    A failure for one path does not stop the run: its entry is
    ``{"file": <path>, "error": <message>}`` instead of the counts.

    Args:
        paths: The paths to analyze; each item is converted with ``Path``.

    Returns:
        A list of result dicts in the same order as *paths*.
    """
    results: List[Dict[str, Any]] = []
    for p in paths:
        path = Path(p)
        try:
            results.append(analyze_file_counts(path))
        except Exception as e:
            results.append({"file": str(path), "error": str(e)})
    return results