SXXXXXXX_PyUCC/pyucc/core/differ.py

1592 lines
66 KiB
Python

"""
Baseline manager and differ prototype.
- create baseline from directory (snapshot by default)
- load baseline metadata
- diff baseline vs current directory
- output results as dict / JSON-serializable
This is a minimal, self-contained implementation inspired by UCC's DiffTool.
"""
from __future__ import annotations
import hashlib
import json
import os
import shutil
import stat
import subprocess
import tempfile
import time
import fnmatch
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import difflib
# Directory name intended for baseline storage.
# NOTE(review): not referenced in the visible part of this module — the
# documented fallback in BaselineManager.__init__ is `./baseline`; confirm
# whether this constant is still used elsewhere.
BASELINE_ROOT_DIRNAME = ".pyucc_baselines"
def _sha1_of_file(path: Path, chunk_size: int = 8192) -> str:
    """Return the hexadecimal SHA-1 digest of the file at *path*.

    The file is opened in binary mode and read in pieces of ``chunk_size``
    bytes, so the whole content is never held in memory at once.
    """
    digest = hashlib.sha1()
    with path.open("rb") as stream:
        while True:
            block = stream.read(chunk_size)
            if not block:
                # an empty read marks end of file
                break
            digest.update(block)
    return digest.hexdigest()
@dataclass
class FileMeta:
    """Record describing one file of a baseline or of the current tree.

    Only ``path``, ``size`` and ``mtime`` are required; the remaining fields
    default to ``None`` and are filled in when the corresponding data is
    available.
    """

    # relative path (stored with forward slashes by the code in this module)
    path: str
    # file size and modification time as reported by stat()
    size: int
    mtime: float
    # hexadecimal SHA-1 of the content, or None when it was not computed
    sha1: Optional[str] = None
    # per-file counting results (line counts), or None
    countings: Optional[Dict] = None
    # per-file metric results (complexity figures), or None
    metrics: Optional[Dict] = None
@dataclass
class BaselineMetadata:
    """Description of one stored baseline, serialized as ``metadata.json``."""

    # identifier of the baseline (also the name of its storage directory)
    baseline_id: str
    # creation time as a time.time() timestamp
    created_at: float
    # 'local' or 'git'
    source: str
    # for git baselines the exported commit reference; None for local ones
    origin: Optional[str]
    # absolute path of the directory / repository the baseline was taken from
    project_root: str
    # one entry per file recorded in the baseline
    files: List[FileMeta]
    # name of the profile used when the baseline was created, if any
    profile: Optional[str] = None
class BaselineManager:
    """Create, store, load and prune baselines.

    A baseline lives in ``<baselines_root>/<baseline_id>/`` and consists of a
    ``metadata.json`` file (a serialized `BaselineMetadata`), an optional
    ``files/`` snapshot (plus ``files.zip`` when enabled in the application
    settings) and, for snapshots, best-effort UCC-style text reports.
    """

    # Keys copied (as ints) from the counting analyzer result into FileMeta.countings.
    _COUNTING_KEYS = (
        "physical_lines",
        "code_lines",
        "comment_lines",
        "blank_lines",
        # UCC extended metrics
        "comment_whole",
        "comment_embedded",
        "compiler_directives",
        "data_declarations",
        "exec_instructions",
        "logical_sloc",
        "physical_sloc",
    )

    # Characters replaced by "_" when a profile name or git ref is embedded in
    # a baseline id: whitespace-like separators, path separators and characters
    # that are not valid in file names on Windows.
    _ID_UNSAFE = str.maketrans({ch: "_" for ch in ' /\\:*?"<>|'})

    def __init__(self, workspace_root: str, baselines_root: Optional[str] = None):
        """Manage baselines storage.

        Args:
            workspace_root: path to the project/workspace (kept for metadata usage).
            baselines_root: optional absolute or relative path where baselines are stored.
                If omitted, the environment variable `PYUCC_BASELINE_DIR` is consulted,
                then the application settings; if neither is set, defaults to
                `./baseline` in the current working dir.
        """
        self.workspace_root = os.path.abspath(workspace_root)
        # priority: explicit argument, env var, app settings, fallback to ./baseline
        root = baselines_root or os.getenv("PYUCC_BASELINE_DIR")
        if not root:
            try:
                from ..config import settings as app_settings

                root = app_settings.get_baseline_dir()
            except Exception:
                # settings module is optional
                root = None
        if not root:
            root = os.path.join(os.getcwd(), "baseline")
        self.baselines_root = os.path.abspath(root)
        os.makedirs(self.baselines_root, exist_ok=True)

    # ------------------------------------------------------------------
    # path helpers
    # ------------------------------------------------------------------
    def _baseline_dir(self, baseline_id: str) -> str:
        """Return the storage directory of *baseline_id*."""
        return os.path.join(self.baselines_root, baseline_id)

    def get_baseline_files_dir(self, baseline_id: str) -> str:
        """Get the directory containing the baseline snapshot files."""
        return os.path.join(self._baseline_dir(baseline_id), "files")

    def list_baselines(self) -> List[str]:
        """Return the names of all baseline directories, sorted by name."""
        return sorted(
            name
            for name in os.listdir(self.baselines_root)
            if os.path.isdir(os.path.join(self.baselines_root, name))
        )

    def get_metadata_path(self, baseline_id: str) -> str:
        """Return the path of the ``metadata.json`` file of *baseline_id*."""
        return os.path.join(self._baseline_dir(baseline_id), "metadata.json")

    # ------------------------------------------------------------------
    # private helpers shared by the two creation flows
    # ------------------------------------------------------------------
    @classmethod
    def _safe_id_component(cls, text: str) -> str:
        """Make *text* safe to embed in a baseline id (a single directory name)."""
        return str(text).translate(cls._ID_UNSAFE)

    def _default_baseline_id(self, profile_name: Optional[str], suffix: str) -> str:
        """Build ``[<profile>__]<timestamp>_<suffix>`` for a new baseline."""
        stamp = time.strftime("%Y%m%dT%H%M%S")
        if profile_name:
            return f"{self._safe_id_component(profile_name)}__{stamp}_{suffix}"
        return f"{stamp}_{suffix}"

    @staticmethod
    def _is_inside(path: str, parent: str) -> bool:
        """Return True when *path* is *parent* itself or located below it.

        `os.path.commonpath` raises ValueError when the paths cannot be
        compared (e.g. different drives on Windows); that means "not inside".
        """
        try:
            parent_abs = os.path.abspath(parent)
            common = os.path.commonpath([parent_abs, os.path.abspath(path)])
        except ValueError:
            return False
        return os.path.normcase(common) == os.path.normcase(parent_abs)

    @staticmethod
    def _extensions_for(profile_name, profiles_cfg, language_extensions) -> Optional[List[str]]:
        """Return the sorted extensions of the profile's languages, or None."""
        if not profile_name or not profiles_cfg:
            return None
        profile = profiles_cfg.find_profile(profile_name)
        if not profile:
            return None
        exts = set()
        for lang in profile.get("languages", []) or []:
            if lang in language_extensions:
                exts.update(language_extensions[lang])
        return sorted(exts) or None

    def _allowed_extensions(self, profile_name: Optional[str]) -> Optional[List[str]]:
        """Resolve the allowed extensions for *profile_name* (None = no filter)."""
        try:
            from ..config import profiles as profiles_cfg
            from ..config.languages import LANGUAGE_EXTENSIONS
        except Exception:
            # configuration package is optional
            return None
        return self._extensions_for(profile_name, profiles_cfg, LANGUAGE_EXTENSIONS)

    @staticmethod
    def _walk_files(root: str, patterns: List[str]) -> List[Path]:
        """Fallback scanner: walk *root* and drop files matching *patterns*.

        Patterns are matched case-insensitively against both the unix-style
        relative path and the bare file name.
        """
        lowered = [pat.lower() for pat in patterns if pat]
        found: List[Path] = []
        for current, _dirs, names in os.walk(root):
            for name in names:
                full = os.path.join(current, name)
                rel = os.path.relpath(full, root).replace("\\", "/").lower()
                low_name = name.lower()
                if any(
                    fnmatch.fnmatch(rel, pat) or fnmatch.fnmatch(low_name, pat)
                    for pat in lowered
                ):
                    continue
                found.append(Path(full))
        return found

    def _collect_files(
        self,
        root: str,
        file_list: Optional[List[str]],
        ignore_patterns: Optional[List[str]],
        allowed_exts: Optional[List[str]],
        compute_sha1: bool,
    ) -> List[FileMeta]:
        """Build the `FileMeta` list for the tree rooted at *root*.

        Source of candidates, in priority order: the explicit *file_list*
        (entries relative to *root* or absolute), the project scanner
        (`scanner.find_source_files`), or a plain `os.walk` fallback.
        Entries are skipped when they are not regular files, escape *root*,
        belong to the baseline storage nested inside *root*, or do not have
        one of *allowed_exts*. Unreadable files are skipped silently.
        """
        # Normalize ignore patterns using scanner helper so patterns like
        # '.bak' are treated as '*.bak'
        try:
            from .scanner import normalize_ignore_patterns, find_source_files

            patterns = normalize_ignore_patterns(ignore_patterns) or []
        except Exception:
            find_source_files = None
            patterns = [pat for pat in (ignore_patterns or []) if pat]

        if file_list is not None:
            candidates = list(file_list)
        elif find_source_files is not None:
            try:
                candidates = list(
                    find_source_files(
                        Path(root),
                        allowed_extensions=allowed_exts,
                        ignore_patterns=patterns,
                    )
                )
            except Exception:
                candidates = []
        else:
            candidates = self._walk_files(root, patterns)

        allowed = {ext.lower() for ext in allowed_exts} if allowed_exts else None
        # Only skip the storage area when it lives inside the scanned tree;
        # this keeps git exports (extracted below baselines_root) intact.
        storage_nested = self._is_inside(self.baselines_root, root)

        metas: List[FileMeta] = []
        for entry in candidates:
            try:
                path = Path(entry)
                if not path.is_absolute():
                    path = Path(root) / path
                if not path.is_file():
                    continue
                rel = os.path.relpath(str(path), root).replace("\\", "/")
                if rel == ".." or rel.startswith("../"):
                    # outside root: would be copied outside the snapshot dir
                    continue
                if storage_nested and self._is_inside(str(path), self.baselines_root):
                    continue
                if allowed is not None and Path(rel).suffix.lower() not in allowed:
                    continue
                st = path.stat()
            except (OSError, ValueError, TypeError):
                continue
            sha1 = None
            if compute_sha1:  # also computed for 0-byte files
                try:
                    sha1 = _sha1_of_file(path)
                except OSError:
                    sha1 = None
            metas.append(
                FileMeta(path=rel, size=st.st_size, mtime=st.st_mtime, sha1=sha1)
            )
        return metas

    def _attach_analysis(self, files_meta: List[FileMeta], root: str) -> None:
        """Run the per-file analyzers (countings + metrics) on each entry.

        Best effort: when the analyzers cannot be imported nothing is
        attached; when one analysis fails for a file the corresponding
        attribute is set to None.
        """
        try:
            from ..core.countings_impl import analyze_file_counts
            from ..core.metrics import analyze_file_metrics
        except Exception:
            return
        for fm in files_meta:
            abs_path = os.path.join(root, fm.path)
            try:
                counts = analyze_file_counts(Path(abs_path))
                fm.countings = {
                    key: int(counts.get(key, 0)) for key in self._COUNTING_KEYS
                }
            except Exception:
                fm.countings = None
            try:
                m = analyze_file_metrics(abs_path)
                fm.metrics = {
                    "avg_cc": float(m.get("avg_cc", 0.0)),
                    "max_cc": int(m.get("max_cc", 0)),
                    "func_count": int(m.get("func_count", 0)),
                    "mi": float(m.get("mi", 0.0)),
                }
            except Exception:
                fm.metrics = None

    def _write_metadata(self, metadata: BaselineMetadata) -> None:
        """Serialize *metadata* to the baseline's ``metadata.json``."""
        meta_path = self.get_metadata_path(metadata.baseline_id)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self._metadata_to_dict(metadata), f, indent=2)

    @staticmethod
    def _copy_snapshot(src_root: str, snapshot_dir: str, files_meta: List[FileMeta]) -> None:
        """Copy the recorded files from *src_root* into *snapshot_dir*."""
        os.makedirs(snapshot_dir, exist_ok=True)
        for fm in files_meta:
            dst_file = os.path.join(snapshot_dir, fm.path)
            os.makedirs(os.path.dirname(dst_file), exist_ok=True)
            try:
                # copy2 preserves metadata
                shutil.copy2(os.path.join(src_root, fm.path), dst_file)
            except OSError:
                continue  # skip files that cannot be copied

    @staticmethod
    def _zip_snapshot_if_enabled(dest: str, snapshot_dir: str) -> None:
        """Create ``files.zip`` next to the snapshot when the settings ask for it."""
        try:
            from ..config import settings as app_settings

            if app_settings.get_zip_baselines():
                shutil.make_archive(
                    base_name=os.path.join(dest, "files"),
                    format="zip",
                    root_dir=snapshot_dir,
                )
        except Exception:
            pass  # if settings not available or zip fails, continue without zip

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete *path*, ignoring the case where it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            pass

    def _generate_reports(
        self,
        dest: str,
        snapshot_dir: str,
        files_meta: List[FileMeta],
        baseline_id: str,
        base_path: str,
        profile_name: Optional[str],
        rerun_metrics: bool,
    ) -> None:
        """Write countings / metrics / duplicates reports into *dest*.

        Entirely best effort: any failure (missing optional modules, analyzer
        or generator errors) leaves the baseline untouched. When
        *rerun_metrics* is true the metrics analyzer is run again on the
        snapshot copy so that function-level details can be reported.
        """
        try:
            from ..utils.ucc_report_generator import UCCReportGenerator
            from ..config import profiles as profiles_cfg
            from ..config.languages import LANGUAGE_EXTENSIONS
            from ..config import settings as app_settings
            from ..core import duplicates as dupmod

            analyze_metrics = None
            if rerun_metrics:
                try:
                    from ..core.metrics import analyze_file_metrics as analyze_metrics
                except Exception:
                    analyze_metrics = None

            counting_results = []
            metrics_results = []
            for fm in files_meta:
                abs_snapshot = os.path.join(snapshot_dir, fm.path)
                if fm.countings:
                    row = dict(fm.countings)
                    row["file"] = abs_snapshot
                    row["language"] = ""
                    counting_results.append(row)
                mdata = fm.metrics if fm.metrics else None
                # stored metrics carry no function-level data: re-run if possible
                if analyze_metrics and (not mdata or not mdata.get("functions")):
                    try:
                        fresh = analyze_metrics(abs_snapshot)
                        if fresh:
                            mdata = fresh
                    except Exception:
                        pass
                if mdata:
                    mrow = dict(mdata)
                    mrow["file"] = abs_snapshot
                    metrics_results.append(mrow)

            allowed_exts = self._extensions_for(
                profile_name, profiles_cfg, LANGUAGE_EXTENSIONS
            )

            # duplicates search settings (threshold, k, window)
            dup_settings = app_settings.get_duplicates_settings() or {}
            thr = dup_settings.get("threshold", 5.0)
            k = dup_settings.get("k", 25)
            window = dup_settings.get("window", 4)

            # Run duplicate finder on the snapshot directory (so reports are
            # self-contained)
            try:
                try:
                    from .scanner import find_source_files as _find_src

                    snap_list = [
                        str(p)
                        for p in _find_src(
                            Path(snapshot_dir), allowed_extensions=allowed_exts
                        )
                    ]
                except Exception:
                    snap_list = None
                dup_res = dupmod.find_duplicates_in_dir(
                    root=snapshot_dir,
                    extensions=allowed_exts,
                    dup_threshold=thr,
                    k=k,
                    window=window,
                    file_list=snap_list,
                )
                dup_rows = []
                for match_type, pct in (("exact", 0), ("fuzzy", f"<={thr}%")):
                    for a, b in dup_res.get(match_type, []):
                        dup_rows.append(
                            {
                                "file_a": a,
                                "file_b": b,
                                "match_type": match_type,
                                "pct_change": pct,
                            }
                        )
            except Exception:
                dup_rows = []

            # each report is independent: one failing must not block the others
            try:
                UCCReportGenerator.generate_counting_report(
                    results=counting_results,
                    output_path=Path(os.path.join(dest, "countings_report.txt")),
                    command_description=f"PyUcc Counting Analysis - Baseline: {baseline_id}",
                    base_path=base_path,
                )
            except Exception:
                pass
            try:
                UCCReportGenerator.generate_metrics_report(
                    results=metrics_results,
                    output_path=Path(os.path.join(dest, "metrics_report.txt")),
                    command_description=f"PyUcc Metrics Analysis - Baseline: {baseline_id}",
                    base_path=base_path,
                )
            except Exception:
                pass
            try:
                UCCReportGenerator.generate_duplicates_report(
                    duplicates=dup_rows,
                    output_path=Path(os.path.join(dest, "duplicates_report.txt")),
                    command_description=f"PyUcc Duplicate Analysis - Baseline: {baseline_id}",
                    base_path=base_path,
                    params={
                        "threshold": thr,
                        "extensions": allowed_exts,
                        "k": k,
                        "window": window,
                    },
                )
            except Exception:
                pass
        except Exception:
            # Do not fail baseline creation if report generation has problems
            pass

    # ------------------------------------------------------------------
    # baseline creation
    # ------------------------------------------------------------------
    def create_baseline_from_dir(
        self,
        dir_path: str,
        baseline_id: Optional[str] = None,
        snapshot: bool = True,
        compute_sha1: bool = True,
        ignore_patterns: Optional[List[str]] = None,
        profile_name: Optional[str] = None,
        max_keep: int = 5,
        file_list: Optional[List[str]] = None,
    ) -> str:
        """Create a baseline from the directory *dir_path*.

        Args:
            dir_path: directory to record.
            baseline_id: explicit id; by default one is derived from the
                profile name and the current time.
            snapshot: when true, copy the recorded files into ``files/`` and
                generate the reports.
            compute_sha1: when true, store the SHA-1 of every file.
            ignore_patterns: fnmatch-style patterns of files to leave out.
            profile_name: profile whose languages restrict the recorded
                file extensions.
            max_keep: number of baselines to keep for the same project and
                profile; values <= 0 disable pruning.
            file_list: explicit files to record (relative to *dir_path* or
                absolute) instead of scanning the directory.

        Returns:
            The id of the created baseline.

        Raises:
            FileExistsError: a baseline with this id already exists.
        """
        dir_path = os.path.abspath(dir_path)
        if baseline_id is None:
            baseline_id = self._default_baseline_id(profile_name, "local")
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)
        try:
            files_meta = self._collect_files(
                dir_path,
                file_list=file_list,
                ignore_patterns=ignore_patterns,
                allowed_exts=self._allowed_extensions(profile_name),
                compute_sha1=compute_sha1,
            )
            # analysis runs after filtering so excluded files cost nothing
            self._attach_analysis(files_meta, dir_path)
            self._write_metadata(
                BaselineMetadata(
                    baseline_id=baseline_id,
                    created_at=time.time(),
                    source="local",
                    origin=None,
                    project_root=dir_path,
                    files=files_meta,
                    profile=profile_name,
                )
            )
            if snapshot:
                snapshot_dir = os.path.join(dest, "files")
                self._copy_snapshot(dir_path, snapshot_dir, files_meta)
                self._zip_snapshot_if_enabled(dest, snapshot_dir)
                self._generate_reports(
                    dest=dest,
                    snapshot_dir=snapshot_dir,
                    files_meta=files_meta,
                    baseline_id=baseline_id,
                    base_path=dir_path,
                    profile_name=profile_name,
                    rerun_metrics=True,
                )
        except BaseException:
            # do not leave a half-built baseline directory behind
            shutil.rmtree(dest, ignore_errors=True)
            raise
        if max_keep > 0:
            self._prune_old_baselines(dir_path, profile_name, max_keep)
        return baseline_id

    def _prune_old_baselines(
        self, project_root: str, profile_name: Optional[str], keep: int = 5
    ):
        """Prune older baselines for the same project and profile, keeping `keep` newest.

        Baselines whose metadata is missing, unreadable or not a JSON object
        are left alone. ``keep <= 0`` disables pruning, matching the
        ``max_keep`` convention of the creation methods.
        """
        if keep <= 0:
            return
        entries = []  # list of (created_at, path)
        for name in os.listdir(self.baselines_root):
            bdir = os.path.join(self.baselines_root, name)
            meta_path = os.path.join(bdir, "metadata.json")
            if not os.path.isdir(bdir) or not os.path.exists(meta_path):
                continue
            try:
                with open(meta_path, "r", encoding="utf-8") as f:
                    meta = json.load(f)
            except (OSError, ValueError):
                continue
            if not isinstance(meta, dict):
                continue
            # match by project_root and profile (None matches only None)
            if meta.get("project_root") != project_root:
                continue
            if meta.get("profile") != profile_name:
                continue
            created = meta.get("created_at", 0)
            if not isinstance(created, (int, float)):
                created = 0  # keep the sort key comparable
            entries.append((created, bdir))
        # newest first; everything beyond `keep` is removed
        entries.sort(key=lambda item: item[0], reverse=True)
        for _created, path in entries[keep:]:
            shutil.rmtree(path, ignore_errors=True)

    def create_baseline_from_git(
        self,
        repo_path: str,
        commit_ref: str = "HEAD",
        baseline_id: Optional[str] = None,
        snapshot: bool = True,
        compute_sha1: bool = True,
        ignore_patterns: Optional[List[str]] = None,
        profile_name: Optional[str] = None,
        max_keep: int = 5,
        file_list: Optional[List[str]] = None,
    ) -> str:
        """Create a baseline by exporting a git commit (using `git archive`).

        This method requires that `git` is available in PATH. It will create a zip
        archive of the requested commit and then build the baseline metadata from
        the extracted tree, applying the same ignore-pattern normalization,
        profile extension filter and SHA-1 rules as `create_baseline_from_dir`.

        Args:
            repo_path: path of the git repository.
            commit_ref: commit, tag or branch to export.
            baseline_id: explicit id; by default one is derived from the
                profile name, the current time and *commit_ref*.
            snapshot: when true, keep the extracted tree as ``files/`` and
                generate the reports.
            compute_sha1: when true, store the SHA-1 of every file.
            ignore_patterns: fnmatch-style patterns of files to leave out.
            profile_name: profile whose languages restrict the recorded
                file extensions.
            max_keep: number of baselines to keep for the same repository and
                profile; values <= 0 disable pruning.
            file_list: explicit files to record (relative to the extracted
                tree or absolute) instead of scanning it.

        Returns:
            The id of the created baseline.

        Raises:
            FileExistsError: a baseline with this id already exists.
            RuntimeError: `git archive` exited with an error.
        """
        repo_path = os.path.abspath(repo_path)
        if baseline_id is None:
            # refs such as "origin/main" must not introduce path separators
            safe_ref = self._safe_id_component(commit_ref)
            baseline_id = self._default_baseline_id(profile_name, f"git_{safe_ref}")
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)
        zip_tmp = os.path.join(dest, "export.zip")
        extract_dir = os.path.join(dest, "extracted")
        try:
            try:
                subprocess.run(
                    ["git", "archive", "--format=zip", "-o", zip_tmp, commit_ref],
                    cwd=repo_path,
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                raise RuntimeError(f"git archive failed: {e}") from e
            os.makedirs(extract_dir, exist_ok=True)
            shutil.unpack_archive(zip_tmp, extract_dir)

            files_meta = self._collect_files(
                extract_dir,
                file_list=file_list,
                ignore_patterns=ignore_patterns,
                allowed_exts=self._allowed_extensions(profile_name),
                compute_sha1=compute_sha1,
            )
            self._attach_analysis(files_meta, extract_dir)
            self._write_metadata(
                BaselineMetadata(
                    baseline_id=baseline_id,
                    created_at=time.time(),
                    source="git",
                    origin=commit_ref,
                    project_root=repo_path,
                    files=files_meta,
                    profile=profile_name,
                )
            )
            if snapshot:
                # the extracted tree becomes the snapshot
                snapshot_dir = os.path.join(dest, "files")
                shutil.move(extract_dir, snapshot_dir)
                self._zip_snapshot_if_enabled(dest, snapshot_dir)
                # the git export zip was only temporary
                self._remove_quietly(zip_tmp)
                self._generate_reports(
                    dest=dest,
                    snapshot_dir=snapshot_dir,
                    files_meta=files_meta,
                    baseline_id=baseline_id,
                    base_path=repo_path,
                    profile_name=profile_name,
                    rerun_metrics=False,
                )
            else:
                # remove extracted files and zip
                shutil.rmtree(extract_dir, ignore_errors=True)
                self._remove_quietly(zip_tmp)
        except BaseException:
            # do not leave a half-built baseline directory behind
            shutil.rmtree(dest, ignore_errors=True)
            raise
        if max_keep > 0:
            self._prune_old_baselines(repo_path, profile_name, max_keep)
        return baseline_id

    # ------------------------------------------------------------------
    # metadata (de)serialization
    # ------------------------------------------------------------------
    def load_metadata(self, baseline_id: str) -> BaselineMetadata:
        """Load and return the metadata of *baseline_id*.

        Raises whatever `open` / `json.load` raise when the file is missing or
        invalid, and KeyError when a mandatory key is absent.
        """
        with open(self.get_metadata_path(baseline_id), "r", encoding="utf-8") as f:
            raw = json.load(f)
        files = [FileMeta(**fm) for fm in raw["files"]]
        return BaselineMetadata(
            baseline_id=raw["baseline_id"],
            created_at=raw["created_at"],
            source=raw.get("source", "local"),
            origin=raw.get("origin"),
            project_root=raw.get("project_root", ""),
            files=files,
            profile=raw.get("profile"),
        )

    def _metadata_to_dict(self, meta: BaselineMetadata) -> Dict:
        """Return *meta* as a JSON-serializable dict.

        `asdict` already converts the nested `FileMeta` entries recursively.
        """
        return asdict(meta)
class Differ:
def __init__(
self,
baseline: BaselineMetadata,
current_dir: str,
max_workers: int = 4,
ignore_patterns: Optional[List[str]] = None,
baseline_files_dir: Optional[str] = None,
current_file_list: Optional[List[FileMeta]] = None,
):
self.baseline = baseline
self.current_dir = os.path.abspath(current_dir)
self.max_workers = max_workers
# Normalize ignore patterns so entries like '.bak' become '*.bak'
try:
from .scanner import normalize_ignore_patterns
self.ignore_patterns = normalize_ignore_patterns(ignore_patterns) or []
except Exception:
self.ignore_patterns = ignore_patterns or []
# If caller passed a precomputed current file list, use it (avoids rescanning)
self._current_files_cache: Optional[List[FileMeta]] = current_file_list
# baseline_files_dir is the directory containing the baseline snapshot files
# If not provided, falls back to baseline.project_root (for backwards compatibility)
self.baseline_files_dir = (
baseline_files_dir if baseline_files_dir else baseline.project_root
)
def build_current_file_list(self) -> List[FileMeta]:
# Return cached result if already computed
if self._current_files_cache is not None:
return self._current_files_cache
files_meta: List[FileMeta] = []
# Prefer to use scanner.find_source_files so scanning rules are centralized
try:
from .scanner import find_source_files
except Exception:
find_source_files = None
# Derive allowed extensions from baseline profile if available
allowed_exts = None
try:
from ..config import profiles as profiles_cfg
from ..config.languages import LANGUAGE_EXTENSIONS
except Exception:
profiles_cfg = None
LANGUAGE_EXTENSIONS = {}
if self.baseline and self.baseline.profile and profiles_cfg:
pr = profiles_cfg.find_profile(self.baseline.profile)
if pr:
exts = []
for ln in pr.get("languages", []) or []:
if ln in LANGUAGE_EXTENSIONS:
exts.extend(LANGUAGE_EXTENSIONS[ln])
if exts:
allowed_exts = list(set(exts))
if find_source_files:
try:
src_files = find_source_files(
Path(self.current_dir),
allowed_extensions=allowed_exts,
ignore_patterns=self.ignore_patterns,
)
except Exception:
src_files = []
for p in src_files:
try:
rel_unix = os.path.relpath(str(p), self.current_dir).replace(
"\\", "/"
)
st = p.stat()
sha1 = None
try:
sha1 = _sha1_of_file(p)
except Exception:
sha1 = None
files_meta.append(
FileMeta(
path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1
)
)
except Exception:
continue
else:
for root, dirs, files in os.walk(self.current_dir):
for fn in files:
fpath = os.path.join(root, fn)
rel = os.path.relpath(fpath, self.current_dir)
# apply ignore patterns from profile: test against relative path (unix-style) and filename
rel_unix = rel.replace("\\", "/")
ignored = False
for pat in self.ignore_patterns or []:
# patterns are normalized and lower-cased by the scanner helper
if fnmatch.fnmatch(rel_unix.lower(), pat) or fnmatch.fnmatch(
fn.lower(), pat
):
ignored = True
break
if ignored:
continue
try:
st = os.stat(fpath)
except OSError:
continue
sha1 = None
# Compute SHA1 for all files, including 0-byte files
try:
sha1 = _sha1_of_file(Path(fpath))
except Exception:
sha1 = None
files_meta.append(
FileMeta(
path=rel.replace("\\", "/"),
size=st.st_size,
mtime=st.st_mtime,
sha1=sha1,
)
)
# Run per-file analyzers (countings + metrics) and attach results to each FileMeta
# This ensures current files have the same data as baseline files for comparison
try:
from ..core.countings_impl import (
analyze_file_counts as _analyze_file_counts,
)
from ..core.metrics import analyze_file_metrics as _analyze_metrics
for fm in files_meta:
abs_path = os.path.join(self.current_dir, fm.path)
# per-file counts
try:
c = _analyze_file_counts(Path(abs_path))
fm.countings = {
"physical_lines": int(c.get("physical_lines", 0)),
"code_lines": int(c.get("code_lines", 0)),
"comment_lines": int(c.get("comment_lines", 0)),
"blank_lines": int(c.get("blank_lines", 0)),
# UCC extended metrics
"comment_whole": int(c.get("comment_whole", 0)),
"comment_embedded": int(c.get("comment_embedded", 0)),
"compiler_directives": int(c.get("compiler_directives", 0)),
"data_declarations": int(c.get("data_declarations", 0)),
"exec_instructions": int(c.get("exec_instructions", 0)),
"logical_sloc": int(c.get("logical_sloc", 0)),
"physical_sloc": int(c.get("physical_sloc", 0)),
}
except Exception:
fm.countings = None
# per-file metrics
try:
m = _analyze_metrics(abs_path)
fm.metrics = {
"avg_cc": float(m.get("avg_cc", 0.0)),
"max_cc": int(m.get("max_cc", 0)),
"func_count": int(m.get("func_count", 0)),
"mi": float(m.get("mi", 0.0)),
}
except Exception:
fm.metrics = None
except Exception:
pass
# Cache the result to avoid recomputation
self._current_files_cache = files_meta
return files_meta
@staticmethod
def _index_by_name(files: List[FileMeta]) -> Dict[str, List[FileMeta]]:
idx: Dict[str, List[FileMeta]] = {}
for f in files:
name = os.path.basename(f.path)
idx.setdefault(name, []).append(f)
return idx
@staticmethod
def _levenshtein(a: str, b: str) -> int:
# simple DP implementation
la, lb = len(a), len(b)
if la == 0:
return lb
if lb == 0:
return la
prev = list(range(lb + 1))
for i, ca in enumerate(a, start=1):
cur = [i] + [0] * lb
for j, cb in enumerate(b, start=1):
add = prev[j] + 1
delete = cur[j - 1] + 1
change = prev[j - 1] + (0 if ca == cb else 1)
cur[j] = min(add, delete, change)
prev = cur
return prev[lb]
def match_files(
    self, baseline_files: List[FileMeta], current_files: List[FileMeta]
) -> List[Tuple[Optional[FileMeta], Optional[FileMeta]]]:
    """Pair baseline files with current files using Gale-Shapley matching.

    Only files sharing the same basename are candidates for each other.
    Each side ranks its candidates by the Levenshtein distance between the
    parent directories of the two paths (smaller is better); ties keep the
    input order because the sort is stable. Baseline files act as
    proposers.

    Args:
        baseline_files: entries from the baseline (each exposes ``path``).
        current_files: entries from the current tree (each exposes ``path``).

    Returns:
        One ``(a, b)`` tuple per baseline file, in input order, where ``b``
        is the matched current file or ``None``; followed by ``(None, b)``
        for every current file that ended up unmatched.
    """
    # Imported locally so this method does not rely on a module-level import.
    from collections import deque

    baseline_by_name = self._index_by_name(baseline_files)
    current_by_name = self._index_by_name(current_files)

    # Each (baseline parent, current parent) distance is needed once for the
    # A-side ranking and once for the B-side ranking; compute it only once.
    distance_cache: Dict[Tuple[str, str], int] = {}

    def parent_distance(baseline_path: str, current_path: str) -> int:
        key = (os.path.dirname(baseline_path), os.path.dirname(current_path))
        if key not in distance_cache:
            distance_cache[key] = self._levenshtein(key[0], key[1])
        return distance_cache[key]

    # Preference list of every baseline file: candidates, closest first.
    prefs_a: Dict[str, List[FileMeta]] = {}
    for a in baseline_files:
        candidates = current_by_name.get(os.path.basename(a.path), [])
        prefs_a[a.path] = sorted(
            candidates, key=lambda b: parent_distance(a.path, b.path)
        )

    # For every current file, the rank (0 = best) it gives each baseline path.
    rank_b: Dict[str, Dict[str, int]] = {}
    for b in current_files:
        candidates = baseline_by_name.get(os.path.basename(b.path), [])
        ordered = sorted(candidates, key=lambda a: parent_distance(a.path, b.path))
        rank_b[b.path] = {a.path: idx for idx, a in enumerate(ordered)}

    # Gale-Shapley state; a deque gives O(1) pops from the front.
    free_a = deque(baseline_files)
    next_proposal_index: Dict[str, int] = {a.path: 0 for a in baseline_files}
    match_a: Dict[str, Optional[FileMeta]] = {a.path: None for a in baseline_files}
    match_b: Dict[str, Optional[FileMeta]] = {b.path: None for b in current_files}

    while free_a:
        a = free_a.popleft()
        a_key = a.path
        plist = prefs_a.get(a_key, [])
        i = next_proposal_index[a_key]
        if i >= len(plist):
            # No candidates at all, or every candidate already rejected it.
            match_a[a_key] = None
            continue
        b = plist[i]
        next_proposal_index[a_key] = i + 1
        b_key = b.path
        current = match_b.get(b_key)
        if current is None:
            # b is free and accepts the proposal.
            match_a[a_key] = b
            match_b[b_key] = a
            continue
        ranks = rank_b.get(b_key, {})
        r_current = ranks.get(current.path, float("inf"))
        r_proposer = ranks.get(a_key, float("inf"))
        if r_proposer < r_current:
            # b trades up; its previous partner goes back into the queue.
            match_a[a_key] = b
            match_b[b_key] = a
            match_a[current.path] = None
            free_a.append(current)
        else:
            # Rejected: a is re-queued and will try its next candidate.
            free_a.append(a)

    results: List[Tuple[Optional[FileMeta], Optional[FileMeta]]] = []
    used_b = set()
    for a in baseline_files:
        b = match_a.get(a.path)
        results.append((a, b))
        if b is not None:
            used_b.add(b.path)
    # Current files nobody matched are reported as additions.
    for b in current_files:
        if b.path not in used_b:
            results.append((None, b))
    return results
@staticmethod
def _diff_file_pair(fileA_path: Optional[str], fileB_path: Optional[str]) -> Dict:
res = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
if fileA_path is None and fileB_path is None:
return res
if fileA_path is None:
# all lines are added
try:
with open(fileB_path, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
res["added"] = len(lines)
except Exception:
res["added"] = 0
return res
if fileB_path is None:
try:
with open(fileA_path, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
res["deleted"] = len(lines)
except Exception:
res["deleted"] = 0
return res
# both exist; line-based diff
try:
with open(fileA_path, "r", encoding="utf-8", errors="ignore") as fa:
a_lines = fa.readlines()
except Exception:
a_lines = []
try:
with open(fileB_path, "r", encoding="utf-8", errors="ignore") as fb:
b_lines = fb.readlines()
except Exception:
b_lines = []
sm = difflib.SequenceMatcher(a=a_lines, b=b_lines)
# DEBUG: Log if files are identical but difflib finds differences
has_differences = False
for tag, i1, i2, j1, j2 in sm.get_opcodes():
if tag != "equal":
has_differences = True
break
if has_differences and len(a_lines) == len(b_lines):
# Files have same line count but difflib sees differences
print(f"[DIFFER] ANOMALY DETECTED:")
print(f" FileA: {fileA_path}")
print(f" FileB: {fileB_path}")
print(f" Lines: {len(a_lines)} vs {len(b_lines)}")
# Check first differing line
for i, (line_a, line_b) in enumerate(zip(a_lines, b_lines)):
if line_a != line_b:
print(f" First diff at line {i+1}:")
print(f" A: {repr(line_a[:80])}")
print(f" B: {repr(line_b[:80])}")
break
for tag, i1, i2, j1, j2 in sm.get_opcodes():
if tag == "equal":
res["unmodified"] += i2 - i1
elif tag == "delete":
res["deleted"] += i2 - i1
elif tag == "insert":
res["added"] += j2 - j1
elif tag == "replace":
la = i2 - i1
lb = j2 - j1
res["modified"] += min(la, lb)
if la > lb:
res["deleted"] += la - lb
elif lb > la:
res["added"] += lb - la
return res
def diff(self) -> Dict:
    """Compare the baseline against the current directory.

    Baseline and current files are paired with ``match_files``; every pair
    is diffed line by line through ``_diff_file_pair`` on a thread pool, and
    the per-file countings/metrics stored on the file entries are compared.

    Returns:
        A dict with:

        * ``baseline_id``: id of the baseline being compared.
        * ``compared_at``: ``time.time()`` at the moment of the comparison.
        * ``total``: summed ``added``/``deleted``/``modified``/``unmodified``
          line counts over all pairs.
        * ``pairs``: one entry per pair with ``fileA``, ``fileB``, ``counts``
          and the baseline/current/delta countings and metrics (a delta is
          ``None`` unless both sides carry non-empty data).
        * ``summary``: aggregated baseline/current/delta countings and
          metrics; omitted if the aggregation fails.

    Raises:
        Whatever a worker raised inside ``_diff_file_pair`` (re-raised by
        ``Future.result``).
    """
    # Imported locally so this method does not rely on a module-level import.
    import logging

    log = logging.getLogger(__name__)

    baseline_files = self.baseline.files
    current_files = self.build_current_file_list()
    pairs = self.match_files(baseline_files, current_files)

    # Keys compared per file pair (order is preserved in the output dicts).
    counting_keys = (
        "physical_lines",
        "code_lines",
        "comment_lines",
        "blank_lines",
        # UCC extended counters
        "comment_whole",
        "comment_embedded",
        "compiler_directives",
        "data_declarations",
        "exec_instructions",
        "logical_sloc",
        "physical_sloc",
    )
    # Defaults keep the int/float type of each metric when a key is missing.
    metric_defaults = (("func_count", 0), ("avg_cc", 0.0), ("max_cc", 0), ("mi", 0.0))
    summary_line_keys = ("physical_lines", "code_lines", "comment_lines", "blank_lines")

    def summarize(files):
        # Aggregate countings and metrics over files that carry them.
        counts = {
            "physical_lines": 0,
            "code_lines": 0,
            "comment_lines": 0,
            "blank_lines": 0,
            "file_count": 0,
        }
        metrics = {
            "file_count": 0,
            "total_func_count": 0,
            "avg_avg_cc": 0.0,
            "avg_mi": 0.0,
        }
        with_metrics = 0
        for fm in files:
            if fm.countings:
                for key in summary_line_keys:
                    counts[key] += fm.countings.get(key, 0)
                counts["file_count"] += 1
            if fm.metrics:
                metrics["total_func_count"] += fm.metrics.get("func_count", 0)
                metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0)
                metrics["avg_mi"] += fm.metrics.get("mi", 0.0)
                with_metrics += 1
        if with_metrics > 0:
            # The sums accumulated above become per-file averages.
            metrics["avg_avg_cc"] /= with_metrics
            metrics["avg_mi"] /= with_metrics
        metrics["file_count"] = with_metrics
        return counts, metrics

    total = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
    matched_results = []

    with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
        # Submit every pair first so the diffs run concurrently.
        futures = [
            ex.submit(
                self._diff_file_pair,
                os.path.join(self.baseline_files_dir, a.path) if a is not None else None,
                os.path.join(self.current_dir, b.path) if b is not None else None,
            )
            for a, b in pairs
        ]
        for (a, b), fut in zip(pairs, futures):
            res = fut.result()
            for key in total:
                total[key] += res[key]

            # getattr also covers the "file missing on this side" (None) case.
            baseline_countings = getattr(a, "countings", None)
            baseline_metrics = getattr(a, "metrics", None)
            current_countings = getattr(b, "countings", None)
            current_metrics = getattr(b, "metrics", None)

            countings_delta = None
            if baseline_countings and current_countings:
                countings_delta = {
                    key: current_countings.get(key, 0) - baseline_countings.get(key, 0)
                    for key in counting_keys
                }
                if log.isEnabledFor(logging.DEBUG) and any(
                    v != 0 for v in countings_delta.values()
                ):
                    log.debug(
                        "[DIFFER] DELTA DETECTED for %s vs %s: "
                        "baseline=%s current=%s delta=%s",
                        a.path if a else "None",
                        b.path if b else "None",
                        baseline_countings,
                        current_countings,
                        countings_delta,
                    )

            metrics_delta = None
            if baseline_metrics and current_metrics:
                metrics_delta = {
                    key: current_metrics.get(key, default)
                    - baseline_metrics.get(key, default)
                    for key, default in metric_defaults
                }

            matched_results.append(
                {
                    "fileA": a.path if a is not None else None,
                    "fileB": b.path if b is not None else None,
                    "counts": res,
                    "baseline_countings": baseline_countings,
                    "current_countings": current_countings,
                    "countings_delta": countings_delta,
                    "baseline_metrics": baseline_metrics,
                    "current_metrics": current_metrics,
                    "metrics_delta": metrics_delta,
                }
            )

    result = {
        "baseline_id": self.baseline.baseline_id,
        "compared_at": time.time(),
        "total": total,
        "pairs": matched_results,
    }

    # The summary is best effort: a failure here must not lose the diff.
    try:
        baseline_counts, baseline_summary = summarize(baseline_files)
        current_counts, current_summary = summarize(current_files)
        delta_counts = {
            key: current_counts[key] - baseline_counts[key]
            for key in summary_line_keys + ("file_count",)
        }
        delta_metrics = {
            key: current_summary[key] - baseline_summary[key]
            for key in ("total_func_count", "avg_avg_cc", "avg_mi")
        }
        result["summary"] = {
            "baseline": {"countings": baseline_counts, "metrics": baseline_summary},
            "current": {"countings": current_counts, "metrics": current_summary},
            "delta": {"countings": delta_counts, "metrics": delta_metrics},
        }
    except Exception:
        log.warning("[DIFFER] could not compute summary statistics", exc_info=True)
    return result