""" Baseline manager and differ prototype. - create baseline from directory (snapshot by default) - load baseline metadata - diff baseline vs current directory - output results as dict / JSON-serializable This is a minimal, self-contained implementation inspired by UCC's DiffTool. """ from __future__ import annotations import hashlib import json import os import shutil import stat import subprocess import tempfile import time import fnmatch from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, asdict from pathlib import Path from typing import Dict, List, Optional, Tuple import difflib BASELINE_ROOT_DIRNAME = ".pyucc_baselines" def _sha1_of_file(path: Path, chunk_size: int = 8192) -> str: h = hashlib.sha1() with path.open("rb") as f: for chunk in iter(lambda: f.read(chunk_size), b""): h.update(chunk) return h.hexdigest() @dataclass class FileMeta: path: str # relative path size: int mtime: float sha1: Optional[str] = None countings: Optional[Dict] = None metrics: Optional[Dict] = None @dataclass class BaselineMetadata: baseline_id: str created_at: float source: str # 'local' or 'git' origin: Optional[str] project_root: str files: List[FileMeta] profile: Optional[str] = None class BaselineManager: def __init__(self, workspace_root: str, baselines_root: Optional[str] = None): """Manage baselines storage. Args: workspace_root: path to the project/workspace (kept for metadata usage). baselines_root: optional absolute or relative path where baselines are stored. If omitted, the environment variable `PYUCC_BASELINE_DIR` is consulted; if that's not set, defaults to `./baseline` in the current working dir. """ self.workspace_root = os.path.abspath(workspace_root) if baselines_root: self.baselines_root = os.path.abspath(baselines_root) else: # priority: env var, app settings, fallback to ./baseline env = os.getenv("PYUCC_BASELINE_DIR") if env: self.baselines_root = os.path.abspath(env) else: # try app settings if available try: from ..config import settings as app_settings sdir = app_settings.get_baseline_dir() except Exception: sdir = None if sdir: self.baselines_root = os.path.abspath(sdir) else: self.baselines_root = os.path.join(os.getcwd(), "baseline") os.makedirs(self.baselines_root, exist_ok=True) def _baseline_dir(self, baseline_id: str) -> str: return os.path.join(self.baselines_root, baseline_id) def get_baseline_files_dir(self, baseline_id: str) -> str: """Get the directory containing the baseline snapshot files.""" return os.path.join(self._baseline_dir(baseline_id), "files") def list_baselines(self) -> List[str]: return [ d for d in os.listdir(self.baselines_root) if os.path.isdir(os.path.join(self.baselines_root, d)) ] def get_metadata_path(self, baseline_id: str) -> str: return os.path.join(self._baseline_dir(baseline_id), "metadata.json") def create_baseline_from_dir( self, dir_path: str, baseline_id: Optional[str] = None, snapshot: bool = True, compute_sha1: bool = True, ignore_patterns: Optional[List[str]] = None, profile_name: Optional[str] = None, max_keep: int = 5, file_list: Optional[List[str]] = None, ) -> str: dir_path = os.path.abspath(dir_path) if baseline_id is None: ts = time.strftime("%Y%m%dT%H%M%S") # include profile name in baseline id when available if profile_name: safe_profile = profile_name.replace(" ", "_") baseline_id = f"{safe_profile}__{ts}_local" else: baseline_id = f"{ts}_local" dest = self._baseline_dir(baseline_id) if os.path.exists(dest): raise FileExistsError(dest) os.makedirs(dest, exist_ok=False) files_meta: List[FileMeta] = [] # Normalize ignore patterns using scanner helper so patterns like '.bak' are treated as '*.bak' try: from .scanner import normalize_ignore_patterns, find_source_files ignore_patterns = normalize_ignore_patterns(ignore_patterns) or [] except Exception: # fallback: ensure ignore_patterns is list-like find_source_files = None ignore_patterns = ignore_patterns or [] # If caller provided explicit file_list, use it. Otherwise delegate to scanner.find_source_files if file_list is not None: # build FileMeta entries from provided list (paths relative to dir_path or absolute) for f in file_list: try: p = Path(f) if not p.is_absolute(): p = Path(dir_path) / p if not p.is_file(): continue rel_unix = os.path.relpath(str(p), dir_path).replace("\\", "/") st = p.stat() sha1 = None if compute_sha1: try: sha1 = _sha1_of_file(p) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1 ) ) except Exception: continue else: # Determine allowed extensions from profile (if provided) to pass to scanner allowed_exts = None try: from ..config import profiles as profiles_cfg from ..config.languages import LANGUAGE_EXTENSIONS except Exception: profiles_cfg = None LANGUAGE_EXTENSIONS = {} if profile_name and profiles_cfg: pr = profiles_cfg.find_profile(profile_name) if pr: exts = [] for ln in pr.get("languages", []) or []: if ln in LANGUAGE_EXTENSIONS: exts.extend(LANGUAGE_EXTENSIONS[ln]) if exts: allowed_exts = list(set(exts)) # If scanner available, use it; otherwise fallback to os.walk if find_source_files: try: src_files = find_source_files( Path(dir_path), allowed_extensions=allowed_exts, ignore_patterns=ignore_patterns, ) except Exception: src_files = [] for p in src_files: try: rel_unix = os.path.relpath(str(p), dir_path).replace("\\", "/") st = p.stat() sha1 = None if compute_sha1: try: sha1 = _sha1_of_file(p) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1, ) ) except Exception: continue else: for root, dirs, files in os.walk(dir_path): for fn in files: fpath = os.path.join(root, fn) # skip baseline storage area if under workspace_root if ( os.path.commonpath([self.baselines_root, fpath]) == self.baselines_root ): continue rel = os.path.relpath(fpath, dir_path) # check ignore patterns against relative path (unix-style) rel_unix = rel.replace("\\", "/") ignored = False for pat in ignore_patterns: if not pat: continue if fnmatch.fnmatch( rel_unix.lower(), pat ) or fnmatch.fnmatch(fn.lower(), pat): ignored = True break if ignored: continue try: st = os.stat(fpath) except OSError: continue sha1 = None if compute_sha1: # also compute for 0-byte files try: sha1 = _sha1_of_file(Path(fpath)) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1, ) ) # Run per-file analyzers (countings + metrics) and attach results to each FileMeta try: from ..core.countings_impl import ( analyze_file_counts as _analyze_file_counts, ) from ..core.metrics import analyze_file_metrics as _analyze_metrics for fm in files_meta: abs_path = os.path.join(dir_path, fm.path) # per-file counts try: c = _analyze_file_counts(Path(abs_path)) fm.countings = { "physical_lines": int(c.get("physical_lines", 0)), "code_lines": int(c.get("code_lines", 0)), "comment_lines": int(c.get("comment_lines", 0)), "blank_lines": int(c.get("blank_lines", 0)), # UCC extended metrics "comment_whole": int(c.get("comment_whole", 0)), "comment_embedded": int(c.get("comment_embedded", 0)), "compiler_directives": int(c.get("compiler_directives", 0)), "data_declarations": int(c.get("data_declarations", 0)), "exec_instructions": int(c.get("exec_instructions", 0)), "logical_sloc": int(c.get("logical_sloc", 0)), "physical_sloc": int(c.get("physical_sloc", 0)), } except Exception: fm.countings = None # per-file metrics try: m = _analyze_metrics(abs_path) fm.metrics = { "avg_cc": float(m.get("avg_cc", 0.0)), "max_cc": int(m.get("max_cc", 0)), "func_count": int(m.get("func_count", 0)), "mi": float(m.get("mi", 0.0)), } except Exception: fm.metrics = None except Exception: pass # If profile provides languages, determine allowed extensions and # filter `files_meta` BEFORE creating snapshot so unwanted files # (e.g. backups, .txt dumps) are not copied into the baseline. try: from ..config import profiles as profiles_cfg from ..config.languages import LANGUAGE_EXTENSIONS except Exception: profiles_cfg = None LANGUAGE_EXTENSIONS = {} allowed_exts = None if profile_name and profiles_cfg: pr = profiles_cfg.find_profile(profile_name) if pr: exts = [] for ln in pr.get("languages", []) or []: if ln in LANGUAGE_EXTENSIONS: exts.extend(LANGUAGE_EXTENSIONS[ln]) if exts: allowed_exts = list(set(exts)) if allowed_exts: allowed_set = set(e.lower() for e in allowed_exts) from pathlib import Path as _Path filtered = [] for fm in files_meta: try: suf = _Path(fm.path).suffix.lower() except Exception: suf = "" if suf in allowed_set: filtered.append(fm) files_meta = filtered metadata = BaselineMetadata( baseline_id=baseline_id, created_at=time.time(), source="local", origin=None, project_root=dir_path, files=files_meta, profile=profile_name, ) # Save metadata meta_path = self.get_metadata_path(baseline_id) with open(meta_path, "w", encoding="utf-8") as f: json.dump(self._metadata_to_dict(metadata), f, indent=2) # Optionally store a snapshot if snapshot: snapshot_dir = os.path.join(dest, "files") os.makedirs(snapshot_dir, exist_ok=True) # Copy only the files that were included in the baseline (respecting ignore patterns) for fm in files_meta: src_file = os.path.join(dir_path, fm.path) dst_file = os.path.join(snapshot_dir, fm.path) # Create parent directories if needed dst_parent = os.path.dirname(dst_file) if dst_parent: os.makedirs(dst_parent, exist_ok=True) try: shutil.copy2(src_file, dst_file) # copy2 preserves metadata except Exception: pass # skip files that cannot be copied # Optionally create zip archive (controlled by settings) # Check if user wants zip archives (for space savings at cost of speed) try: from ..config import settings as app_settings if app_settings.get_zip_baselines(): zip_path = os.path.join(dest, "files.zip") shutil.make_archive( base_name=zip_path[:-4], format="zip", root_dir=snapshot_dir ) except Exception: pass # if settings not available or zip fails, continue without zip # Generate UCC-style reports (countings, metrics, duplicates) inside baseline folder try: from ..utils.ucc_report_generator import UCCReportGenerator from ..config import profiles as profiles_cfg from ..config.languages import LANGUAGE_EXTENSIONS from ..config import settings as app_settings from ..core import duplicates as dupmod # Prepare counting and metrics lists for report generator counting_results = [] metrics_results = [] # Attempt to ensure per-file `functions` details are present in metrics results. # In some environments `lizard` may not have been available at the time # the initial per-file analysis ran; re-run metrics analysis on the # snapshot file when possible to obtain function-level details. try: from ..core.metrics import analyze_file_metrics as _analyze_metrics except Exception: _analyze_metrics = None for fm in files_meta: abs_snapshot = os.path.join(snapshot_dir, fm.path) if fm.countings: row = dict(fm.countings) row["file"] = abs_snapshot # language may be derived from extension if available row["language"] = "" counting_results.append(row) # Start from existing metrics (if any) mdata = fm.metrics if fm.metrics else None # If we don't have function-level data and analyzer available, re-run if _analyze_metrics and (not mdata or not mdata.get("functions")): try: # run analyzer on the snapshot copy to ensure consistent paths rean = _analyze_metrics(abs_snapshot) if rean: mdata = rean except Exception: pass if mdata: mrow = dict(mdata) mrow["file"] = abs_snapshot metrics_results.append(mrow) # Determine allowed extensions from profile (if provided) allowed_exts = None if profile_name: pr = profiles_cfg.find_profile(profile_name) if pr: exts = [] for ln in pr.get("languages", []) or []: if ln in LANGUAGE_EXTENSIONS: exts.extend(LANGUAGE_EXTENSIONS[ln]) if exts: allowed_exts = list(set(exts)) # If allowed_exts is set, prune files_meta to keep only matching extensions. # This prevents copying/reporting files that are not part of the profile languages # (e.g., backup files, text dumps, etc.). Extensions in LANGUAGE_EXTENSIONS # include the leading dot; compare case-insensitively. if allowed_exts: allowed_set = set(e.lower() for e in allowed_exts) filtered = [] from pathlib import Path as _Path for fm in files_meta: try: suf = _Path(fm.path).suffix.lower() except Exception: suf = "" if suf in allowed_set: filtered.append(fm) files_meta = filtered # Load duplicates search settings (threshold, k, window) dup_settings = app_settings.get_duplicates_settings() or {} thr = dup_settings.get("threshold", 5.0) k = dup_settings.get("k", 25) window = dup_settings.get("window", 4) # Run duplicate finder on the snapshot directory (so reports are self-contained) try: # compute snapshot file list via scanner when available and pass it try: from .scanner import find_source_files as _find_src snap_files = _find_src( Path(snapshot_dir), allowed_extensions=allowed_exts ) snap_list = [str(p) for p in snap_files] except Exception: snap_list = None dup_res = dupmod.find_duplicates_in_dir( root=snapshot_dir, extensions=allowed_exts, dup_threshold=thr, k=k, window=window, file_list=snap_list, ) # convert to list of dicts dup_rows = [] for a, b in dup_res.get("exact", []): dup_rows.append( { "file_a": a, "file_b": b, "match_type": "exact", "pct_change": 0, } ) for a, b in dup_res.get("fuzzy", []): dup_rows.append( { "file_a": a, "file_b": b, "match_type": "fuzzy", "pct_change": f"<={thr}%", } ) except Exception: dup_rows = [] # Write reports into baseline folder (non-fatal) try: count_path = os.path.join(dest, "countings_report.txt") UCCReportGenerator.generate_counting_report( results=counting_results, output_path=Path(count_path), command_description=f"PyUcc Counting Analysis - Baseline: {baseline_id}", base_path=dir_path, ) except Exception: pass try: metrics_path = os.path.join(dest, "metrics_report.txt") UCCReportGenerator.generate_metrics_report( results=metrics_results, output_path=Path(metrics_path), command_description=f"PyUcc Metrics Analysis - Baseline: {baseline_id}", base_path=dir_path, ) except Exception: pass try: dup_path = os.path.join(dest, "duplicates_report.txt") UCCReportGenerator.generate_duplicates_report( duplicates=dup_rows, output_path=Path(dup_path), command_description=f"PyUcc Duplicate Analysis - Baseline: {baseline_id}", base_path=dir_path, params={ "threshold": thr, "extensions": allowed_exts, "k": k, "window": window, }, ) except Exception: pass except Exception: # Do not fail baseline creation if report generation has problems pass # Prune old baselines if requested if max_keep > 0: self._prune_old_baselines(dir_path, profile_name, max_keep) return baseline_id def _prune_old_baselines( self, project_root: str, profile_name: Optional[str], keep: int = 5 ): """Prune older baselines for the same project and profile, keeping `keep` newest.""" # scan baselines root and load metadata for each baseline entries = [] # list of (created_at, baseline_id, path) for bn in os.listdir(self.baselines_root): bdir = os.path.join(self.baselines_root, bn) if not os.path.isdir(bdir): continue meta_path = os.path.join(bdir, "metadata.json") if not os.path.exists(meta_path): continue try: with open(meta_path, "r", encoding="utf-8") as f: j = json.load(f) except Exception: continue # match by project_root and profile if j.get("project_root") != project_root: continue if profile_name is None: if j.get("profile") is not None: continue else: if j.get("profile") != profile_name: continue entries.append((j.get("created_at", 0), j.get("baseline_id", bn), bdir)) # sort by created_at descending (newest first) entries.sort(key=lambda x: x[0], reverse=True) # remove entries beyond keep for _, bid, path in entries[keep:]: try: shutil.rmtree(path) except Exception: pass def create_baseline_from_git( self, repo_path: str, commit_ref: str = "HEAD", baseline_id: Optional[str] = None, snapshot: bool = True, compute_sha1: bool = True, ignore_patterns: Optional[List[str]] = None, profile_name: Optional[str] = None, max_keep: int = 5, file_list: Optional[List[str]] = None, ) -> str: """Create a baseline by exporting a git commit (using `git archive`). This method requires that `git` is available in PATH. It will create a zip archive of the requested commit and then build the baseline metadata from the extracted tree. """ repo_path = os.path.abspath(repo_path) if baseline_id is None: ts = time.strftime("%Y%m%dT%H%M%S") if profile_name: safe_profile = profile_name.replace(" ", "_") baseline_id = f"{safe_profile}__{ts}_git_{commit_ref}" else: baseline_id = f"{ts}_git_{commit_ref}" dest = self._baseline_dir(baseline_id) if os.path.exists(dest): raise FileExistsError(dest) os.makedirs(dest, exist_ok=False) # create a temporary zip with git archive zip_tmp = os.path.join(dest, "export.zip") try: subprocess.run( ["git", "archive", "--format=zip", "-o", zip_tmp, commit_ref], cwd=repo_path, check=True, ) except subprocess.CalledProcessError as e: raise RuntimeError(f"git archive failed: {e}") # extract zip to a temp dir and build metadata similarly to dir baseline extract_dir = os.path.join(dest, "extracted") os.makedirs(extract_dir, exist_ok=True) shutil.unpack_archive(zip_tmp, extract_dir) # reuse create_baseline_from_dir logic but avoid creating nested baseline dir files_meta: List[FileMeta] = [] ignore_patterns = ignore_patterns or [] # If caller provided explicit file list (relative to extract_dir or absolute), use it try: from .scanner import find_source_files except Exception: find_source_files = None if file_list is not None: for f in file_list: try: p = Path(f) if not p.is_absolute(): p = Path(extract_dir) / p if not p.is_file(): continue rel_unix = os.path.relpath(str(p), extract_dir).replace("\\", "/") st = p.stat() sha1 = None if compute_sha1 and st.st_size > 0: try: sha1 = _sha1_of_file(p) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1 ) ) except Exception: continue else: # Prefer scanner if available if find_source_files: try: src_files = find_source_files( Path(extract_dir), ignore_patterns=ignore_patterns ) except Exception: src_files = [] for p in src_files: try: rel_unix = os.path.relpath(str(p), extract_dir).replace( "\\", "/" ) st = p.stat() sha1 = None if compute_sha1 and st.st_size > 0: try: sha1 = _sha1_of_file(p) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1, ) ) except Exception: continue else: for root, dirs, files in os.walk(extract_dir): for fn in files: fpath = os.path.join(root, fn) rel = os.path.relpath(fpath, extract_dir) rel_unix = rel.replace("\\", "/") # apply ignore patterns ignored = False for pat in ignore_patterns: if fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch( fn, pat ): ignored = True break if ignored: continue try: st = os.stat(fpath) except OSError: continue sha1 = None if compute_sha1 and st.st_size > 0: try: sha1 = _sha1_of_file(Path(fpath)) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1, ) ) # attempt to run per-file analyzers on the extracted tree and attach results try: from ..core.countings_impl import ( analyze_file_counts as _analyze_file_counts, ) from ..core.metrics import analyze_file_metrics as _analyze_metrics for fm in files_meta: abs_path = os.path.join(extract_dir, fm.path) try: c = _analyze_file_counts(Path(abs_path)) fm.countings = { "physical_lines": int(c.get("physical_lines", 0)), "code_lines": int(c.get("code_lines", 0)), "comment_lines": int(c.get("comment_lines", 0)), "blank_lines": int(c.get("blank_lines", 0)), # UCC extended metrics "comment_whole": int(c.get("comment_whole", 0)), "comment_embedded": int(c.get("comment_embedded", 0)), "compiler_directives": int(c.get("compiler_directives", 0)), "data_declarations": int(c.get("data_declarations", 0)), "exec_instructions": int(c.get("exec_instructions", 0)), "logical_sloc": int(c.get("logical_sloc", 0)), "physical_sloc": int(c.get("physical_sloc", 0)), } except Exception: fm.countings = None try: m = _analyze_metrics(abs_path) fm.metrics = { "avg_cc": float(m.get("avg_cc", 0.0)), "max_cc": int(m.get("max_cc", 0)), "func_count": int(m.get("func_count", 0)), "mi": float(m.get("mi", 0.0)), } except Exception: fm.metrics = None except Exception: pass metadata = BaselineMetadata( baseline_id=baseline_id, created_at=time.time(), source="git", origin=commit_ref, project_root=repo_path, files=files_meta, profile=profile_name, ) meta_path = self.get_metadata_path(baseline_id) with open(meta_path, "w", encoding="utf-8") as f: json.dump(self._metadata_to_dict(metadata), f, indent=2) # Optionally keep the extracted tree (snapshot) if snapshot: # move extracted content into dest/files snapshot_dir = os.path.join(dest, "files") shutil.move(extract_dir, snapshot_dir) # Optionally create zip archive from the files directory try: from ..config import settings as app_settings if app_settings.get_zip_baselines(): # Keep both files/ and create files.zip zip_archive = os.path.join(dest, "files.zip") shutil.make_archive( base_name=zip_archive[:-4], format="zip", root_dir=snapshot_dir ) except Exception: pass # Always remove the git export zip (it was just temporary) try: os.remove(zip_tmp) except Exception: pass # Generate UCC-style reports inside baseline folder for git-created baseline try: from ..utils.ucc_report_generator import UCCReportGenerator from ..config import profiles as profiles_cfg from ..config.languages import LANGUAGE_EXTENSIONS from ..config import settings as app_settings from ..core import duplicates as dupmod counting_results = [] metrics_results = [] for fm in files_meta: abs_snapshot = os.path.join(snapshot_dir, fm.path) if fm.countings: row = dict(fm.countings) row["file"] = abs_snapshot row["language"] = "" counting_results.append(row) if fm.metrics: mrow = dict(fm.metrics) mrow["file"] = abs_snapshot metrics_results.append(mrow) allowed_exts = None if profile_name: pr = profiles_cfg.find_profile(profile_name) if pr: exts = [] for ln in pr.get("languages", []) or []: if ln in LANGUAGE_EXTENSIONS: exts.extend(LANGUAGE_EXTENSIONS[ln]) if exts: allowed_exts = list(set(exts)) dup_settings = app_settings.get_duplicates_settings() or {} thr = dup_settings.get("threshold", 5.0) k = dup_settings.get("k", 25) window = dup_settings.get("window", 4) try: # compute snapshot file list via scanner when available and pass it try: from .scanner import find_source_files as _find_src snap_files = _find_src( Path(snapshot_dir), allowed_extensions=allowed_exts ) snap_list = [str(p) for p in snap_files] except Exception: snap_list = None dup_res = dupmod.find_duplicates_in_dir( root=snapshot_dir, extensions=allowed_exts, dup_threshold=thr, k=k, window=window, file_list=snap_list, ) dup_rows = [] for a, b in dup_res.get("exact", []): dup_rows.append( { "file_a": a, "file_b": b, "match_type": "exact", "pct_change": 0, } ) for a, b in dup_res.get("fuzzy", []): dup_rows.append( { "file_a": a, "file_b": b, "match_type": "fuzzy", "pct_change": f"<={thr}%", } ) except Exception: dup_rows = [] try: count_path = os.path.join(dest, "countings_report.txt") UCCReportGenerator.generate_counting_report( results=counting_results, output_path=Path(count_path), command_description=f"PyUcc Counting Analysis - Baseline: {baseline_id}", base_path=repo_path, ) except Exception: pass try: metrics_path = os.path.join(dest, "metrics_report.txt") UCCReportGenerator.generate_metrics_report( results=metrics_results, output_path=Path(metrics_path), command_description=f"PyUcc Metrics Analysis - Baseline: {baseline_id}", base_path=repo_path, ) except Exception: pass try: dup_path = os.path.join(dest, "duplicates_report.txt") UCCReportGenerator.generate_duplicates_report( duplicates=dup_rows, output_path=Path(dup_path), command_description=f"PyUcc Duplicate Analysis - Baseline: {baseline_id}", base_path=repo_path, params={ "threshold": thr, "extensions": allowed_exts, "k": k, "window": window, }, ) except Exception: pass except Exception: pass else: # remove extracted files and zip shutil.rmtree(extract_dir, ignore_errors=True) try: os.remove(zip_tmp) except Exception: pass # prune old baselines if requested if max_keep > 0: self._prune_old_baselines(repo_path, profile_name, max_keep) return baseline_id def load_metadata(self, baseline_id: str) -> BaselineMetadata: meta_path = self.get_metadata_path(baseline_id) with open(meta_path, "r", encoding="utf-8") as f: j = json.load(f) files = [FileMeta(**fm) for fm in j["files"]] return BaselineMetadata( baseline_id=j["baseline_id"], created_at=j["created_at"], source=j.get("source", "local"), origin=j.get("origin"), project_root=j.get("project_root", ""), files=files, profile=j.get("profile"), ) def _metadata_to_dict(self, meta: BaselineMetadata) -> Dict: d = asdict(meta) # dataclass conversion d["files"] = [asdict(fm) for fm in meta.files] return d class Differ: def __init__( self, baseline: BaselineMetadata, current_dir: str, max_workers: int = 4, ignore_patterns: Optional[List[str]] = None, baseline_files_dir: Optional[str] = None, current_file_list: Optional[List[FileMeta]] = None, ): self.baseline = baseline self.current_dir = os.path.abspath(current_dir) self.max_workers = max_workers # Normalize ignore patterns so entries like '.bak' become '*.bak' try: from .scanner import normalize_ignore_patterns self.ignore_patterns = normalize_ignore_patterns(ignore_patterns) or [] except Exception: self.ignore_patterns = ignore_patterns or [] # If caller passed a precomputed current file list, use it (avoids rescanning) self._current_files_cache: Optional[List[FileMeta]] = current_file_list # baseline_files_dir is the directory containing the baseline snapshot files # If not provided, falls back to baseline.project_root (for backwards compatibility) self.baseline_files_dir = ( baseline_files_dir if baseline_files_dir else baseline.project_root ) def build_current_file_list(self) -> List[FileMeta]: # Return cached result if already computed if self._current_files_cache is not None: return self._current_files_cache files_meta: List[FileMeta] = [] # Prefer to use scanner.find_source_files so scanning rules are centralized try: from .scanner import find_source_files except Exception: find_source_files = None # Derive allowed extensions from baseline profile if available allowed_exts = None try: from ..config import profiles as profiles_cfg from ..config.languages import LANGUAGE_EXTENSIONS except Exception: profiles_cfg = None LANGUAGE_EXTENSIONS = {} if self.baseline and self.baseline.profile and profiles_cfg: pr = profiles_cfg.find_profile(self.baseline.profile) if pr: exts = [] for ln in pr.get("languages", []) or []: if ln in LANGUAGE_EXTENSIONS: exts.extend(LANGUAGE_EXTENSIONS[ln]) if exts: allowed_exts = list(set(exts)) if find_source_files: try: src_files = find_source_files( Path(self.current_dir), allowed_extensions=allowed_exts, ignore_patterns=self.ignore_patterns, ) except Exception: src_files = [] for p in src_files: try: rel_unix = os.path.relpath(str(p), self.current_dir).replace( "\\", "/" ) st = p.stat() sha1 = None try: sha1 = _sha1_of_file(p) except Exception: sha1 = None files_meta.append( FileMeta( path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1 ) ) except Exception: continue else: for root, dirs, files in os.walk(self.current_dir): for fn in files: fpath = os.path.join(root, fn) rel = os.path.relpath(fpath, self.current_dir) # apply ignore patterns from profile: test against relative path (unix-style) and filename rel_unix = rel.replace("\\", "/") ignored = False for pat in self.ignore_patterns or []: # patterns are normalized and lower-cased by the scanner helper if fnmatch.fnmatch(rel_unix.lower(), pat) or fnmatch.fnmatch( fn.lower(), pat ): ignored = True break if ignored: continue try: st = os.stat(fpath) except OSError: continue sha1 = None # Compute SHA1 for all files, including 0-byte files try: sha1 = _sha1_of_file(Path(fpath)) except Exception: sha1 = None files_meta.append( FileMeta( path=rel.replace("\\", "/"), size=st.st_size, mtime=st.st_mtime, sha1=sha1, ) ) # Run per-file analyzers (countings + metrics) and attach results to each FileMeta # This ensures current files have the same data as baseline files for comparison try: from ..core.countings_impl import ( analyze_file_counts as _analyze_file_counts, ) from ..core.metrics import analyze_file_metrics as _analyze_metrics for fm in files_meta: abs_path = os.path.join(self.current_dir, fm.path) # per-file counts try: c = _analyze_file_counts(Path(abs_path)) fm.countings = { "physical_lines": int(c.get("physical_lines", 0)), "code_lines": int(c.get("code_lines", 0)), "comment_lines": int(c.get("comment_lines", 0)), "blank_lines": int(c.get("blank_lines", 0)), # UCC extended metrics "comment_whole": int(c.get("comment_whole", 0)), "comment_embedded": int(c.get("comment_embedded", 0)), "compiler_directives": int(c.get("compiler_directives", 0)), "data_declarations": int(c.get("data_declarations", 0)), "exec_instructions": int(c.get("exec_instructions", 0)), "logical_sloc": int(c.get("logical_sloc", 0)), "physical_sloc": int(c.get("physical_sloc", 0)), } except Exception: fm.countings = None # per-file metrics try: m = _analyze_metrics(abs_path) fm.metrics = { "avg_cc": float(m.get("avg_cc", 0.0)), "max_cc": int(m.get("max_cc", 0)), "func_count": int(m.get("func_count", 0)), "mi": float(m.get("mi", 0.0)), } except Exception: fm.metrics = None except Exception: pass # Cache the result to avoid recomputation self._current_files_cache = files_meta return files_meta @staticmethod def _index_by_name(files: List[FileMeta]) -> Dict[str, List[FileMeta]]: idx: Dict[str, List[FileMeta]] = {} for f in files: name = os.path.basename(f.path) idx.setdefault(name, []).append(f) return idx @staticmethod def _levenshtein(a: str, b: str) -> int: # simple DP implementation la, lb = len(a), len(b) if la == 0: return lb if lb == 0: return la prev = list(range(lb + 1)) for i, ca in enumerate(a, start=1): cur = [i] + [0] * lb for j, cb in enumerate(b, start=1): add = prev[j] + 1 delete = cur[j - 1] + 1 change = prev[j - 1] + (0 if ca == cb else 1) cur[j] = min(add, delete, change) prev = cur return prev[lb] def match_files( self, baseline_files: List[FileMeta], current_files: List[FileMeta] ) -> List[Tuple[Optional[FileMeta], Optional[FileMeta]]]: # Implement Gale-Shapley stable matching inspired by UCC logic. # Build maps by filename only (candidates must share the same filename) mapA_by_name = self._index_by_name(baseline_files) mapB_by_name = self._index_by_name(current_files) # Build preference lists (for A: list of B candidates sorted by path distance) prefsA: Dict[str, List[FileMeta]] = {} # key: a.path prefsB: Dict[str, List[FileMeta]] = {} # helper: compute preference value between two file paths (parent dirs) def pref_val(pa: str, pb: str) -> int: parent_a = os.path.dirname(pa) parent_b = os.path.dirname(pb) return self._levenshtein(parent_a, parent_b) # populate preferences A -> Bs for a in baseline_files: candidates = mapB_by_name.get(os.path.basename(a.path), []) # compute scores and sort scored = [(pref_val(a.path, b.path), b) for b in candidates] scored.sort(key=lambda x: x[0]) prefsA[a.path] = [b for (_s, b) in scored] # populate preferences B -> As for b in current_files: candidates = mapA_by_name.get(os.path.basename(b.path), []) scored = [(pref_val(a.path, b.path), a) for a in candidates] scored.sort(key=lambda x: x[0]) prefsB[b.path] = [a for (_s, a) in scored] # Prepare Gale-Shapley structures freeA = [a for a in baseline_files] next_proposal_index: Dict[str, int] = {a.path: 0 for a in baseline_files} matchA: Dict[str, Optional[FileMeta]] = {a.path: None for a in baseline_files} matchB: Dict[str, Optional[FileMeta]] = {b.path: None for b in current_files} # For quick comparison, build rank maps for B preferences rankB: Dict[str, Dict[str, int]] = {} for b in current_files: rank = {} plist = prefsB.get(b.path, []) for idx, a in enumerate(plist): rank[a.path] = idx rankB[b.path] = rank while freeA: a = freeA.pop(0) a_key = a.path plist = prefsA.get(a_key, []) if not plist: # no candidates matchA[a_key] = None continue # propose to next candidate i = next_proposal_index[a_key] if i >= len(plist): matchA[a_key] = None continue b = plist[i] next_proposal_index[a_key] = i + 1 b_key = b.path current = matchB.get(b_key) if current is None: # b accepts matchA[a_key] = b matchB[b_key] = a else: # b decides preference between current and proposer rank_map = rankB.get(b_key, {}) r_current = rank_map.get(current.path, float("inf")) r_proposer = rank_map.get(a_key, float("inf")) if r_proposer < r_current: # b prefers new proposer matchA[a_key] = b matchB[b_key] = a # previous current becomes free again matchA[current.path] = None freeA.append(current) else: # b rejects proposer -> proposer remains free (if more prefs) freeA.append(a) # Build results list: pairs for matched A entries results: List[Tuple[Optional[FileMeta], Optional[FileMeta]]] = [] usedB = set() for a in baseline_files: b = matchA.get(a.path) if b is None: results.append((a, None)) else: results.append((a, b)) usedB.add(b.path) # Any B not matched are added as (None, b) for b in current_files: if b.path not in usedB: results.append((None, b)) return results @staticmethod def _diff_file_pair(fileA_path: Optional[str], fileB_path: Optional[str]) -> Dict: res = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0} if fileA_path is None and fileB_path is None: return res if fileA_path is None: # all lines are added try: with open(fileB_path, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() res["added"] = len(lines) except Exception: res["added"] = 0 return res if fileB_path is None: try: with open(fileA_path, "r", encoding="utf-8", errors="ignore") as f: lines = f.readlines() res["deleted"] = len(lines) except Exception: res["deleted"] = 0 return res # both exist; line-based diff try: with open(fileA_path, "r", encoding="utf-8", errors="ignore") as fa: a_lines = fa.readlines() except Exception: a_lines = [] try: with open(fileB_path, "r", encoding="utf-8", errors="ignore") as fb: b_lines = fb.readlines() except Exception: b_lines = [] sm = difflib.SequenceMatcher(a=a_lines, b=b_lines) # DEBUG: Log if files are identical but difflib finds differences has_differences = False for tag, i1, i2, j1, j2 in sm.get_opcodes(): if tag != "equal": has_differences = True break if has_differences and len(a_lines) == len(b_lines): # Files have same line count but difflib sees differences print(f"[DIFFER] ANOMALY DETECTED:") print(f" FileA: {fileA_path}") print(f" FileB: {fileB_path}") print(f" Lines: {len(a_lines)} vs {len(b_lines)}") # Check first differing line for i, (line_a, line_b) in enumerate(zip(a_lines, b_lines)): if line_a != line_b: print(f" First diff at line {i+1}:") print(f" A: {repr(line_a[:80])}") print(f" B: {repr(line_b[:80])}") break for tag, i1, i2, j1, j2 in sm.get_opcodes(): if tag == "equal": res["unmodified"] += i2 - i1 elif tag == "delete": res["deleted"] += i2 - i1 elif tag == "insert": res["added"] += j2 - j1 elif tag == "replace": la = i2 - i1 lb = j2 - j1 res["modified"] += min(la, lb) if la > lb: res["deleted"] += la - lb elif lb > la: res["added"] += lb - la return res def diff(self) -> Dict: baseline_files = self.baseline.files current_files = self.build_current_file_list() pairs = self.match_files(baseline_files, current_files) total = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0} matched_results = [] # helper to construct absolute paths def abs_path_for(meta: FileMeta) -> str: return ( os.path.join(self.current_dir, meta.path) if meta is not None else None ) # process pairs possibly in parallel tasks = [] with ThreadPoolExecutor(max_workers=self.max_workers) as ex: futures = [] for a, b in pairs: fa = ( os.path.join(self.baseline_files_dir, a.path) if a is not None else None ) fb = os.path.join(self.current_dir, b.path) if b is not None else None futures.append(ex.submit(self._diff_file_pair, fa, fb)) for (a, b), fut in zip(pairs, futures): res = fut.result() total["added"] += res["added"] total["deleted"] += res["deleted"] total["modified"] += res["modified"] total["unmodified"] += res["unmodified"] # Extract countings and metrics from baseline and current files baseline_countings = ( a.countings if (a is not None and hasattr(a, "countings")) else None ) baseline_metrics = ( a.metrics if (a is not None and hasattr(a, "metrics")) else None ) current_countings = ( b.countings if (b is not None and hasattr(b, "countings")) else None ) current_metrics = ( b.metrics if (b is not None and hasattr(b, "metrics")) else None ) # Compute deltas for countings countings_delta = None if baseline_countings and current_countings: countings_delta = { "physical_lines": current_countings.get("physical_lines", 0) - baseline_countings.get("physical_lines", 0), "code_lines": current_countings.get("code_lines", 0) - baseline_countings.get("code_lines", 0), "comment_lines": current_countings.get("comment_lines", 0) - baseline_countings.get("comment_lines", 0), "blank_lines": current_countings.get("blank_lines", 0) - baseline_countings.get("blank_lines", 0), # UCC extended deltas "comment_whole": current_countings.get("comment_whole", 0) - baseline_countings.get("comment_whole", 0), "comment_embedded": current_countings.get("comment_embedded", 0) - baseline_countings.get("comment_embedded", 0), "compiler_directives": current_countings.get( "compiler_directives", 0 ) - baseline_countings.get("compiler_directives", 0), "data_declarations": current_countings.get( "data_declarations", 0 ) - baseline_countings.get("data_declarations", 0), "exec_instructions": current_countings.get( "exec_instructions", 0 ) - baseline_countings.get("exec_instructions", 0), "logical_sloc": current_countings.get("logical_sloc", 0) - baseline_countings.get("logical_sloc", 0), "physical_sloc": current_countings.get("physical_sloc", 0) - baseline_countings.get("physical_sloc", 0), } # DEBUG LOGGING: Show comparison details when there's a delta if any(v != 0 for v in countings_delta.values()): fileA_path = a.path if a else "None" fileB_path = b.path if b else "None" print( f"[DIFFER] DELTA DETECTED for {fileA_path} vs {fileB_path}" ) print(f" Baseline: {baseline_countings}") print(f" Current: {current_countings}") print(f" Delta: {countings_delta}") # Compute deltas for metrics metrics_delta = None if baseline_metrics and current_metrics: metrics_delta = { "func_count": current_metrics.get("func_count", 0) - baseline_metrics.get("func_count", 0), "avg_cc": current_metrics.get("avg_cc", 0.0) - baseline_metrics.get("avg_cc", 0.0), "max_cc": current_metrics.get("max_cc", 0) - baseline_metrics.get("max_cc", 0), "mi": current_metrics.get("mi", 0.0) - baseline_metrics.get("mi", 0.0), } matched_results.append( { "fileA": a.path if a is not None else None, "fileB": b.path if b is not None else None, "counts": res, "baseline_countings": baseline_countings, "current_countings": current_countings, "countings_delta": countings_delta, "baseline_metrics": baseline_metrics, "current_metrics": current_metrics, "metrics_delta": metrics_delta, } ) result = { "baseline_id": self.baseline.baseline_id, "compared_at": time.time(), "total": total, "pairs": matched_results, } # Compute summary statistics from baseline and current file metadata try: # Calculate baseline summary from baseline files (which have embedded countings/metrics) baseline_counts = { "physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0, } baseline_metrics = { "file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0, } baseline_metrics_count = 0 for fm in baseline_files: if fm.countings: baseline_counts["physical_lines"] += fm.countings.get( "physical_lines", 0 ) baseline_counts["code_lines"] += fm.countings.get("code_lines", 0) baseline_counts["comment_lines"] += fm.countings.get( "comment_lines", 0 ) baseline_counts["blank_lines"] += fm.countings.get("blank_lines", 0) baseline_counts["file_count"] += 1 if fm.metrics: baseline_metrics["total_func_count"] += fm.metrics.get( "func_count", 0 ) baseline_metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0) baseline_metrics["avg_mi"] += fm.metrics.get("mi", 0.0) baseline_metrics_count += 1 if baseline_metrics_count > 0: baseline_metrics["avg_avg_cc"] /= baseline_metrics_count baseline_metrics["avg_mi"] /= baseline_metrics_count baseline_metrics["file_count"] = baseline_metrics_count # Calculate current summary from current files (which have embedded countings/metrics) current_counts = { "physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0, } current_metrics = { "file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0, } current_metrics_count = 0 for fm in current_files: if fm.countings: current_counts["physical_lines"] += fm.countings.get( "physical_lines", 0 ) current_counts["code_lines"] += fm.countings.get("code_lines", 0) current_counts["comment_lines"] += fm.countings.get( "comment_lines", 0 ) current_counts["blank_lines"] += fm.countings.get("blank_lines", 0) current_counts["file_count"] += 1 if fm.metrics: current_metrics["total_func_count"] += fm.metrics.get( "func_count", 0 ) current_metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0) current_metrics["avg_mi"] += fm.metrics.get("mi", 0.0) current_metrics_count += 1 if current_metrics_count > 0: current_metrics["avg_avg_cc"] /= current_metrics_count current_metrics["avg_mi"] /= current_metrics_count current_metrics["file_count"] = current_metrics_count # Compute deltas delta_counts = { "physical_lines": current_counts["physical_lines"] - baseline_counts["physical_lines"], "code_lines": current_counts["code_lines"] - baseline_counts["code_lines"], "comment_lines": current_counts["comment_lines"] - baseline_counts["comment_lines"], "blank_lines": current_counts["blank_lines"] - baseline_counts["blank_lines"], "file_count": current_counts["file_count"] - baseline_counts["file_count"], } delta_metrics = { "total_func_count": current_metrics["total_func_count"] - baseline_metrics["total_func_count"], "avg_avg_cc": current_metrics["avg_avg_cc"] - baseline_metrics["avg_avg_cc"], "avg_mi": current_metrics["avg_mi"] - baseline_metrics["avg_mi"], } result["summary"] = { "baseline": {"countings": baseline_counts, "metrics": baseline_metrics}, "current": {"countings": current_counts, "metrics": current_metrics}, "delta": {"countings": delta_counts, "metrics": delta_metrics}, } except Exception: pass return result