"""
Baseline manager and differ prototype.

- create a baseline from a directory (snapshot by default)
- load baseline metadata
- diff a baseline against a current directory
- output results as a dict / JSON-serializable structure

This is a minimal, self-contained implementation inspired by UCC's DiffTool.
A runnable usage sketch is included under the ``if __name__ == "__main__"``
guard at the bottom of this file.
"""

from __future__ import annotations

import difflib
import fnmatch
import hashlib
import json
import os
import shutil
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

BASELINE_ROOT_DIRNAME = ".pyucc_baselines"


def _sha1_of_file(path: Path, chunk_size: int = 8192) -> str:
    """Return the SHA-1 hex digest of ``path``, read in fixed-size chunks."""
    h = hashlib.sha1()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

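# A minimal alternative sketch: on Python 3.11+, hashlib.file_digest performs
# the same chunked read internally, so _sha1_of_file could be written as the
# snippet below. This is only an illustration; the module keeps the portable
# loop above.
#
#     def _sha1_of_file(path: Path) -> str:
#         with path.open("rb") as f:
#             return hashlib.file_digest(f, "sha1").hexdigest()
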
@dataclass
class FileMeta:
    path: str  # relative path
    size: int
    mtime: float
    sha1: Optional[str] = None
    countings: Optional[Dict] = None
    metrics: Optional[Dict] = None


@dataclass
class BaselineMetadata:
    baseline_id: str
    created_at: float
    source: str  # 'local' or 'git'
    origin: Optional[str]
    project_root: str
    files: List[FileMeta]
    profile: Optional[str] = None


class BaselineManager:
    def __init__(self, workspace_root: str, baselines_root: Optional[str] = None):
        """Manage baseline storage.

        Args:
            workspace_root: path to the project/workspace (kept for metadata usage).
            baselines_root: optional absolute or relative path where baselines are stored.
                If omitted, the environment variable `PYUCC_BASELINE_DIR` is consulted,
                then the application settings (`get_baseline_dir()`), and finally
                `./baseline` in the current working directory is used as a fallback.
        """
        self.workspace_root = os.path.abspath(workspace_root)
        if baselines_root:
            self.baselines_root = os.path.abspath(baselines_root)
        else:
            # priority: env var, app settings, fallback to ./baseline
            env = os.getenv("PYUCC_BASELINE_DIR")
            if env:
                self.baselines_root = os.path.abspath(env)
            else:
                # try app settings if available
                try:
                    from ..config import settings as app_settings
                    sdir = app_settings.get_baseline_dir()
                except Exception:
                    sdir = None
                if sdir:
                    self.baselines_root = os.path.abspath(sdir)
                else:
                    self.baselines_root = os.path.join(os.getcwd(), "baseline")
        os.makedirs(self.baselines_root, exist_ok=True)

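    # Illustrative sketch of how the storage root resolves (the paths are
    # hypothetical): an explicit argument always wins, then PYUCC_BASELINE_DIR,
    # then the app settings, then ./baseline.
    #
    #     os.environ["PYUCC_BASELINE_DIR"] = "/tmp/my_baselines"
    #     mgr = BaselineManager(workspace_root=".")
    #     assert mgr.baselines_root == "/tmp/my_baselines"
    #
    #     mgr = BaselineManager(workspace_root=".", baselines_root="./b")
    #     assert mgr.baselines_root == os.path.abspath("./b")
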
    def _baseline_dir(self, baseline_id: str) -> str:
        return os.path.join(self.baselines_root, baseline_id)

    def list_baselines(self) -> List[str]:
        return [d for d in os.listdir(self.baselines_root) if os.path.isdir(os.path.join(self.baselines_root, d))]

    def get_metadata_path(self, baseline_id: str) -> str:
        return os.path.join(self._baseline_dir(baseline_id), "metadata.json")

    def create_baseline_from_dir(
        self,
        dir_path: str,
        baseline_id: Optional[str] = None,
        snapshot: bool = True,
        compute_sha1: bool = True,
        ignore_patterns: Optional[List[str]] = None,
        profile_name: Optional[str] = None,
        max_keep: int = 5,
    ) -> str:
        """Create a baseline from a local directory and return its id.

        The baseline directory contains a `metadata.json` file and, when
        `snapshot` is true, a `files/` copy of the included sources (plus an
        optional `files.zip` archive depending on the app settings).
        """
        dir_path = os.path.abspath(dir_path)
        if baseline_id is None:
            ts = time.strftime("%Y%m%dT%H%M%S")
            # include the profile name in the baseline id when available
            if profile_name:
                safe_profile = profile_name.replace(" ", "_")
                baseline_id = f"{safe_profile}__{ts}_local"
            else:
                baseline_id = f"{ts}_local"
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)

        files_meta: List[FileMeta] = []

        # Walk the source dir and collect metadata
        ignore_patterns = ignore_patterns or []
        for root, dirs, files in os.walk(dir_path):
            for fn in files:
                fpath = os.path.join(root, fn)
                # skip the baseline storage area if it lives under the scanned tree
                try:
                    if os.path.commonpath([self.baselines_root, fpath]) == self.baselines_root:
                        continue
                except ValueError:
                    # paths on different drives (Windows): cannot be under baselines_root
                    pass
                rel = os.path.relpath(fpath, dir_path)
                # check ignore patterns against the relative path (unix-style) and the filename
                rel_unix = rel.replace("\\", "/")
                ignored = False
                for pat in ignore_patterns:
                    if fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch(fn, pat):
                        ignored = True
                        break
                if ignored:
                    continue
                try:
                    st = os.stat(fpath)
                except OSError:
                    continue
                sha1 = None
                if compute_sha1 and st.st_size > 0:
                    try:
                        sha1 = _sha1_of_file(Path(fpath))
                    except Exception:
                        sha1 = None
                files_meta.append(FileMeta(path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1))

        # Run per-file analyzers (countings + metrics) and attach results to each FileMeta
        try:
            from ..core.countings_impl import analyze_file_counts as _analyze_file_counts
            from ..core.metrics import analyze_file_metrics as _analyze_metrics
            for fm in files_meta:
                abs_path = os.path.join(dir_path, fm.path)
                # per-file counts
                try:
                    c = _analyze_file_counts(Path(abs_path))
                    fm.countings = {
                        "physical_lines": int(c.get("physical_lines", 0)),
                        "code_lines": int(c.get("code_lines", 0)),
                        "comment_lines": int(c.get("comment_lines", 0)),
                        "blank_lines": int(c.get("blank_lines", 0)),
                    }
                except Exception:
                    fm.countings = None
                # per-file metrics
                try:
                    m = _analyze_metrics(abs_path)
                    fm.metrics = {
                        "avg_cc": float(m.get("avg_cc", 0.0)),
                        "max_cc": int(m.get("max_cc", 0)),
                        "func_count": int(m.get("func_count", 0)),
                        "mi": float(m.get("mi", 0.0)),
                    }
                except Exception:
                    fm.metrics = None
        except Exception:
            pass

        metadata = BaselineMetadata(
            baseline_id=baseline_id,
            created_at=time.time(),
            source="local",
            origin=None,
            project_root=dir_path,
            files=files_meta,
            profile=profile_name,
        )

        # Save metadata
        meta_path = self.get_metadata_path(baseline_id)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self._metadata_to_dict(metadata), f, indent=2)

        # Optionally store a snapshot
        if snapshot:
            snapshot_dir = os.path.join(dest, "files")
            os.makedirs(snapshot_dir, exist_ok=True)
            # Copy only the files that were included in the baseline (respecting ignore patterns)
            for fm in files_meta:
                src_file = os.path.join(dir_path, fm.path)
                dst_file = os.path.join(snapshot_dir, fm.path)
                # Create parent directories if needed
                dst_parent = os.path.dirname(dst_file)
                if dst_parent:
                    os.makedirs(dst_parent, exist_ok=True)
                try:
                    shutil.copy2(src_file, dst_file)  # copy2 preserves metadata
                except Exception:
                    pass  # skip files that cannot be copied

            # Optionally create a zip archive (controlled by settings): some
            # users trade speed for space savings
            try:
                from ..config import settings as app_settings
                if app_settings.get_zip_baselines():
                    zip_path = os.path.join(dest, "files.zip")
                    shutil.make_archive(base_name=zip_path[:-4], format="zip", root_dir=snapshot_dir)
            except Exception:
                pass  # if settings are not available or zipping fails, continue without a zip

        # prune old baselines for this project/profile if requested
        if max_keep > 0:
            self._prune_old_baselines(dir_path, profile_name, max_keep)

        return baseline_id

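    # A short usage sketch (paths and patterns are illustrative, not part of
    # the module):
    #
    #     mgr = BaselineManager(workspace_root="/path/to/project")
    #     bid = mgr.create_baseline_from_dir(
    #         "/path/to/project",
    #         ignore_patterns=["*.pyc", ".git/*", "build/*"],
    #         profile_name="default",
    #     )
    #     # -> e.g. "default__20240101T120000_local", stored under mgr.baselines_root
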
    def _prune_old_baselines(self, project_root: str, profile_name: Optional[str], keep: int = 5):
        """Prune older baselines for the same project and profile, keeping `keep` newest."""
        # scan the baselines root and load metadata for each baseline
        entries = []  # list of (created_at, baseline_id, path)
        for bn in os.listdir(self.baselines_root):
            bdir = os.path.join(self.baselines_root, bn)
            if not os.path.isdir(bdir):
                continue
            meta_path = os.path.join(bdir, "metadata.json")
            if not os.path.exists(meta_path):
                continue
            try:
                with open(meta_path, "r", encoding="utf-8") as f:
                    j = json.load(f)
            except Exception:
                continue
            # match by project_root and profile
            if j.get("project_root") != project_root:
                continue
            if profile_name is None:
                if j.get("profile") is not None:
                    continue
            else:
                if j.get("profile") != profile_name:
                    continue
            entries.append((j.get("created_at", 0), j.get("baseline_id", bn), bdir))

        # sort by created_at descending (newest first)
        entries.sort(key=lambda x: x[0], reverse=True)
        # remove entries beyond `keep`
        for _, bid, path in entries[keep:]:
            try:
                shutil.rmtree(path)
            except Exception:
                pass

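    # Pruning sketch: with keep=2 and three baselines for the same project and
    # profile created at t1 < t2 < t3, only the t2 and t3 directories survive;
    # the t1 directory is removed (timestamps are illustrative):
    #
    #     mgr._prune_old_baselines("/path/to/project", "default", keep=2)
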
    def create_baseline_from_git(
        self,
        repo_path: str,
        commit_ref: str = "HEAD",
        baseline_id: Optional[str] = None,
        snapshot: bool = True,
        compute_sha1: bool = True,
        ignore_patterns: Optional[List[str]] = None,
        profile_name: Optional[str] = None,
        max_keep: int = 5,
    ) -> str:
        """Create a baseline by exporting a git commit (using `git archive`).

        This method requires that `git` is available in PATH. It creates a zip
        archive of the requested commit and then builds the baseline metadata
        from the extracted tree.
        """
        repo_path = os.path.abspath(repo_path)
        if baseline_id is None:
            ts = time.strftime("%Y%m%dT%H%M%S")
            # refs such as "origin/main" contain path separators; sanitize them
            # before using the ref in a directory name
            safe_ref = commit_ref.replace("/", "_")
            if profile_name:
                safe_profile = profile_name.replace(" ", "_")
                baseline_id = f"{safe_profile}__{ts}_git_{safe_ref}"
            else:
                baseline_id = f"{ts}_git_{safe_ref}"
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)

        # create a temporary zip with git archive
        zip_tmp = os.path.join(dest, "export.zip")
        try:
            subprocess.run(["git", "archive", "--format=zip", "-o", zip_tmp, commit_ref], cwd=repo_path, check=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"git archive failed: {e}") from e

        # extract the zip to a working dir and build metadata as for a dir baseline
        extract_dir = os.path.join(dest, "extracted")
        os.makedirs(extract_dir, exist_ok=True)
        shutil.unpack_archive(zip_tmp, extract_dir)

        # reuse the create_baseline_from_dir logic but avoid creating a nested baseline dir
        files_meta: List[FileMeta] = []
        ignore_patterns = ignore_patterns or []
        for root, dirs, files in os.walk(extract_dir):
            for fn in files:
                fpath = os.path.join(root, fn)
                rel = os.path.relpath(fpath, extract_dir)
                rel_unix = rel.replace("\\", "/")
                # apply ignore patterns
                ignored = False
                for pat in ignore_patterns:
                    if fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch(fn, pat):
                        ignored = True
                        break
                if ignored:
                    continue
                try:
                    st = os.stat(fpath)
                except OSError:
                    continue
                sha1 = None
                if compute_sha1 and st.st_size > 0:
                    try:
                        sha1 = _sha1_of_file(Path(fpath))
                    except Exception:
                        sha1 = None
                files_meta.append(FileMeta(path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1))

        # attempt to run per-file analyzers on the extracted tree and attach results
        try:
            from ..core.countings_impl import analyze_file_counts as _analyze_file_counts
            from ..core.metrics import analyze_file_metrics as _analyze_metrics
            for fm in files_meta:
                abs_path = os.path.join(extract_dir, fm.path)
                try:
                    c = _analyze_file_counts(Path(abs_path))
                    fm.countings = {
                        "physical_lines": int(c.get("physical_lines", 0)),
                        "code_lines": int(c.get("code_lines", 0)),
                        "comment_lines": int(c.get("comment_lines", 0)),
                        "blank_lines": int(c.get("blank_lines", 0)),
                    }
                except Exception:
                    fm.countings = None
                try:
                    m = _analyze_metrics(abs_path)
                    fm.metrics = {
                        "avg_cc": float(m.get("avg_cc", 0.0)),
                        "max_cc": int(m.get("max_cc", 0)),
                        "func_count": int(m.get("func_count", 0)),
                        "mi": float(m.get("mi", 0.0)),
                    }
                except Exception:
                    fm.metrics = None
        except Exception:
            pass

        metadata = BaselineMetadata(
            baseline_id=baseline_id,
            created_at=time.time(),
            source="git",
            origin=commit_ref,
            project_root=repo_path,
            files=files_meta,
            profile=profile_name,
        )

        meta_path = self.get_metadata_path(baseline_id)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self._metadata_to_dict(metadata), f, indent=2)

        # Optionally keep the extracted tree (snapshot)
        if snapshot:
            # move the extracted content into dest/files
            snapshot_dir = os.path.join(dest, "files")
            shutil.move(extract_dir, snapshot_dir)

            # Optionally create a zip archive from the files directory
            try:
                from ..config import settings as app_settings
                if app_settings.get_zip_baselines():
                    # keep both files/ and files.zip
                    zip_archive = os.path.join(dest, "files.zip")
                    shutil.make_archive(base_name=zip_archive[:-4], format="zip", root_dir=snapshot_dir)
            except Exception:
                pass

            # Always remove the git export zip (it was only temporary)
            try:
                os.remove(zip_tmp)
            except Exception:
                pass
        else:
            # remove the extracted files and the export zip
            shutil.rmtree(extract_dir, ignore_errors=True)
            try:
                os.remove(zip_tmp)
            except Exception:
                pass

        # prune old baselines if requested
        if max_keep > 0:
            self._prune_old_baselines(repo_path, profile_name, max_keep)

        return baseline_id

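    # A short usage sketch for git-backed baselines (repository path and tag
    # are illustrative):
    #
    #     mgr = BaselineManager(workspace_root="/path/to/repo")
    #     bid = mgr.create_baseline_from_git("/path/to/repo", commit_ref="v1.0")
    #     meta = mgr.load_metadata(bid)
    #     print(meta.source, meta.origin)   # "git", "v1.0"
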
    def load_metadata(self, baseline_id: str) -> BaselineMetadata:
        meta_path = self.get_metadata_path(baseline_id)
        with open(meta_path, "r", encoding="utf-8") as f:
            j = json.load(f)
        files = [FileMeta(**fm) for fm in j["files"]]
        return BaselineMetadata(
            baseline_id=j["baseline_id"],
            created_at=j["created_at"],
            source=j.get("source", "local"),
            origin=j.get("origin"),
            project_root=j.get("project_root", ""),
            files=files,
            profile=j.get("profile"),
        )

    def _metadata_to_dict(self, meta: BaselineMetadata) -> Dict:
        # asdict() recursively converts the dataclass (including the nested
        # FileMeta entries) into plain, JSON-serializable dicts
        return asdict(meta)

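    # The resulting metadata.json mirrors BaselineMetadata/FileMeta; an
    # abbreviated example (values are illustrative):
    #
    #     {
    #       "baseline_id": "default__20240101T120000_local",
    #       "created_at": 1704110400.0,
    #       "source": "local",
    #       "origin": null,
    #       "project_root": "/path/to/project",
    #       "files": [
    #         {"path": "pkg/app.py", "size": 1024, "mtime": 1704110000.0,
    #          "sha1": "…",
    #          "countings": {"physical_lines": 40, "code_lines": 30,
    #                        "comment_lines": 5, "blank_lines": 5},
    #          "metrics": {"avg_cc": 1.5, "max_cc": 3, "func_count": 4, "mi": 80.0}}
    #       ],
    #       "profile": "default"
    #     }
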

class Differ:
    def __init__(self, baseline: BaselineMetadata, current_dir: str, max_workers: int = 4, ignore_patterns: Optional[List[str]] = None):
        self.baseline = baseline
        self.current_dir = os.path.abspath(current_dir)
        self.max_workers = max_workers
        self.ignore_patterns = ignore_patterns or []
        self._current_files_cache: Optional[List[FileMeta]] = None

    def build_current_file_list(self) -> List[FileMeta]:
        # Return the cached result if already computed
        if self._current_files_cache is not None:
            return self._current_files_cache

        files_meta: List[FileMeta] = []
        for root, dirs, files in os.walk(self.current_dir):
            for fn in files:
                fpath = os.path.join(root, fn)
                rel = os.path.relpath(fpath, self.current_dir)
                # apply ignore patterns from the profile: test against the relative path (unix-style) and the filename
                rel_unix = rel.replace("\\", "/")
                ignored = False
                for pat in (self.ignore_patterns or []):
                    if fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch(fn, pat):
                        ignored = True
                        break
                if ignored:
                    continue
                try:
                    st = os.stat(fpath)
                except OSError:
                    continue
                sha1 = None
                if st.st_size > 0:
                    try:
                        sha1 = _sha1_of_file(Path(fpath))
                    except Exception:
                        sha1 = None
                files_meta.append(FileMeta(path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1))

        # Run per-file analyzers (countings + metrics) and attach results to each FileMeta.
        # This ensures current files carry the same data as baseline files for comparison.
        try:
            from ..core.countings_impl import analyze_file_counts as _analyze_file_counts
            from ..core.metrics import analyze_file_metrics as _analyze_metrics
            for fm in files_meta:
                abs_path = os.path.join(self.current_dir, fm.path)
                # per-file counts
                try:
                    c = _analyze_file_counts(Path(abs_path))
                    fm.countings = {
                        "physical_lines": int(c.get("physical_lines", 0)),
                        "code_lines": int(c.get("code_lines", 0)),
                        "comment_lines": int(c.get("comment_lines", 0)),
                        "blank_lines": int(c.get("blank_lines", 0)),
                    }
                except Exception:
                    fm.countings = None
                # per-file metrics
                try:
                    m = _analyze_metrics(abs_path)
                    fm.metrics = {
                        "avg_cc": float(m.get("avg_cc", 0.0)),
                        "max_cc": int(m.get("max_cc", 0)),
                        "func_count": int(m.get("func_count", 0)),
                        "mi": float(m.get("mi", 0.0)),
                    }
                except Exception:
                    fm.metrics = None
        except Exception:
            pass

        # Cache the result to avoid recomputation
        self._current_files_cache = files_meta
        return files_meta

    @staticmethod
    def _index_by_name(files: List[FileMeta]) -> Dict[str, List[FileMeta]]:
        idx: Dict[str, List[FileMeta]] = {}
        for f in files:
            name = os.path.basename(f.path)
            idx.setdefault(name, []).append(f)
        return idx

    @staticmethod
    def _levenshtein(a: str, b: str) -> int:
        # simple DP implementation
        la, lb = len(a), len(b)
        if la == 0:
            return lb
        if lb == 0:
            return la
        prev = list(range(lb + 1))
        for i, ca in enumerate(a, start=1):
            cur = [i] + [0] * lb
            for j, cb in enumerate(b, start=1):
                add = prev[j] + 1
                delete = cur[j - 1] + 1
                change = prev[j - 1] + (0 if ca == cb else 1)
                cur[j] = min(add, delete, change)
            prev = cur
        return prev[lb]

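    # Worked example of the edit distance used for directory-name similarity:
    # _levenshtein("kitten", "sitting") == 3 (substitute k->s, substitute e->i,
    # insert g). In match_files below it is applied to the *parent directory*
    # strings of two candidate paths, so a smaller value means a closer match.
    #
    #     assert Differ._levenshtein("kitten", "sitting") == 3
    #     assert Differ._levenshtein("src/app", "src/app") == 0
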
    def match_files(self, baseline_files: List[FileMeta], current_files: List[FileMeta]) -> List[Tuple[Optional[FileMeta], Optional[FileMeta]]]:
        # Implement Gale-Shapley stable matching inspired by UCC logic.
        # Build maps by filename only (candidates must share the same filename).
        mapA_by_name = self._index_by_name(baseline_files)
        mapB_by_name = self._index_by_name(current_files)

        # Build preference lists (for A: list of B candidates sorted by path distance)
        prefsA: Dict[str, List[FileMeta]] = {}  # key: a.path
        prefsB: Dict[str, List[FileMeta]] = {}

        # helper: compute the preference value between two file paths (parent dirs)
        def pref_val(pa: str, pb: str) -> int:
            parent_a = os.path.dirname(pa)
            parent_b = os.path.dirname(pb)
            return self._levenshtein(parent_a, parent_b)

        # populate preferences A -> Bs
        for a in baseline_files:
            candidates = mapB_by_name.get(os.path.basename(a.path), [])
            # compute scores and sort
            scored = [(pref_val(a.path, b.path), b) for b in candidates]
            scored.sort(key=lambda x: x[0])
            prefsA[a.path] = [b for (_s, b) in scored]

        # populate preferences B -> As
        for b in current_files:
            candidates = mapA_by_name.get(os.path.basename(b.path), [])
            scored = [(pref_val(a.path, b.path), a) for a in candidates]
            scored.sort(key=lambda x: x[0])
            prefsB[b.path] = [a for (_s, a) in scored]

        # Prepare Gale-Shapley structures
        freeA = [a for a in baseline_files]
        next_proposal_index: Dict[str, int] = {a.path: 0 for a in baseline_files}
        matchA: Dict[str, Optional[FileMeta]] = {a.path: None for a in baseline_files}
        matchB: Dict[str, Optional[FileMeta]] = {b.path: None for b in current_files}

        # For quick comparison, build rank maps for B preferences
        rankB: Dict[str, Dict[str, int]] = {}
        for b in current_files:
            rank = {}
            plist = prefsB.get(b.path, [])
            for idx, a in enumerate(plist):
                rank[a.path] = idx
            rankB[b.path] = rank

        while freeA:
            a = freeA.pop(0)
            a_key = a.path
            plist = prefsA.get(a_key, [])
            if not plist:
                # no candidates
                matchA[a_key] = None
                continue
            # propose to the next candidate
            i = next_proposal_index[a_key]
            if i >= len(plist):
                matchA[a_key] = None
                continue
            b = plist[i]
            next_proposal_index[a_key] = i + 1
            b_key = b.path
            current = matchB.get(b_key)
            if current is None:
                # b accepts
                matchA[a_key] = b
                matchB[b_key] = a
            else:
                # b decides between its current partner and the proposer
                rank_map = rankB.get(b_key, {})
                r_current = rank_map.get(current.path, float('inf'))
                r_proposer = rank_map.get(a_key, float('inf'))
                if r_proposer < r_current:
                    # b prefers the new proposer
                    matchA[a_key] = b
                    matchB[b_key] = a
                    # the previous partner becomes free again
                    matchA[current.path] = None
                    freeA.append(current)
                else:
                    # b rejects the proposer -> the proposer remains free (if it has more prefs)
                    freeA.append(a)

        # Build the results list: pairs for matched A entries
        results: List[Tuple[Optional[FileMeta], Optional[FileMeta]]] = []
        usedB = set()
        for a in baseline_files:
            b = matchA.get(a.path)
            if b is None:
                results.append((a, None))
            else:
                results.append((a, b))
                usedB.add(b.path)

        # Any B not matched is added as (None, b)
        for b in current_files:
            if b.path not in usedB:
                results.append((None, b))

        return results

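    # Matching sketch: files pair up only if they share a filename, and ties
    # are broken by the Levenshtein distance between their parent directories,
    # so a moved file still pairs with its closest counterpart. With a baseline
    # containing "old/utils.py" and "old/sub/utils.py" and a current tree
    # containing "new/utils.py" and "new/sub/utils.py", the expected stable
    # pairing is old/utils.py <-> new/utils.py and old/sub/utils.py <->
    # new/sub/utils.py; a file with no same-named counterpart comes back as
    # (meta, None) or (None, meta).
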
    @staticmethod
    def _diff_file_pair(fileA_path: Optional[str], fileB_path: Optional[str]) -> Dict:
        res = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
        if fileA_path is None and fileB_path is None:
            return res
        if fileA_path is None:
            # all lines are added
            try:
                with open(fileB_path, "r", encoding="utf-8", errors="ignore") as f:
                    lines = f.readlines()
                res["added"] = len(lines)
            except Exception:
                res["added"] = 0
            return res
        if fileB_path is None:
            try:
                with open(fileA_path, "r", encoding="utf-8", errors="ignore") as f:
                    lines = f.readlines()
                res["deleted"] = len(lines)
            except Exception:
                res["deleted"] = 0
            return res

        # both exist; line-based diff
        try:
            with open(fileA_path, "r", encoding="utf-8", errors="ignore") as fa:
                a_lines = fa.readlines()
        except Exception:
            a_lines = []
        try:
            with open(fileB_path, "r", encoding="utf-8", errors="ignore") as fb:
                b_lines = fb.readlines()
        except Exception:
            b_lines = []

        sm = difflib.SequenceMatcher(a=a_lines, b=b_lines)
        for tag, i1, i2, j1, j2 in sm.get_opcodes():
            if tag == "equal":
                res["unmodified"] += (i2 - i1)
            elif tag == "delete":
                res["deleted"] += (i2 - i1)
            elif tag == "insert":
                res["added"] += (j2 - j1)
            elif tag == "replace":
                la = i2 - i1
                lb = j2 - j1
                res["modified"] += min(la, lb)
                if la > lb:
                    res["deleted"] += la - lb
                elif lb > la:
                    res["added"] += lb - la
        return res

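    # Worked example of the accounting above (illustrative content): comparing
    # baseline lines ["a", "b", "c"] with current lines ["a", "x", "c", "d"],
    # SequenceMatcher yields equal(a), replace(b->x), equal(c), insert(d), so
    # the result is {"added": 1, "deleted": 0, "modified": 1, "unmodified": 2}.
    # A "replace" block of unequal size counts min(la, lb) lines as modified
    # and the remainder as added or deleted.
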
    def diff(self) -> Dict:
        baseline_files = self.baseline.files
        current_files = self.build_current_file_list()
        pairs = self.match_files(baseline_files, current_files)

        total = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
        matched_results = []

        # process the pairs, possibly in parallel
        # NOTE: baseline-side content is read from the recorded project_root,
        # not from the stored snapshot, so line-level counts are only
        # meaningful while that tree still reflects the baseline state.
        with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
            futures = []
            for a, b in pairs:
                fa = os.path.join(self.baseline.project_root, a.path) if a is not None else None
                fb = os.path.join(self.current_dir, b.path) if b is not None else None
                futures.append(ex.submit(self._diff_file_pair, fa, fb))
            for (a, b), fut in zip(pairs, futures):
                res = fut.result()
                total["added"] += res["added"]
                total["deleted"] += res["deleted"]
                total["modified"] += res["modified"]
                total["unmodified"] += res["unmodified"]

                # Extract countings and metrics from the baseline and current files
                baseline_countings = a.countings if a is not None else None
                baseline_metrics = a.metrics if a is not None else None
                current_countings = b.countings if b is not None else None
                current_metrics = b.metrics if b is not None else None

                # Compute deltas for countings
                countings_delta = None
                if baseline_countings and current_countings:
                    countings_delta = {
                        "physical_lines": current_countings.get("physical_lines", 0) - baseline_countings.get("physical_lines", 0),
                        "code_lines": current_countings.get("code_lines", 0) - baseline_countings.get("code_lines", 0),
                        "comment_lines": current_countings.get("comment_lines", 0) - baseline_countings.get("comment_lines", 0),
                        "blank_lines": current_countings.get("blank_lines", 0) - baseline_countings.get("blank_lines", 0),
                    }

                # Compute deltas for metrics
                metrics_delta = None
                if baseline_metrics and current_metrics:
                    metrics_delta = {
                        "func_count": current_metrics.get("func_count", 0) - baseline_metrics.get("func_count", 0),
                        "avg_cc": current_metrics.get("avg_cc", 0.0) - baseline_metrics.get("avg_cc", 0.0),
                        "max_cc": current_metrics.get("max_cc", 0) - baseline_metrics.get("max_cc", 0),
                        "mi": current_metrics.get("mi", 0.0) - baseline_metrics.get("mi", 0.0),
                    }

                matched_results.append({
                    "fileA": a.path if a is not None else None,
                    "fileB": b.path if b is not None else None,
                    "counts": res,
                    "baseline_countings": baseline_countings,
                    "current_countings": current_countings,
                    "countings_delta": countings_delta,
                    "baseline_metrics": baseline_metrics,
                    "current_metrics": current_metrics,
                    "metrics_delta": metrics_delta,
                })

        result = {"baseline_id": self.baseline.baseline_id, "compared_at": time.time(), "total": total, "pairs": matched_results}

        # Compute summary statistics from the baseline and current file metadata
        try:
            # Aggregate the baseline files (which carry embedded countings/metrics)
            baseline_counts = {"physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0}
            baseline_metrics = {"file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0}
            baseline_metrics_count = 0

            for fm in baseline_files:
                if fm.countings:
                    baseline_counts["physical_lines"] += fm.countings.get("physical_lines", 0)
                    baseline_counts["code_lines"] += fm.countings.get("code_lines", 0)
                    baseline_counts["comment_lines"] += fm.countings.get("comment_lines", 0)
                    baseline_counts["blank_lines"] += fm.countings.get("blank_lines", 0)
                    baseline_counts["file_count"] += 1
                if fm.metrics:
                    baseline_metrics["total_func_count"] += fm.metrics.get("func_count", 0)
                    baseline_metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0)
                    baseline_metrics["avg_mi"] += fm.metrics.get("mi", 0.0)
                    baseline_metrics_count += 1

            if baseline_metrics_count > 0:
                baseline_metrics["avg_avg_cc"] /= baseline_metrics_count
                baseline_metrics["avg_mi"] /= baseline_metrics_count
            baseline_metrics["file_count"] = baseline_metrics_count

            # Aggregate the current files (which carry embedded countings/metrics)
            current_counts = {"physical_lines": 0, "code_lines": 0, "comment_lines": 0, "blank_lines": 0, "file_count": 0}
            current_metrics = {"file_count": 0, "total_func_count": 0, "avg_avg_cc": 0.0, "avg_mi": 0.0}
            current_metrics_count = 0

            for fm in current_files:
                if fm.countings:
                    current_counts["physical_lines"] += fm.countings.get("physical_lines", 0)
                    current_counts["code_lines"] += fm.countings.get("code_lines", 0)
                    current_counts["comment_lines"] += fm.countings.get("comment_lines", 0)
                    current_counts["blank_lines"] += fm.countings.get("blank_lines", 0)
                    current_counts["file_count"] += 1
                if fm.metrics:
                    current_metrics["total_func_count"] += fm.metrics.get("func_count", 0)
                    current_metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0)
                    current_metrics["avg_mi"] += fm.metrics.get("mi", 0.0)
                    current_metrics_count += 1

            if current_metrics_count > 0:
                current_metrics["avg_avg_cc"] /= current_metrics_count
                current_metrics["avg_mi"] /= current_metrics_count
            current_metrics["file_count"] = current_metrics_count

            # Compute deltas
            delta_counts = {
                "physical_lines": current_counts["physical_lines"] - baseline_counts["physical_lines"],
                "code_lines": current_counts["code_lines"] - baseline_counts["code_lines"],
                "comment_lines": current_counts["comment_lines"] - baseline_counts["comment_lines"],
                "blank_lines": current_counts["blank_lines"] - baseline_counts["blank_lines"],
                "file_count": current_counts["file_count"] - baseline_counts["file_count"],
            }
            delta_metrics = {
                "total_func_count": current_metrics["total_func_count"] - baseline_metrics["total_func_count"],
                "avg_avg_cc": current_metrics["avg_avg_cc"] - baseline_metrics["avg_avg_cc"],
                "avg_mi": current_metrics["avg_mi"] - baseline_metrics["avg_mi"],
            }

            result["summary"] = {
                "baseline": {"countings": baseline_counts, "metrics": baseline_metrics},
                "current": {"countings": current_counts, "metrics": current_metrics},
                "delta": {"countings": delta_counts, "metrics": delta_metrics},
            }
        except Exception:
            pass

        return result
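

# Minimal runnable sketch of the surface above (baseline creation plus a diff
# against an evolved copy). It only touches temporary directories and assumes
# nothing beyond the standard library; the per-file analyzers are optional and
# silently skipped when the ..core modules are not importable.
if __name__ == "__main__":
    import pprint
    import tempfile

    with tempfile.TemporaryDirectory() as base_dir, tempfile.TemporaryDirectory() as store:
        # a tiny "project" to snapshot
        with open(os.path.join(base_dir, "app.py"), "w", encoding="utf-8") as fh:
            fh.write("print('hello')\n")

        mgr = BaselineManager(workspace_root=base_dir, baselines_root=store)
        bid = mgr.create_baseline_from_dir(base_dir)

        # evolve a copy of the project and diff it against the baseline
        current_dir = os.path.join(store, "current")
        shutil.copytree(base_dir, current_dir)
        with open(os.path.join(current_dir, "app.py"), "a", encoding="utf-8") as fh:
            fh.write("print('world')\n")

        report = Differ(mgr.load_metadata(bid), current_dir).diff()
        pprint.pprint(report["total"])  # expect 1 added, 1 unmodified line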