"""Implementazione `countings` usando il CLI `pygount` (JSON) con fallback. Questo modulo fornisce `analyze_file_counts` e `analyze_paths`. """ from pathlib import Path from typing import Dict, Any, Iterable, List import json import subprocess import logging import threading import hashlib try: import pygount # type: ignore _HAS_PYGOUNT = True except Exception: _HAS_PYGOUNT = False _LOG = logging.getLogger(__name__) # Global lock to serialize pygount subprocess calls to avoid race conditions # pygount has non-deterministic behavior when multiple instances run in parallel _PYGOUNT_LOCK = threading.Lock() # Cache to store counting results by file content hash # This ensures deterministic results for identical file contents # Key: (file_path, content_hash), Value: counting results _COUNTING_CACHE = {} _CACHE_LOCK = threading.Lock() def _map_pygount_json_item(item: Dict[str, Any]) -> Dict[str, Any]: # Support multiple pygount JSON shapes and key names # Priority order: pygount 3.x format first, then older formats # CRITICAL: Use 'is not None' instead of 'or' to handle 0 values correctly physical = ( item.get("lineCount") # pygount 3.x if item.get("lineCount") is not None else item.get("raw_total_lines") if item.get("raw_total_lines") is not None else item.get("n_lines") if item.get("n_lines") is not None else item.get("lines") if item.get("lines") is not None else item.get("raw_lines") if item.get("raw_lines") is not None else item.get("line_count") if item.get("line_count") is not None else 0 ) # For code: use sourceCount (includes code + string lines) # Note: In pygount 3.x, stringCount is NOT provided per-file, only in aggregates # sourceCount = codeCount + stringCount (where strings are multi-line strings) # Using sourceCount ensures: sourceCount + documentationCount + emptyCount = lineCount code = ( item.get("sourceCount") # pygount 3.x (code + strings) if item.get("sourceCount") is not None else item.get("codeCount") # pygount 3.x (code only, excludes strings) if item.get("codeCount") is not None else item.get("code") if item.get("code") is not None else item.get("n_code") if item.get("n_code") is not None else item.get("n_code_lines") if item.get("n_code_lines") is not None else item.get("code_lines") if item.get("code_lines") is not None else 0 ) comment = ( item.get("documentationCount") # pygount 3.x if item.get("documentationCount") is not None else item.get("comment") if item.get("comment") is not None else item.get("n_comment") if item.get("n_comment") is not None else item.get("n_comment_lines") if item.get("n_comment_lines") is not None else item.get("comment_lines") if item.get("comment_lines") is not None else 0 ) blank = ( item.get("emptyCount") # pygount 3.x if item.get("emptyCount") is not None else item.get("blank") if item.get("blank") is not None else item.get("n_blank") if item.get("n_blank") is not None else item.get("blank_lines") if item.get("blank_lines") is not None else item.get("empty_count") if item.get("empty_count") is not None else 0 ) language = item.get("language") or item.get("languageName") or item.get("lang") or "unknown" file_path = ( item.get("filename") or item.get("file") or item.get("path") or item.get("name") or "" ) result_dict = { "file": file_path, "physical_lines": int(physical), "code_lines": int(code), "comment_lines": int(comment), "blank_lines": int(blank), "language": language, } # Note: pygount 3.x counts separately: sourceCount (code+strings), documentation, empty # When using sourceCount: sourceCount + documentationCount + emptyCount = 
lineCount # We map: sourceCount→code_lines, documentationCount→comment_lines, emptyCount→blank_lines # Validate the sum matches physical lines (should be exact when using sourceCount) sum_categorized = int(code) + int(comment) + int(blank) if int(physical) > 0 and sum_categorized != int(physical): # Only warn if difference is significant (more than rounding error) diff = abs(int(physical) - sum_categorized) if diff > 2: _LOG.warning( f"Counting mismatch for {file_path}: " f"physical={physical}, code={code}, comment={comment}, blank={blank}, " f"sum={sum_categorized} (diff={diff})" ) return result_dict def analyze_file_counts(path: Path) -> Dict[str, Any]: if not path.exists(): raise FileNotFoundError(f"File non trovato: {path}") # Calculate file content hash for caching try: with open(path, "rb") as f: content_hash = hashlib.md5(f.read()).hexdigest() except Exception: content_hash = None # Check cache first cache_key = (str(path), content_hash) if content_hash: with _CACHE_LOCK: if cache_key in _COUNTING_CACHE: _LOG.debug("Using cached counting result for %s", path) return _COUNTING_CACHE[cache_key].copy() result: Dict[str, Any] = { "file": str(path), "physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "language": "unknown", } if _HAS_PYGOUNT: try: # Serialize pygount calls to avoid race conditions and non-deterministic behavior # pygount's language detection can be non-deterministic when run in parallel with _PYGOUNT_LOCK: proc = subprocess.run(["pygount", "--format", "json", str(path)], check=True, capture_output=True, text=True) parsed = json.loads(proc.stdout) # Support expected JSON shapes from pygount: list or dict with 'files' if isinstance(parsed, list) and parsed: item = parsed[0] result.update(_map_pygount_json_item(item)) # Cache the result if content_hash: with _CACHE_LOCK: _COUNTING_CACHE[cache_key] = result.copy() return result if isinstance(parsed, dict): files = parsed.get("files") if files and isinstance(files, list) and files: item = files[0] result.update(_map_pygount_json_item(item)) # Cache the result if content_hash: with _CACHE_LOCK: _COUNTING_CACHE[cache_key] = result.copy() return result # If pygount ran but returned no usable data, log stdout/stderr at DEBUG _LOG.debug("pygount returned empty or unexpected JSON for %s", path) _LOG.debug("pygount stdout:\n%s", proc.stdout) _LOG.debug("pygount stderr:\n%s", proc.stderr) # force fallback to simple counting raise RuntimeError("pygount returned no data") except Exception as e: # Log exception and stderr if available, then fall back to simple counting try: # If proc exists, include its stderr for diagnostics if 'proc' in locals(): _LOG.warning("pygount failed for %s, using fallback counting. Error: %s; stderr: %s", path, str(e), getattr(proc, 'stderr', None)) else: _LOG.warning("pygount failed for %s, using fallback counting. 
Error: %s", path, str(e)) except Exception: # ensure we don't break on logging _LOG.warning("pygount failed for %s, using fallback counting", path) # fall back to simple counting pass # Fallback: basic counting (NOTE: cannot distinguish code from comments) with path.open("r", errors="ignore") as fh: lines = fh.readlines() physical = len(lines) blanks = sum(1 for l in lines if l.strip() == "") code_lines = physical - blanks result.update({ "physical_lines": physical, "code_lines": code_lines, "comment_lines": 0, "blank_lines": blanks, "language": "unknown", }) # Cache fallback result too if content_hash: with _CACHE_LOCK: _COUNTING_CACHE[cache_key] = result.copy() return result def analyze_paths(paths: Iterable[Path]) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] for p in paths: path = Path(p) try: results.append(analyze_file_counts(path)) except Exception as e: results.append({"file": str(path), "error": str(e)}) return results