# SXXXXXXX_PyUCC/pyucc/core/differ.py
#
# 847 lines
# 37 KiB
# Python

"""
Baseline manager and differ prototype.
- create baseline from directory (snapshot by default)
- load baseline metadata
- diff baseline vs current directory
- output results as dict / JSON-serializable
This is a minimal, self-contained implementation inspired by UCC's DiffTool.
"""
from __future__ import annotations

import difflib
import fnmatch
import hashlib
import json
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Directory name constant for baseline storage.
# NOTE(review): nothing in the visible code references this constant;
# BaselineManager resolves its storage directory from its `baselines_root`
# argument, the PYUCC_BASELINE_DIR environment variable, the app settings,
# or "./baseline" -- confirm whether this name is still used elsewhere.
BASELINE_ROOT_DIRNAME = ".pyucc_baselines"
def _sha1_of_file(path: Path, chunk_size: int = 8192) -> str:
h = hashlib.sha1()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
h.update(chunk)
return h.hexdigest()
@dataclass
class FileMeta:
    """Metadata recorded for one file of a scanned tree."""

    # Path relative to the scanned root, written with forward slashes.
    path: str
    # File size taken from os.stat (st_size).
    size: int
    # Modification time taken from os.stat (st_mtime).
    mtime: float
    # Hex SHA-1 of the content, or None when it was not computed.
    sha1: Optional[str] = None
    # Line counts (physical/code/comment/blank lines) when the analyzer ran.
    countings: Optional[Dict] = None
    # Metrics (avg_cc, max_cc, func_count, mi) when the analyzer ran.
    metrics: Optional[Dict] = None
@dataclass
class BaselineMetadata:
    """Description of one stored baseline, serialized as ``metadata.json``."""

    # Identifier of the baseline; also the name of its storage directory.
    baseline_id: str
    # Creation time as returned by time.time().
    created_at: float
    # 'local' or 'git'
    source: str
    # Commit reference for git baselines, None for local ones.
    origin: Optional[str]
    # Absolute path of the directory or repository the baseline was built from.
    project_root: str
    # One entry per file included in the baseline.
    files: List[FileMeta]
    # Name of the profile the baseline was created with, if any.
    profile: Optional[str] = None
class BaselineManager:
    """Create, load and prune baselines stored under one root directory.

    Each baseline occupies ``<baselines_root>/<baseline_id>/`` and contains a
    ``metadata.json`` file plus, when a snapshot was requested, a ``files/``
    directory (and optionally a ``files.zip`` archive).
    """

    # Characters replaced by "_" in auto-generated baseline ids. The id is
    # used verbatim as ONE directory name, so it must not contain separators
    # (a ref such as "origin/main" would otherwise create a nested directory
    # that list_baselines() and pruning never see).
    _UNSAFE_ID_CHARS = str.maketrans({"/": "_", "\\": "_", ":": "_"})

    def __init__(self, workspace_root: str, baselines_root: Optional[str] = None):
        """Manage baselines storage.

        Args:
            workspace_root: path to the project/workspace (kept for metadata usage).
            baselines_root: optional absolute or relative path where baselines are stored.
                If omitted, the environment variable `PYUCC_BASELINE_DIR` is consulted,
                then the application settings; if neither yields a value, defaults to
                `./baseline` in the current working dir.
        """
        self.workspace_root = os.path.abspath(workspace_root)
        # priority: explicit argument, env var, app settings, ./baseline
        if not baselines_root:
            baselines_root = os.getenv("PYUCC_BASELINE_DIR")
        if not baselines_root:
            try:
                from ..config import settings as app_settings
                baselines_root = app_settings.get_baseline_dir()
            except Exception:
                # settings are optional; fall through to the default
                baselines_root = None
        if not baselines_root:
            baselines_root = os.path.join(os.getcwd(), "baseline")
        self.baselines_root = os.path.abspath(baselines_root)
        os.makedirs(self.baselines_root, exist_ok=True)

    def _baseline_dir(self, baseline_id: str) -> str:
        """Return the storage directory of the given baseline."""
        return os.path.join(self.baselines_root, baseline_id)

    def get_baseline_files_dir(self, baseline_id: str) -> str:
        """Get the directory containing the baseline snapshot files."""
        return os.path.join(self._baseline_dir(baseline_id), "files")

    def list_baselines(self) -> List[str]:
        """Return the names of all directories directly under the baselines root."""
        return [d for d in os.listdir(self.baselines_root) if os.path.isdir(os.path.join(self.baselines_root, d))]

    def get_metadata_path(self, baseline_id: str) -> str:
        """Return the path of the ``metadata.json`` file of the given baseline."""
        return os.path.join(self._baseline_dir(baseline_id), "metadata.json")

    @classmethod
    def _safe_id_part(cls, text: str) -> str:
        """Replace path separators and ':' so *text* fits in one directory name."""
        return text.translate(cls._UNSAFE_ID_CHARS)

    def _default_baseline_id(self, profile_name: Optional[str], suffix: str) -> str:
        """Build a timestamp-based baseline id, prefixed by the profile name if given."""
        ts = time.strftime("%Y%m%dT%H%M%S")
        if profile_name:
            safe_profile = self._safe_id_part(profile_name.replace(" ", "_"))
            return f"{safe_profile}__{ts}_{suffix}"
        return f"{ts}_{suffix}"

    def _is_inside_baselines_root(self, path: str) -> bool:
        """Tell whether *path* lies inside the baselines storage directory."""
        try:
            return os.path.commonpath([self.baselines_root, path]) == self.baselines_root
        except ValueError:
            # commonpath raises when the paths are on different drives
            # (Windows); such a path cannot be inside the storage area.
            return False

    def _scan_tree(self, root_dir: str, ignore_patterns: List[str], compute_sha1: bool, skip_baselines_root: bool) -> List[FileMeta]:
        """Walk *root_dir* and return a FileMeta for every file not ignored.

        Ignore patterns are fnmatch patterns tested against both the relative
        path (forward slashes) and the bare file name. Files that cannot be
        stat'ed are skipped; files that cannot be hashed get ``sha1=None``.
        """
        files_meta: List[FileMeta] = []
        for root, _dirs, files in os.walk(root_dir):
            for fn in files:
                fpath = os.path.join(root, fn)
                if skip_baselines_root and self._is_inside_baselines_root(fpath):
                    continue
                rel_unix = os.path.relpath(fpath, root_dir).replace("\\", "/")
                if any(fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch(fn, pat) for pat in ignore_patterns):
                    continue
                try:
                    st = os.stat(fpath)
                except OSError:
                    continue
                sha1 = None
                if compute_sha1:  # also computed for 0-byte files
                    try:
                        sha1 = _sha1_of_file(Path(fpath))
                    except OSError:
                        sha1 = None
                files_meta.append(FileMeta(path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1))
        return files_meta

    @staticmethod
    def _attach_analysis(files_meta: List[FileMeta], root_dir: str) -> None:
        """Run the per-file analyzers and store their results on each FileMeta.

        Best effort: if the analyzers cannot be imported nothing is attached,
        and a failing analyzer leaves the corresponding attribute at None.
        """
        try:
            from ..core.countings_impl import analyze_file_counts as _analyze_file_counts
            from ..core.metrics import analyze_file_metrics as _analyze_metrics
        except Exception:
            return
        for fm in files_meta:
            abs_path = os.path.join(root_dir, fm.path)
            # per-file counts
            try:
                c = _analyze_file_counts(Path(abs_path))
                fm.countings = {
                    "physical_lines": int(c.get("physical_lines", 0)),
                    "code_lines": int(c.get("code_lines", 0)),
                    "comment_lines": int(c.get("comment_lines", 0)),
                    "blank_lines": int(c.get("blank_lines", 0)),
                }
            except Exception:
                fm.countings = None
            # per-file metrics
            try:
                m = _analyze_metrics(abs_path)
                fm.metrics = {
                    "avg_cc": float(m.get("avg_cc", 0.0)),
                    "max_cc": int(m.get("max_cc", 0)),
                    "func_count": int(m.get("func_count", 0)),
                    "mi": float(m.get("mi", 0.0)),
                }
            except Exception:
                fm.metrics = None

    def _write_metadata(self, metadata: BaselineMetadata) -> None:
        """Serialize *metadata* to the baseline's ``metadata.json``."""
        with open(self.get_metadata_path(metadata.baseline_id), "w", encoding="utf-8") as f:
            json.dump(self._metadata_to_dict(metadata), f, indent=2)

    @staticmethod
    def _zip_snapshot_if_enabled(dest: str, snapshot_dir: str) -> None:
        """Create ``<dest>/files.zip`` from the snapshot when the settings ask for it."""
        try:
            from ..config import settings as app_settings
            if app_settings.get_zip_baselines():
                shutil.make_archive(base_name=os.path.join(dest, "files"), format="zip", root_dir=snapshot_dir)
        except Exception:
            pass  # if settings not available or zip fails, continue without zip

    def create_baseline_from_dir(self, dir_path: str, baseline_id: Optional[str] = None, snapshot: bool = True, compute_sha1: bool = True, ignore_patterns: Optional[List[str]] = None, profile_name: Optional[str] = None, max_keep: int = 5) -> str:
        """Create a baseline from the current content of a directory.

        Args:
            dir_path: directory to scan.
            baseline_id: id to use; generated from the profile name and a timestamp if omitted.
            snapshot: when true, copy the included files into ``<baseline>/files``.
            compute_sha1: when true, store a SHA-1 digest for every file.
            ignore_patterns: fnmatch patterns of files to leave out.
            profile_name: profile recorded in the metadata (and used in the generated id).
            max_keep: when > 0, older baselines of the same directory and profile
                are pruned so that at most this many remain.

        Returns:
            The id of the created baseline.

        Raises:
            FileExistsError: if a baseline directory with that id already exists.
        """
        dir_path = os.path.abspath(dir_path)
        if baseline_id is None:
            baseline_id = self._default_baseline_id(profile_name, "local")
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)

        # Files inside the baseline storage area are skipped so baselines
        # stored under the scanned directory are not captured themselves.
        files_meta = self._scan_tree(dir_path, ignore_patterns or [], compute_sha1, skip_baselines_root=True)
        self._attach_analysis(files_meta, dir_path)

        metadata = BaselineMetadata(
            baseline_id=baseline_id,
            created_at=time.time(),
            source="local",
            origin=None,
            project_root=dir_path,
            files=files_meta,
            profile=profile_name,
        )
        self._write_metadata(metadata)

        if snapshot:
            snapshot_dir = os.path.join(dest, "files")
            os.makedirs(snapshot_dir, exist_ok=True)
            # Copy only the files that were included in the baseline (respecting ignore patterns)
            for fm in files_meta:
                src_file = os.path.join(dir_path, fm.path)
                dst_file = os.path.join(snapshot_dir, fm.path)
                dst_parent = os.path.dirname(dst_file)
                if dst_parent:
                    os.makedirs(dst_parent, exist_ok=True)
                try:
                    shutil.copy2(src_file, dst_file)  # copy2 preserves metadata
                except OSError:
                    pass  # skip files that cannot be copied
            self._zip_snapshot_if_enabled(dest, snapshot_dir)

        # prune old baselines if requested (same policy as the git variant)
        if max_keep > 0:
            self._prune_old_baselines(dir_path, profile_name, max_keep)
        return baseline_id

    def _prune_old_baselines(self, project_root: str, profile_name: Optional[str], keep: int = 5):
        """Prune older baselines for the same project and profile, keeping `keep` newest.

        Baselines whose metadata is missing, unreadable or not a JSON object
        are left untouched. A missing or invalid ``created_at`` sorts as oldest.
        """
        entries = []  # list of (created_at, path)
        for bn in os.listdir(self.baselines_root):
            bdir = os.path.join(self.baselines_root, bn)
            if not os.path.isdir(bdir):
                continue
            meta_path = os.path.join(bdir, "metadata.json")
            if not os.path.exists(meta_path):
                continue
            try:
                with open(meta_path, "r", encoding="utf-8") as f:
                    j = json.load(f)
            except (OSError, ValueError):
                continue
            if not isinstance(j, dict):
                continue
            # match by project_root and profile (None only matches None)
            if j.get("project_root") != project_root or j.get("profile") != profile_name:
                continue
            try:
                created = float(j.get("created_at") or 0)
            except (TypeError, ValueError):
                created = 0.0
            entries.append((created, bdir))
        # newest first, then drop everything beyond `keep`
        entries.sort(key=lambda entry: entry[0], reverse=True)
        for _created, path in entries[keep:]:
            shutil.rmtree(path, ignore_errors=True)

    def create_baseline_from_git(self, repo_path: str, commit_ref: str = "HEAD", baseline_id: Optional[str] = None, snapshot: bool = True, compute_sha1: bool = True, ignore_patterns: Optional[List[str]] = None, profile_name: Optional[str] = None, max_keep: int = 5) -> str:
        """Create a baseline by exporting a git commit (using `git archive`).

        This method requires that `git` is available in PATH. It will create a zip
        archive of the requested commit and then build the baseline metadata from
        the extracted tree. If the export or the extraction fails, the partially
        created baseline directory is removed before the error is raised.

        Returns:
            The id of the created baseline.

        Raises:
            FileExistsError: if a baseline directory with that id already exists.
            RuntimeError: if `git archive` exits with an error.
        """
        repo_path = os.path.abspath(repo_path)
        if baseline_id is None:
            baseline_id = self._default_baseline_id(profile_name, f"git_{self._safe_id_part(commit_ref)}")
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)

        zip_tmp = os.path.join(dest, "export.zip")
        extract_dir = os.path.join(dest, "extracted")
        try:
            subprocess.run(["git", "archive", "--format=zip", "-o", zip_tmp, commit_ref], cwd=repo_path, check=True)
            os.makedirs(extract_dir, exist_ok=True)
            shutil.unpack_archive(zip_tmp, extract_dir)
        except subprocess.CalledProcessError as e:
            shutil.rmtree(dest, ignore_errors=True)
            raise RuntimeError(f"git archive failed: {e}") from e
        except Exception:
            # do not leave an empty, metadata-less baseline directory behind
            shutil.rmtree(dest, ignore_errors=True)
            raise

        # The extracted tree lives inside the storage area, so the
        # storage-area filter must not be applied here.
        files_meta = self._scan_tree(extract_dir, ignore_patterns or [], compute_sha1, skip_baselines_root=False)
        self._attach_analysis(files_meta, extract_dir)

        metadata = BaselineMetadata(
            baseline_id=baseline_id,
            created_at=time.time(),
            source="git",
            origin=commit_ref,
            project_root=repo_path,
            files=files_meta,
            profile=profile_name,
        )
        self._write_metadata(metadata)

        if snapshot:
            # keep the extracted tree as the snapshot: dest/extracted -> dest/files
            snapshot_dir = os.path.join(dest, "files")
            shutil.move(extract_dir, snapshot_dir)
            self._zip_snapshot_if_enabled(dest, snapshot_dir)
        else:
            shutil.rmtree(extract_dir, ignore_errors=True)
        # Always remove the git export zip (it was just temporary)
        try:
            os.remove(zip_tmp)
        except OSError:
            pass

        # prune old baselines if requested
        if max_keep > 0:
            self._prune_old_baselines(repo_path, profile_name, max_keep)
        return baseline_id

    def load_metadata(self, baseline_id: str) -> BaselineMetadata:
        """Load the metadata of a baseline from its ``metadata.json``.

        Unknown keys in a file entry are ignored so that metadata carrying
        extra fields can still be loaded.
        """
        with open(self.get_metadata_path(baseline_id), "r", encoding="utf-8") as f:
            j = json.load(f)
        known = FileMeta.__dataclass_fields__
        files = [FileMeta(**{k: v for k, v in fm.items() if k in known}) for fm in j["files"]]
        return BaselineMetadata(
            baseline_id=j["baseline_id"],
            created_at=j["created_at"],
            source=j.get("source", "local"),
            origin=j.get("origin"),
            project_root=j.get("project_root", ""),
            files=files,
            profile=j.get("profile"),
        )

    def _metadata_to_dict(self, meta: BaselineMetadata) -> Dict:
        """Convert *meta* (including its nested file entries) to plain dicts."""
        # asdict() already recurses into the list of FileMeta dataclasses
        return asdict(meta)
class Differ:
    """Compare a stored baseline against the current content of a directory.

    Files are paired by name (see match_files), each pair is diffed line by
    line, and per-file countings/metrics deltas plus an overall summary are
    returned as a JSON-serializable dict (see diff).
    """

    # Diagnostics go through logging (debug level) instead of stdout.
    _log = logging.getLogger(__name__)

    def __init__(self, baseline: BaselineMetadata, current_dir: str, max_workers: int = 4, ignore_patterns: Optional[List[str]] = None, baseline_files_dir: Optional[str] = None):
        """Store the comparison inputs.

        Args:
            baseline: metadata of the baseline to compare against.
            current_dir: directory holding the current files.
            max_workers: number of threads used to diff file pairs.
            ignore_patterns: fnmatch patterns of current files to leave out.
            baseline_files_dir: directory containing the baseline snapshot files.
                If not provided, falls back to baseline.project_root
                (for backwards compatibility).
        """
        self.baseline = baseline
        self.current_dir = os.path.abspath(current_dir)
        self.max_workers = max_workers
        self.ignore_patterns = ignore_patterns or []
        self._current_files_cache: Optional[List[FileMeta]] = None
        self.baseline_files_dir = baseline_files_dir if baseline_files_dir else baseline.project_root

    def build_current_file_list(self) -> List[FileMeta]:
        """Scan current_dir and return a FileMeta (with analysis) per file.

        The result is cached on the instance; later calls return the same list.
        """
        if self._current_files_cache is not None:
            return self._current_files_cache
        patterns = self.ignore_patterns or []
        files_meta: List[FileMeta] = []
        for root, _dirs, files in os.walk(self.current_dir):
            for fn in files:
                fpath = os.path.join(root, fn)
                rel_unix = os.path.relpath(fpath, self.current_dir).replace("\\", "/")
                # patterns are tested against relative path (unix-style) and filename
                if any(fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch(fn, pat) for pat in patterns):
                    continue
                try:
                    st = os.stat(fpath)
                except OSError:
                    continue
                # Compute SHA1 for all files, including 0-byte files
                try:
                    sha1 = _sha1_of_file(Path(fpath))
                except Exception:
                    sha1 = None
                files_meta.append(FileMeta(path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1))
        # Give current files the same countings/metrics data as baseline files
        self._attach_analysis(files_meta)
        self._current_files_cache = files_meta
        return files_meta

    def _attach_analysis(self, files_meta: List[FileMeta]) -> None:
        """Run the per-file analyzers on current files (best effort)."""
        try:
            from ..core.countings_impl import analyze_file_counts as _analyze_file_counts
            from ..core.metrics import analyze_file_metrics as _analyze_metrics
        except Exception:
            return
        for fm in files_meta:
            abs_path = os.path.join(self.current_dir, fm.path)
            # per-file counts
            try:
                c = _analyze_file_counts(Path(abs_path))
                fm.countings = {
                    "physical_lines": int(c.get("physical_lines", 0)),
                    "code_lines": int(c.get("code_lines", 0)),
                    "comment_lines": int(c.get("comment_lines", 0)),
                    "blank_lines": int(c.get("blank_lines", 0)),
                }
            except Exception:
                fm.countings = None
            # per-file metrics
            try:
                m = _analyze_metrics(abs_path)
                fm.metrics = {
                    "avg_cc": float(m.get("avg_cc", 0.0)),
                    "max_cc": int(m.get("max_cc", 0)),
                    "func_count": int(m.get("func_count", 0)),
                    "mi": float(m.get("mi", 0.0)),
                }
            except Exception:
                fm.metrics = None

    @staticmethod
    def _index_by_name(files: List[FileMeta]) -> Dict[str, List[FileMeta]]:
        """Group files by base name, preserving input order within each group."""
        idx: Dict[str, List[FileMeta]] = {}
        for f in files:
            idx.setdefault(os.path.basename(f.path), []).append(f)
        return idx

    @staticmethod
    def _levenshtein(a: str, b: str) -> int:
        """Return the Levenshtein edit distance between *a* and *b*."""
        la, lb = len(a), len(b)
        if la == 0:
            return lb
        if lb == 0:
            return la
        # two-row dynamic programming: prev = distances for the previous prefix of a
        prev = list(range(lb + 1))
        for i, ca in enumerate(a, start=1):
            cur = [i] + [0] * lb
            for j, cb in enumerate(b, start=1):
                cur[j] = min(prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (0 if ca == cb else 1))
            prev = cur
        return prev[lb]

    def match_files(self, baseline_files: List[FileMeta], current_files: List[FileMeta]) -> List[Tuple[Optional[FileMeta], Optional[FileMeta]]]:
        """Pair baseline files with current files using stable matching.

        Only files sharing the same base name are candidates for each other;
        candidates are ranked by the edit distance between their parent
        directories, and baseline files propose (Gale-Shapley).

        Returns:
            One ``(baseline, current)`` tuple per baseline file (``current`` is
            None when unmatched), in baseline order, followed by one
            ``(None, current)`` tuple per unmatched current file.
        """
        by_name_a = self._index_by_name(baseline_files)
        by_name_b = self._index_by_name(current_files)

        # every distance is needed for both sides' rankings: compute it once
        distance_cache: Dict[Tuple[str, str], int] = {}

        def pref_val(pa: str, pb: str) -> int:
            key = (os.path.dirname(pa), os.path.dirname(pb))
            if key not in distance_cache:
                distance_cache[key] = self._levenshtein(key[0], key[1])
            return distance_cache[key]

        # preference lists, closest parent directory first (stable sort keeps
        # the input order for equal distances)
        prefs_a: Dict[str, List[FileMeta]] = {}
        for a in baseline_files:
            candidates = by_name_b.get(os.path.basename(a.path), [])
            prefs_a[a.path] = sorted(candidates, key=lambda b: pref_val(a.path, b.path))
        rank_b: Dict[str, Dict[str, int]] = {}
        for b in current_files:
            candidates = by_name_a.get(os.path.basename(b.path), [])
            ordered = sorted(candidates, key=lambda a: pref_val(a.path, b.path))
            rank_b[b.path] = {a.path: idx for idx, a in enumerate(ordered)}

        next_proposal: Dict[str, int] = {a.path: 0 for a in baseline_files}
        match_a: Dict[str, Optional[FileMeta]] = {a.path: None for a in baseline_files}
        match_b: Dict[str, Optional[FileMeta]] = {b.path: None for b in current_files}

        free = deque(baseline_files)
        while free:
            a = free.popleft()
            plist = prefs_a.get(a.path, [])
            i = next_proposal[a.path]
            if i >= len(plist):
                # no candidates (left): a stays unmatched
                match_a[a.path] = None
                continue
            b = plist[i]
            next_proposal[a.path] = i + 1
            current = match_b.get(b.path)
            if current is None:
                # b accepts
                match_a[a.path] = b
                match_b[b.path] = a
                continue
            # b decides between its current partner and the proposer
            ranks = rank_b.get(b.path, {})
            if ranks.get(a.path, float("inf")) < ranks.get(current.path, float("inf")):
                match_a[a.path] = b
                match_b[b.path] = a
                # the previous partner becomes free again
                match_a[current.path] = None
                free.append(current)
            else:
                # rejected: the proposer tries its next preference later
                free.append(a)

        results: List[Tuple[Optional[FileMeta], Optional[FileMeta]]] = []
        used_b = set()
        for a in baseline_files:
            b = match_a.get(a.path)
            results.append((a, b))
            if b is not None:
                used_b.add(b.path)
        # Any B not matched are added as (None, b)
        results.extend((None, b) for b in current_files if b.path not in used_b)
        return results

    @staticmethod
    def _read_lines(path: str) -> List[str]:
        """Return the lines of a text file, or [] when it cannot be read."""
        try:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                return f.readlines()
        except Exception:
            return []

    @staticmethod
    def _diff_file_pair(fileA_path: Optional[str], fileB_path: Optional[str]) -> Dict:
        """Count added/deleted/modified/unmodified lines between two files.

        A missing side (None) makes every line of the other side count as
        added or deleted. In a "replace" region the overlapping lines count as
        modified and the surplus as added or deleted.
        """
        res = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
        if fileA_path is None and fileB_path is None:
            return res
        if fileA_path is None:
            # all lines are added
            res["added"] = len(Differ._read_lines(fileB_path))
            return res
        if fileB_path is None:
            res["deleted"] = len(Differ._read_lines(fileA_path))
            return res

        # both exist; line-based diff
        a_lines = Differ._read_lines(fileA_path)
        b_lines = Differ._read_lines(fileB_path)
        opcodes = difflib.SequenceMatcher(a=a_lines, b=b_lines).get_opcodes()

        if Differ._log.isEnabledFor(logging.DEBUG) and len(a_lines) == len(b_lines):
            # debugging aid: locate the first differing line of equally long files
            for lineno, (line_a, line_b) in enumerate(zip(a_lines, b_lines), start=1):
                if line_a != line_b:
                    Differ._log.debug(
                        "first difference between %s and %s at line %d: %r vs %r",
                        fileA_path, fileB_path, lineno, line_a[:80], line_b[:80],
                    )
                    break

        for tag, i1, i2, j1, j2 in opcodes:
            la = i2 - i1
            lb = j2 - j1
            if tag == "equal":
                res["unmodified"] += la
            elif tag == "delete":
                res["deleted"] += la
            elif tag == "insert":
                res["added"] += lb
            elif tag == "replace":
                res["modified"] += min(la, lb)
                if la > lb:
                    res["deleted"] += la - lb
                elif lb > la:
                    res["added"] += lb - la
        return res

    @staticmethod
    def _summarize(files: List[FileMeta]) -> Tuple[Dict, Dict]:
        """Aggregate per-file countings and metrics over *files*.

        Returns ``(counts, metrics)``: counts sums the line counts and counts
        the files that have countings; metrics sums func_count and averages
        avg_cc and mi over the files that have metrics.
        """
        counts = {"physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0}
        metrics = {"file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0}
        with_metrics = 0
        for fm in files:
            if fm.countings:
                for key in ("physical_lines", "code_lines", "comment_lines", "blank_lines"):
                    counts[key] += fm.countings.get(key, 0)
                counts["file_count"] += 1
            if fm.metrics:
                metrics["total_func_count"] += fm.metrics.get("func_count", 0)
                metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0)
                metrics["avg_mi"] += fm.metrics.get("mi", 0.0)
                with_metrics += 1
        if with_metrics > 0:
            metrics["avg_avg_cc"] /= with_metrics
            metrics["avg_mi"] /= with_metrics
        metrics["file_count"] = with_metrics
        return counts, metrics

    def diff(self) -> Dict:
        """Diff the baseline against the current directory.

        Returns:
            A dict with ``baseline_id``, ``compared_at``, ``total`` (summed line
            change counts), ``pairs`` (one entry per matched/unmatched file
            with its counts, countings, metrics and deltas) and, when it could
            be computed, ``summary`` (baseline/current/delta aggregates).
        """
        baseline_files = self.baseline.files
        current_files = self.build_current_file_list()
        pairs = self.match_files(baseline_files, current_files)

        # diff all pairs in a thread pool, keeping results in pair order
        with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            futures = [
                ex.submit(
                    self._diff_file_pair,
                    os.path.join(self.baseline_files_dir, a.path) if a is not None else None,
                    os.path.join(self.current_dir, b.path) if b is not None else None,
                )
                for a, b in pairs
            ]
            pair_counts = [fut.result() for fut in futures]

        total = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
        matched_results = []
        for (a, b), res in zip(pairs, pair_counts):
            for key in total:
                total[key] += res[key]

            baseline_countings = a.countings if a is not None else None
            baseline_metrics = a.metrics if a is not None else None
            current_countings = b.countings if b is not None else None
            current_metrics = b.metrics if b is not None else None

            # deltas exist only when both sides carry the data
            countings_delta = None
            if baseline_countings and current_countings:
                countings_delta = {
                    key: current_countings.get(key, 0) - baseline_countings.get(key, 0)
                    for key in ("physical_lines", "code_lines", "comment_lines", "blank_lines")
                }
                if any(countings_delta.values()):
                    self._log.debug(
                        "countings delta for %s vs %s: baseline=%s current=%s delta=%s",
                        a.path, b.path, baseline_countings, current_countings, countings_delta,
                    )
            metrics_delta = None
            if baseline_metrics and current_metrics:
                metrics_delta = {
                    "func_count": current_metrics.get("func_count", 0) - baseline_metrics.get("func_count", 0),
                    "avg_cc": current_metrics.get("avg_cc", 0.0) - baseline_metrics.get("avg_cc", 0.0),
                    "max_cc": current_metrics.get("max_cc", 0) - baseline_metrics.get("max_cc", 0),
                    "mi": current_metrics.get("mi", 0.0) - baseline_metrics.get("mi", 0.0),
                }

            matched_results.append({
                "fileA": a.path if a is not None else None,
                "fileB": b.path if b is not None else None,
                "counts": res,
                "baseline_countings": baseline_countings,
                "current_countings": current_countings,
                "countings_delta": countings_delta,
                "baseline_metrics": baseline_metrics,
                "current_metrics": current_metrics,
                "metrics_delta": metrics_delta,
            })

        result = {"baseline_id": self.baseline.baseline_id, "compared_at": time.time(), "total": total, "pairs": matched_results}

        # Summary statistics are best effort: on failure the key is omitted.
        try:
            baseline_counts, baseline_summary = self._summarize(baseline_files)
            current_counts, current_summary = self._summarize(current_files)
            delta_counts = {
                key: current_counts[key] - baseline_counts[key]
                for key in ("physical_lines", "code_lines", "comment_lines", "blank_lines", "file_count")
            }
            delta_metrics = {
                key: current_summary[key] - baseline_summary[key]
                for key in ("total_func_count", "avg_avg_cc", "avg_mi")
            }
            result['summary'] = {
                'baseline': {'countings': baseline_counts, 'metrics': baseline_summary},
                'current': {'countings': current_counts, 'metrics': current_summary},
                'delta': {'countings': delta_counts, 'metrics': delta_metrics}
            }
        except Exception:
            self._log.warning("could not compute diff summary", exc_info=True)
        return result