1592 lines
66 KiB
Python
1592 lines
66 KiB
Python
"""
|
|
Baseline manager and differ prototype.
|
|
|
|
- create baseline from directory (snapshot by default)
|
|
- load baseline metadata
|
|
- diff baseline vs current directory
|
|
- output results as dict / JSON-serializable
|
|
|
|
This is a minimal, self-contained implementation inspired by UCC's DiffTool.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
import fnmatch
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
import difflib
|
|
|
|
BASELINE_ROOT_DIRNAME = ".pyucc_baselines"
|
|
|
|
|
|
def _sha1_of_file(path: Path, chunk_size: int = 8192) -> str:
|
|
h = hashlib.sha1()
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(chunk_size), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
|
|
|
|
@dataclass
class FileMeta:
    """Per-file record stored in a baseline's metadata.

    Paths are stored relative to the baseline's project root and use
    forward-slash separators regardless of platform (the creators call
    ``.replace("\\\\", "/")`` on every relpath).
    """

    path: str  # relative path (unix-style separators)
    size: int  # file size in bytes at snapshot time (os.stat st_size)
    mtime: float  # modification timestamp at snapshot time (st_mtime)
    sha1: Optional[str] = None  # hex SHA-1 of the contents; None when hashing was skipped or failed
    countings: Optional[Dict] = None  # per-file line-count results, when the countings analyzer ran
    metrics: Optional[Dict] = None  # per-file complexity metrics, when the metrics analyzer ran
|
|
|
|
|
|
@dataclass
class BaselineMetadata:
    """Top-level record serialized to ``metadata.json`` inside each baseline dir."""

    baseline_id: str  # directory name of the baseline under the baselines root
    created_at: float  # creation time as time.time() epoch seconds
    source: str  # 'local' (directory snapshot) or 'git' (git-archive export)
    origin: Optional[str]  # for git baselines: the commit ref; None for local ones
    project_root: str  # absolute path the baseline was created from
    files: List[FileMeta]  # one entry per included file
    profile: Optional[str] = None  # analysis profile name used at creation, if any
|
|
|
|
|
|
class BaselineManager:
|
|
def __init__(self, workspace_root: str, baselines_root: Optional[str] = None):
|
|
"""Manage baselines storage.
|
|
|
|
Args:
|
|
workspace_root: path to the project/workspace (kept for metadata usage).
|
|
baselines_root: optional absolute or relative path where baselines are stored.
|
|
If omitted, the environment variable `PYUCC_BASELINE_DIR` is consulted;
|
|
if that's not set, defaults to `./baseline` in the current working dir.
|
|
"""
|
|
self.workspace_root = os.path.abspath(workspace_root)
|
|
if baselines_root:
|
|
self.baselines_root = os.path.abspath(baselines_root)
|
|
else:
|
|
# priority: env var, app settings, fallback to ./baseline
|
|
env = os.getenv("PYUCC_BASELINE_DIR")
|
|
if env:
|
|
self.baselines_root = os.path.abspath(env)
|
|
else:
|
|
# try app settings if available
|
|
try:
|
|
from ..config import settings as app_settings
|
|
|
|
sdir = app_settings.get_baseline_dir()
|
|
except Exception:
|
|
sdir = None
|
|
if sdir:
|
|
self.baselines_root = os.path.abspath(sdir)
|
|
else:
|
|
self.baselines_root = os.path.join(os.getcwd(), "baseline")
|
|
os.makedirs(self.baselines_root, exist_ok=True)
|
|
|
|
def _baseline_dir(self, baseline_id: str) -> str:
|
|
return os.path.join(self.baselines_root, baseline_id)
|
|
|
|
def get_baseline_files_dir(self, baseline_id: str) -> str:
|
|
"""Get the directory containing the baseline snapshot files."""
|
|
return os.path.join(self._baseline_dir(baseline_id), "files")
|
|
|
|
def list_baselines(self) -> List[str]:
|
|
return [
|
|
d
|
|
for d in os.listdir(self.baselines_root)
|
|
if os.path.isdir(os.path.join(self.baselines_root, d))
|
|
]
|
|
|
|
def get_metadata_path(self, baseline_id: str) -> str:
|
|
return os.path.join(self._baseline_dir(baseline_id), "metadata.json")
|
|
|
|
    def create_baseline_from_dir(
        self,
        dir_path: str,
        baseline_id: Optional[str] = None,
        snapshot: bool = True,
        compute_sha1: bool = True,
        ignore_patterns: Optional[List[str]] = None,
        profile_name: Optional[str] = None,
        max_keep: int = 5,
        file_list: Optional[List[str]] = None,
    ) -> str:
        """Create a baseline from the files under *dir_path*.

        Builds a list of :class:`FileMeta` entries (size/mtime/sha1 plus
        best-effort per-file counting and metrics data), writes
        ``metadata.json`` into the new baseline directory and, when *snapshot*
        is true, copies the selected files into ``<baseline>/files`` and
        generates UCC-style text reports there.  Analyzer and report failures
        are deliberately non-fatal: baseline creation succeeds with whatever
        data could be gathered.

        Args:
            dir_path: directory to baseline; recorded as the project root.
            baseline_id: explicit id; defaults to a timestamped id, prefixed
                with the profile name when one is given.
            snapshot: also store file contents, not just metadata.
            compute_sha1: hash each file's contents for change detection.
            ignore_patterns: fnmatch-style exclusion patterns (normalized via
                the scanner helper when available, e.g. '.bak' -> '*.bak').
            profile_name: optional analysis profile; restricts included files
                to the profile's language extensions and is stored in metadata.
            max_keep: prune older baselines of the same project/profile down
                to this many (0 or negative disables pruning).
            file_list: explicit files to include (relative to dir_path or
                absolute) instead of scanning the tree.

        Returns:
            The id of the newly created baseline.

        Raises:
            FileExistsError: if a baseline directory with this id exists.
        """
        dir_path = os.path.abspath(dir_path)
        if baseline_id is None:
            ts = time.strftime("%Y%m%dT%H%M%S")
            # include profile name in baseline id when available
            if profile_name:
                safe_profile = profile_name.replace(" ", "_")
                baseline_id = f"{safe_profile}__{ts}_local"
            else:
                baseline_id = f"{ts}_local"
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)

        files_meta: List[FileMeta] = []

        # Normalize ignore patterns using scanner helper so patterns like
        # '.bak' are treated as '*.bak'
        try:
            from .scanner import normalize_ignore_patterns, find_source_files

            ignore_patterns = normalize_ignore_patterns(ignore_patterns) or []
        except Exception:
            # fallback: ensure ignore_patterns is list-like
            find_source_files = None
            ignore_patterns = ignore_patterns or []

        # If caller provided explicit file_list, use it. Otherwise delegate to
        # scanner.find_source_files (or os.walk when the scanner is missing).
        if file_list is not None:
            # build FileMeta entries from provided list (paths relative to
            # dir_path or absolute); unreadable entries are silently skipped
            for f in file_list:
                try:
                    p = Path(f)
                    if not p.is_absolute():
                        p = Path(dir_path) / p
                    if not p.is_file():
                        continue
                    rel_unix = os.path.relpath(str(p), dir_path).replace("\\", "/")
                    st = p.stat()
                    sha1 = None
                    if compute_sha1:
                        try:
                            sha1 = _sha1_of_file(p)
                        except Exception:
                            sha1 = None
                    files_meta.append(
                        FileMeta(
                            path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1
                        )
                    )
                except Exception:
                    continue
        else:
            # Determine allowed extensions from profile (if provided) to pass to scanner
            allowed_exts = None
            try:
                from ..config import profiles as profiles_cfg
                from ..config.languages import LANGUAGE_EXTENSIONS
            except Exception:
                profiles_cfg = None
                LANGUAGE_EXTENSIONS = {}

            if profile_name and profiles_cfg:
                pr = profiles_cfg.find_profile(profile_name)
                if pr:
                    exts = []
                    for ln in pr.get("languages", []) or []:
                        if ln in LANGUAGE_EXTENSIONS:
                            exts.extend(LANGUAGE_EXTENSIONS[ln])
                    if exts:
                        allowed_exts = list(set(exts))

            # If scanner available, use it; otherwise fallback to os.walk
            if find_source_files:
                try:
                    src_files = find_source_files(
                        Path(dir_path),
                        allowed_extensions=allowed_exts,
                        ignore_patterns=ignore_patterns,
                    )
                except Exception:
                    src_files = []
                for p in src_files:
                    try:
                        rel_unix = os.path.relpath(str(p), dir_path).replace("\\", "/")
                        st = p.stat()
                        sha1 = None
                        if compute_sha1:
                            try:
                                sha1 = _sha1_of_file(p)
                            except Exception:
                                sha1 = None
                        files_meta.append(
                            FileMeta(
                                path=rel_unix,
                                size=st.st_size,
                                mtime=st.st_mtime,
                                sha1=sha1,
                            )
                        )
                    except Exception:
                        continue
            else:
                for root, dirs, files in os.walk(dir_path):
                    for fn in files:
                        fpath = os.path.join(root, fn)
                        # skip baseline storage area if under workspace_root
                        # NOTE(review): os.path.commonpath raises ValueError for
                        # paths on different drives / mixed abs-rel; confirm
                        # callers always pass comparable absolute paths
                        if (
                            os.path.commonpath([self.baselines_root, fpath])
                            == self.baselines_root
                        ):
                            continue
                        rel = os.path.relpath(fpath, dir_path)
                        # check ignore patterns against relative path (unix-style)
                        rel_unix = rel.replace("\\", "/")
                        ignored = False
                        for pat in ignore_patterns:
                            if not pat:
                                continue
                            if fnmatch.fnmatch(
                                rel_unix.lower(), pat
                            ) or fnmatch.fnmatch(fn.lower(), pat):
                                ignored = True
                                break
                        if ignored:
                            continue
                        try:
                            st = os.stat(fpath)
                        except OSError:
                            continue
                        sha1 = None
                        if compute_sha1:  # also compute for 0-byte files
                            try:
                                sha1 = _sha1_of_file(Path(fpath))
                            except Exception:
                                sha1 = None
                        files_meta.append(
                            FileMeta(
                                path=rel_unix,
                                size=st.st_size,
                                mtime=st.st_mtime,
                                sha1=sha1,
                            )
                        )

        # Run per-file analyzers (countings + metrics) and attach results to
        # each FileMeta; any analyzer failure leaves the field as None
        try:
            from ..core.countings_impl import (
                analyze_file_counts as _analyze_file_counts,
            )
            from ..core.metrics import analyze_file_metrics as _analyze_metrics

            for fm in files_meta:
                abs_path = os.path.join(dir_path, fm.path)
                # per-file counts
                try:
                    c = _analyze_file_counts(Path(abs_path))
                    fm.countings = {
                        "physical_lines": int(c.get("physical_lines", 0)),
                        "code_lines": int(c.get("code_lines", 0)),
                        "comment_lines": int(c.get("comment_lines", 0)),
                        "blank_lines": int(c.get("blank_lines", 0)),
                        # UCC extended metrics
                        "comment_whole": int(c.get("comment_whole", 0)),
                        "comment_embedded": int(c.get("comment_embedded", 0)),
                        "compiler_directives": int(c.get("compiler_directives", 0)),
                        "data_declarations": int(c.get("data_declarations", 0)),
                        "exec_instructions": int(c.get("exec_instructions", 0)),
                        "logical_sloc": int(c.get("logical_sloc", 0)),
                        "physical_sloc": int(c.get("physical_sloc", 0)),
                    }
                except Exception:
                    fm.countings = None
                # per-file metrics
                try:
                    m = _analyze_metrics(abs_path)
                    fm.metrics = {
                        "avg_cc": float(m.get("avg_cc", 0.0)),
                        "max_cc": int(m.get("max_cc", 0)),
                        "func_count": int(m.get("func_count", 0)),
                        "mi": float(m.get("mi", 0.0)),
                    }
                except Exception:
                    fm.metrics = None
        except Exception:
            pass

        # If profile provides languages, determine allowed extensions and
        # filter `files_meta` BEFORE creating snapshot so unwanted files
        # (e.g. backups, .txt dumps) are not copied into the baseline.
        try:
            from ..config import profiles as profiles_cfg
            from ..config.languages import LANGUAGE_EXTENSIONS
        except Exception:
            profiles_cfg = None
            LANGUAGE_EXTENSIONS = {}

        allowed_exts = None
        if profile_name and profiles_cfg:
            pr = profiles_cfg.find_profile(profile_name)
            if pr:
                exts = []
                for ln in pr.get("languages", []) or []:
                    if ln in LANGUAGE_EXTENSIONS:
                        exts.extend(LANGUAGE_EXTENSIONS[ln])
                if exts:
                    allowed_exts = list(set(exts))

        if allowed_exts:
            allowed_set = set(e.lower() for e in allowed_exts)
            from pathlib import Path as _Path

            filtered = []
            for fm in files_meta:
                try:
                    suf = _Path(fm.path).suffix.lower()
                except Exception:
                    suf = ""
                if suf in allowed_set:
                    filtered.append(fm)
            files_meta = filtered

        metadata = BaselineMetadata(
            baseline_id=baseline_id,
            created_at=time.time(),
            source="local",
            origin=None,
            project_root=dir_path,
            files=files_meta,
            profile=profile_name,
        )  # Save metadata
        meta_path = self.get_metadata_path(baseline_id)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self._metadata_to_dict(metadata), f, indent=2)

        # Optionally store a snapshot
        if snapshot:
            snapshot_dir = os.path.join(dest, "files")
            os.makedirs(snapshot_dir, exist_ok=True)
            # Copy only the files that were included in the baseline (respecting ignore patterns)
            for fm in files_meta:
                src_file = os.path.join(dir_path, fm.path)
                dst_file = os.path.join(snapshot_dir, fm.path)
                # Create parent directories if needed
                dst_parent = os.path.dirname(dst_file)
                if dst_parent:
                    os.makedirs(dst_parent, exist_ok=True)
                try:
                    shutil.copy2(src_file, dst_file)  # copy2 preserves metadata
                except Exception:
                    pass  # skip files that cannot be copied

            # Optionally create zip archive (controlled by settings)
            # Check if user wants zip archives (for space savings at cost of speed)
            try:
                from ..config import settings as app_settings

                if app_settings.get_zip_baselines():
                    zip_path = os.path.join(dest, "files.zip")
                    shutil.make_archive(
                        base_name=zip_path[:-4], format="zip", root_dir=snapshot_dir
                    )
            except Exception:
                pass  # if settings not available or zip fails, continue without zip

            # Generate UCC-style reports (countings, metrics, duplicates)
            # inside the baseline folder; any failure below is non-fatal
            try:
                from ..utils.ucc_report_generator import UCCReportGenerator
                from ..config import profiles as profiles_cfg
                from ..config.languages import LANGUAGE_EXTENSIONS
                from ..config import settings as app_settings
                from ..core import duplicates as dupmod

                # Prepare counting and metrics lists for report generator
                counting_results = []
                metrics_results = []
                # Attempt to ensure per-file `functions` details are present in metrics results.
                # In some environments `lizard` may not have been available at the time
                # the initial per-file analysis ran; re-run metrics analysis on the
                # snapshot file when possible to obtain function-level details.
                try:
                    from ..core.metrics import analyze_file_metrics as _analyze_metrics
                except Exception:
                    _analyze_metrics = None

                for fm in files_meta:
                    abs_snapshot = os.path.join(snapshot_dir, fm.path)
                    if fm.countings:
                        row = dict(fm.countings)
                        row["file"] = abs_snapshot
                        # language may be derived from extension if available
                        row["language"] = ""
                        counting_results.append(row)

                    # Start from existing metrics (if any)
                    mdata = fm.metrics if fm.metrics else None
                    # If we don't have function-level data and analyzer available, re-run
                    if _analyze_metrics and (not mdata or not mdata.get("functions")):
                        try:
                            # run analyzer on the snapshot copy to ensure consistent paths
                            rean = _analyze_metrics(abs_snapshot)
                            if rean:
                                mdata = rean
                        except Exception:
                            pass

                    if mdata:
                        mrow = dict(mdata)
                        mrow["file"] = abs_snapshot
                        metrics_results.append(mrow)

                # Determine allowed extensions from profile (if provided)
                allowed_exts = None
                if profile_name:
                    pr = profiles_cfg.find_profile(profile_name)
                    if pr:
                        exts = []
                        for ln in pr.get("languages", []) or []:
                            if ln in LANGUAGE_EXTENSIONS:
                                exts.extend(LANGUAGE_EXTENSIONS[ln])
                        if exts:
                            allowed_exts = list(set(exts))

                # If allowed_exts is set, prune files_meta to keep only matching extensions.
                # This prevents copying/reporting files that are not part of the profile languages
                # (e.g., backup files, text dumps, etc.). Extensions in LANGUAGE_EXTENSIONS
                # include the leading dot; compare case-insensitively.
                # NOTE(review): files_meta was already filtered the same way before
                # the snapshot was written, so this second pass looks redundant
                # but is harmless — confirm before removing.
                if allowed_exts:
                    allowed_set = set(e.lower() for e in allowed_exts)
                    filtered = []
                    from pathlib import Path as _Path

                    for fm in files_meta:
                        try:
                            suf = _Path(fm.path).suffix.lower()
                        except Exception:
                            suf = ""
                        if suf in allowed_set:
                            filtered.append(fm)
                    files_meta = filtered

                # Load duplicates search settings (threshold, k, window)
                dup_settings = app_settings.get_duplicates_settings() or {}
                thr = dup_settings.get("threshold", 5.0)
                k = dup_settings.get("k", 25)
                window = dup_settings.get("window", 4)

                # Run duplicate finder on the snapshot directory (so reports are self-contained)
                try:
                    # compute snapshot file list via scanner when available and pass it
                    try:
                        from .scanner import find_source_files as _find_src

                        snap_files = _find_src(
                            Path(snapshot_dir), allowed_extensions=allowed_exts
                        )
                        snap_list = [str(p) for p in snap_files]
                    except Exception:
                        snap_list = None

                    dup_res = dupmod.find_duplicates_in_dir(
                        root=snapshot_dir,
                        extensions=allowed_exts,
                        dup_threshold=thr,
                        k=k,
                        window=window,
                        file_list=snap_list,
                    )
                    # convert to list of dicts
                    dup_rows = []
                    for a, b in dup_res.get("exact", []):
                        dup_rows.append(
                            {
                                "file_a": a,
                                "file_b": b,
                                "match_type": "exact",
                                "pct_change": 0,
                            }
                        )
                    for a, b in dup_res.get("fuzzy", []):
                        dup_rows.append(
                            {
                                "file_a": a,
                                "file_b": b,
                                "match_type": "fuzzy",
                                "pct_change": f"<={thr}%",
                            }
                        )
                except Exception:
                    dup_rows = []

                # Write reports into baseline folder (non-fatal)
                try:
                    count_path = os.path.join(dest, "countings_report.txt")
                    UCCReportGenerator.generate_counting_report(
                        results=counting_results,
                        output_path=Path(count_path),
                        command_description=f"PyUcc Counting Analysis - Baseline: {baseline_id}",
                        base_path=dir_path,
                    )
                except Exception:
                    pass

                try:
                    metrics_path = os.path.join(dest, "metrics_report.txt")
                    UCCReportGenerator.generate_metrics_report(
                        results=metrics_results,
                        output_path=Path(metrics_path),
                        command_description=f"PyUcc Metrics Analysis - Baseline: {baseline_id}",
                        base_path=dir_path,
                    )
                except Exception:
                    pass

                try:
                    dup_path = os.path.join(dest, "duplicates_report.txt")
                    UCCReportGenerator.generate_duplicates_report(
                        duplicates=dup_rows,
                        output_path=Path(dup_path),
                        command_description=f"PyUcc Duplicate Analysis - Baseline: {baseline_id}",
                        base_path=dir_path,
                        params={
                            "threshold": thr,
                            "extensions": allowed_exts,
                            "k": k,
                            "window": window,
                        },
                    )
                except Exception:
                    pass
            except Exception:
                # Do not fail baseline creation if report generation has problems
                pass

        # Prune old baselines if requested
        if max_keep > 0:
            self._prune_old_baselines(dir_path, profile_name, max_keep)

        return baseline_id
|
|
|
|
def _prune_old_baselines(
|
|
self, project_root: str, profile_name: Optional[str], keep: int = 5
|
|
):
|
|
"""Prune older baselines for the same project and profile, keeping `keep` newest."""
|
|
# scan baselines root and load metadata for each baseline
|
|
entries = [] # list of (created_at, baseline_id, path)
|
|
for bn in os.listdir(self.baselines_root):
|
|
bdir = os.path.join(self.baselines_root, bn)
|
|
if not os.path.isdir(bdir):
|
|
continue
|
|
meta_path = os.path.join(bdir, "metadata.json")
|
|
if not os.path.exists(meta_path):
|
|
continue
|
|
try:
|
|
with open(meta_path, "r", encoding="utf-8") as f:
|
|
j = json.load(f)
|
|
except Exception:
|
|
continue
|
|
# match by project_root and profile
|
|
if j.get("project_root") != project_root:
|
|
continue
|
|
if profile_name is None:
|
|
if j.get("profile") is not None:
|
|
continue
|
|
else:
|
|
if j.get("profile") != profile_name:
|
|
continue
|
|
entries.append((j.get("created_at", 0), j.get("baseline_id", bn), bdir))
|
|
|
|
# sort by created_at descending (newest first)
|
|
entries.sort(key=lambda x: x[0], reverse=True)
|
|
# remove entries beyond keep
|
|
for _, bid, path in entries[keep:]:
|
|
try:
|
|
shutil.rmtree(path)
|
|
except Exception:
|
|
pass
|
|
|
|
    def create_baseline_from_git(
        self,
        repo_path: str,
        commit_ref: str = "HEAD",
        baseline_id: Optional[str] = None,
        snapshot: bool = True,
        compute_sha1: bool = True,
        ignore_patterns: Optional[List[str]] = None,
        profile_name: Optional[str] = None,
        max_keep: int = 5,
        file_list: Optional[List[str]] = None,
    ) -> str:
        """Create a baseline by exporting a git commit (using `git archive`).

        This method requires that `git` is available in PATH. It will create a zip
        archive of the requested commit, extract it next to the baseline and
        build the baseline metadata from the extracted tree, mirroring
        :meth:`create_baseline_from_dir`.

        Args:
            repo_path: path to the git working copy; recorded as project root.
            commit_ref: commit/branch/tag passed to `git archive` and recorded
                as the baseline's origin.
            baseline_id: explicit id; defaults to a timestamped
                ``..._git_<ref>`` id, prefixed with the profile name if given.
            snapshot: keep the extracted tree under ``<baseline>/files`` and
                generate UCC-style reports; otherwise the tree is discarded.
            compute_sha1: hash non-empty files for change detection.
            ignore_patterns: fnmatch-style exclusion patterns.
            profile_name: analysis profile recorded in metadata and used to
                derive report extensions.
            max_keep: prune older baselines of the same repo/profile (0 off).
            file_list: explicit files (relative to the extracted tree or
                absolute) to include instead of scanning.

        Returns:
            The id of the newly created baseline.

        Raises:
            FileExistsError: if a baseline directory with this id exists.
            RuntimeError: if `git archive` exits with a non-zero status.
        """
        repo_path = os.path.abspath(repo_path)
        if baseline_id is None:
            ts = time.strftime("%Y%m%dT%H%M%S")
            if profile_name:
                safe_profile = profile_name.replace(" ", "_")
                baseline_id = f"{safe_profile}__{ts}_git_{commit_ref}"
            else:
                baseline_id = f"{ts}_git_{commit_ref}"
        dest = self._baseline_dir(baseline_id)
        if os.path.exists(dest):
            raise FileExistsError(dest)
        os.makedirs(dest, exist_ok=False)

        # create a temporary zip with git archive
        zip_tmp = os.path.join(dest, "export.zip")
        try:
            subprocess.run(
                ["git", "archive", "--format=zip", "-o", zip_tmp, commit_ref],
                cwd=repo_path,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"git archive failed: {e}")

        # extract zip to a temp dir and build metadata similarly to dir baseline
        extract_dir = os.path.join(dest, "extracted")
        os.makedirs(extract_dir, exist_ok=True)
        shutil.unpack_archive(zip_tmp, extract_dir)

        # reuse create_baseline_from_dir logic but avoid creating nested baseline dir
        files_meta: List[FileMeta] = []
        # NOTE(review): unlike create_baseline_from_dir, patterns are NOT
        # normalized via scanner.normalize_ignore_patterns here, and sha1 is
        # skipped for 0-byte files (st_size > 0 guard) — confirm these
        # divergences from the local path are intentional.
        ignore_patterns = ignore_patterns or []

        # If caller provided explicit file list (relative to extract_dir or absolute), use it
        try:
            from .scanner import find_source_files
        except Exception:
            find_source_files = None

        if file_list is not None:
            for f in file_list:
                try:
                    p = Path(f)
                    if not p.is_absolute():
                        p = Path(extract_dir) / p
                    if not p.is_file():
                        continue
                    rel_unix = os.path.relpath(str(p), extract_dir).replace("\\", "/")
                    st = p.stat()
                    sha1 = None
                    if compute_sha1 and st.st_size > 0:
                        try:
                            sha1 = _sha1_of_file(p)
                        except Exception:
                            sha1 = None
                    files_meta.append(
                        FileMeta(
                            path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1
                        )
                    )
                except Exception:
                    continue
        else:
            # Prefer scanner if available
            if find_source_files:
                try:
                    src_files = find_source_files(
                        Path(extract_dir), ignore_patterns=ignore_patterns
                    )
                except Exception:
                    src_files = []
                for p in src_files:
                    try:
                        rel_unix = os.path.relpath(str(p), extract_dir).replace(
                            "\\", "/"
                        )
                        st = p.stat()
                        sha1 = None
                        if compute_sha1 and st.st_size > 0:
                            try:
                                sha1 = _sha1_of_file(p)
                            except Exception:
                                sha1 = None
                        files_meta.append(
                            FileMeta(
                                path=rel_unix,
                                size=st.st_size,
                                mtime=st.st_mtime,
                                sha1=sha1,
                            )
                        )
                    except Exception:
                        continue
            else:
                # last-resort scan of the extracted tree
                for root, dirs, files in os.walk(extract_dir):
                    for fn in files:
                        fpath = os.path.join(root, fn)
                        rel = os.path.relpath(fpath, extract_dir)
                        rel_unix = rel.replace("\\", "/")
                        # apply ignore patterns
                        ignored = False
                        for pat in ignore_patterns:
                            if fnmatch.fnmatch(rel_unix, pat) or fnmatch.fnmatch(
                                fn, pat
                            ):
                                ignored = True
                                break
                        if ignored:
                            continue
                        try:
                            st = os.stat(fpath)
                        except OSError:
                            continue
                        sha1 = None
                        if compute_sha1 and st.st_size > 0:
                            try:
                                sha1 = _sha1_of_file(Path(fpath))
                            except Exception:
                                sha1 = None
                        files_meta.append(
                            FileMeta(
                                path=rel_unix,
                                size=st.st_size,
                                mtime=st.st_mtime,
                                sha1=sha1,
                            )
                        )

        # attempt to run per-file analyzers on the extracted tree and attach results
        try:
            from ..core.countings_impl import (
                analyze_file_counts as _analyze_file_counts,
            )
            from ..core.metrics import analyze_file_metrics as _analyze_metrics

            for fm in files_meta:
                abs_path = os.path.join(extract_dir, fm.path)
                try:
                    c = _analyze_file_counts(Path(abs_path))
                    fm.countings = {
                        "physical_lines": int(c.get("physical_lines", 0)),
                        "code_lines": int(c.get("code_lines", 0)),
                        "comment_lines": int(c.get("comment_lines", 0)),
                        "blank_lines": int(c.get("blank_lines", 0)),
                        # UCC extended metrics
                        "comment_whole": int(c.get("comment_whole", 0)),
                        "comment_embedded": int(c.get("comment_embedded", 0)),
                        "compiler_directives": int(c.get("compiler_directives", 0)),
                        "data_declarations": int(c.get("data_declarations", 0)),
                        "exec_instructions": int(c.get("exec_instructions", 0)),
                        "logical_sloc": int(c.get("logical_sloc", 0)),
                        "physical_sloc": int(c.get("physical_sloc", 0)),
                    }
                except Exception:
                    fm.countings = None
                try:
                    m = _analyze_metrics(abs_path)
                    fm.metrics = {
                        "avg_cc": float(m.get("avg_cc", 0.0)),
                        "max_cc": int(m.get("max_cc", 0)),
                        "func_count": int(m.get("func_count", 0)),
                        "mi": float(m.get("mi", 0.0)),
                    }
                except Exception:
                    fm.metrics = None
        except Exception:
            pass

        metadata = BaselineMetadata(
            baseline_id=baseline_id,
            created_at=time.time(),
            source="git",
            origin=commit_ref,
            project_root=repo_path,
            files=files_meta,
            profile=profile_name,
        )

        meta_path = self.get_metadata_path(baseline_id)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self._metadata_to_dict(metadata), f, indent=2)

        # Optionally keep the extracted tree (snapshot)
        if snapshot:
            # move extracted content into dest/files
            snapshot_dir = os.path.join(dest, "files")
            shutil.move(extract_dir, snapshot_dir)

            # Optionally create zip archive from the files directory
            try:
                from ..config import settings as app_settings

                if app_settings.get_zip_baselines():
                    # Keep both files/ and create files.zip
                    zip_archive = os.path.join(dest, "files.zip")
                    shutil.make_archive(
                        base_name=zip_archive[:-4], format="zip", root_dir=snapshot_dir
                    )
            except Exception:
                pass

            # Always remove the git export zip (it was just temporary)
            try:
                os.remove(zip_tmp)
            except Exception:
                pass

            # Generate UCC-style reports inside baseline folder for git-created
            # baseline; failures below never abort baseline creation
            try:
                from ..utils.ucc_report_generator import UCCReportGenerator
                from ..config import profiles as profiles_cfg
                from ..config.languages import LANGUAGE_EXTENSIONS
                from ..config import settings as app_settings
                from ..core import duplicates as dupmod

                counting_results = []
                metrics_results = []
                for fm in files_meta:
                    abs_snapshot = os.path.join(snapshot_dir, fm.path)
                    if fm.countings:
                        row = dict(fm.countings)
                        row["file"] = abs_snapshot
                        row["language"] = ""
                        counting_results.append(row)
                    if fm.metrics:
                        mrow = dict(fm.metrics)
                        mrow["file"] = abs_snapshot
                        metrics_results.append(mrow)

                # derive report extensions from the profile, if any
                allowed_exts = None
                if profile_name:
                    pr = profiles_cfg.find_profile(profile_name)
                    if pr:
                        exts = []
                        for ln in pr.get("languages", []) or []:
                            if ln in LANGUAGE_EXTENSIONS:
                                exts.extend(LANGUAGE_EXTENSIONS[ln])
                        if exts:
                            allowed_exts = list(set(exts))

                dup_settings = app_settings.get_duplicates_settings() or {}
                thr = dup_settings.get("threshold", 5.0)
                k = dup_settings.get("k", 25)
                window = dup_settings.get("window", 4)

                try:
                    # compute snapshot file list via scanner when available and pass it
                    try:
                        from .scanner import find_source_files as _find_src

                        snap_files = _find_src(
                            Path(snapshot_dir), allowed_extensions=allowed_exts
                        )
                        snap_list = [str(p) for p in snap_files]
                    except Exception:
                        snap_list = None

                    dup_res = dupmod.find_duplicates_in_dir(
                        root=snapshot_dir,
                        extensions=allowed_exts,
                        dup_threshold=thr,
                        k=k,
                        window=window,
                        file_list=snap_list,
                    )
                    dup_rows = []
                    for a, b in dup_res.get("exact", []):
                        dup_rows.append(
                            {
                                "file_a": a,
                                "file_b": b,
                                "match_type": "exact",
                                "pct_change": 0,
                            }
                        )
                    for a, b in dup_res.get("fuzzy", []):
                        dup_rows.append(
                            {
                                "file_a": a,
                                "file_b": b,
                                "match_type": "fuzzy",
                                "pct_change": f"<={thr}%",
                            }
                        )
                except Exception:
                    dup_rows = []

                try:
                    count_path = os.path.join(dest, "countings_report.txt")
                    UCCReportGenerator.generate_counting_report(
                        results=counting_results,
                        output_path=Path(count_path),
                        command_description=f"PyUcc Counting Analysis - Baseline: {baseline_id}",
                        base_path=repo_path,
                    )
                except Exception:
                    pass

                try:
                    metrics_path = os.path.join(dest, "metrics_report.txt")
                    UCCReportGenerator.generate_metrics_report(
                        results=metrics_results,
                        output_path=Path(metrics_path),
                        command_description=f"PyUcc Metrics Analysis - Baseline: {baseline_id}",
                        base_path=repo_path,
                    )
                except Exception:
                    pass

                try:
                    dup_path = os.path.join(dest, "duplicates_report.txt")
                    UCCReportGenerator.generate_duplicates_report(
                        duplicates=dup_rows,
                        output_path=Path(dup_path),
                        command_description=f"PyUcc Duplicate Analysis - Baseline: {baseline_id}",
                        base_path=repo_path,
                        params={
                            "threshold": thr,
                            "extensions": allowed_exts,
                            "k": k,
                            "window": window,
                        },
                    )
                except Exception:
                    pass
            except Exception:
                pass
        else:
            # remove extracted files and zip
            shutil.rmtree(extract_dir, ignore_errors=True)
            try:
                os.remove(zip_tmp)
            except Exception:
                pass

        # prune old baselines if requested
        if max_keep > 0:
            self._prune_old_baselines(repo_path, profile_name, max_keep)

        return baseline_id
|
|
|
|
def load_metadata(self, baseline_id: str) -> BaselineMetadata:
|
|
meta_path = self.get_metadata_path(baseline_id)
|
|
with open(meta_path, "r", encoding="utf-8") as f:
|
|
j = json.load(f)
|
|
files = [FileMeta(**fm) for fm in j["files"]]
|
|
return BaselineMetadata(
|
|
baseline_id=j["baseline_id"],
|
|
created_at=j["created_at"],
|
|
source=j.get("source", "local"),
|
|
origin=j.get("origin"),
|
|
project_root=j.get("project_root", ""),
|
|
files=files,
|
|
profile=j.get("profile"),
|
|
)
|
|
|
|
def _metadata_to_dict(self, meta: BaselineMetadata) -> Dict:
|
|
d = asdict(meta)
|
|
# dataclass conversion
|
|
d["files"] = [asdict(fm) for fm in meta.files]
|
|
return d
|
|
|
|
|
|
class Differ:
|
|
def __init__(
|
|
self,
|
|
baseline: BaselineMetadata,
|
|
current_dir: str,
|
|
max_workers: int = 4,
|
|
ignore_patterns: Optional[List[str]] = None,
|
|
baseline_files_dir: Optional[str] = None,
|
|
current_file_list: Optional[List[FileMeta]] = None,
|
|
):
|
|
self.baseline = baseline
|
|
self.current_dir = os.path.abspath(current_dir)
|
|
self.max_workers = max_workers
|
|
# Normalize ignore patterns so entries like '.bak' become '*.bak'
|
|
try:
|
|
from .scanner import normalize_ignore_patterns
|
|
|
|
self.ignore_patterns = normalize_ignore_patterns(ignore_patterns) or []
|
|
except Exception:
|
|
self.ignore_patterns = ignore_patterns or []
|
|
# If caller passed a precomputed current file list, use it (avoids rescanning)
|
|
self._current_files_cache: Optional[List[FileMeta]] = current_file_list
|
|
# baseline_files_dir is the directory containing the baseline snapshot files
|
|
# If not provided, falls back to baseline.project_root (for backwards compatibility)
|
|
self.baseline_files_dir = (
|
|
baseline_files_dir if baseline_files_dir else baseline.project_root
|
|
)
|
|
|
|
    def build_current_file_list(self) -> List[FileMeta]:
        """Scan the current directory and return FileMeta records for every file.

        Mirrors the metadata gathered at baseline creation (relative path,
        size, mtime, sha1, plus best-effort countings/metrics) so current and
        baseline entries are directly comparable.  The result is cached on
        the instance; a precomputed list passed to ``__init__`` short-circuits
        the scan entirely.
        """
        # Return cached result if already computed
        if self._current_files_cache is not None:
            return self._current_files_cache

        files_meta: List[FileMeta] = []
        # Prefer to use scanner.find_source_files so scanning rules are centralized
        try:
            from .scanner import find_source_files
        except Exception:
            find_source_files = None

        # Derive allowed extensions from baseline profile if available
        allowed_exts = None
        try:
            from ..config import profiles as profiles_cfg
            from ..config.languages import LANGUAGE_EXTENSIONS
        except Exception:
            profiles_cfg = None
            LANGUAGE_EXTENSIONS = {}

        if self.baseline and self.baseline.profile and profiles_cfg:
            pr = profiles_cfg.find_profile(self.baseline.profile)
            if pr:
                exts = []
                for ln in pr.get("languages", []) or []:
                    if ln in LANGUAGE_EXTENSIONS:
                        exts.extend(LANGUAGE_EXTENSIONS[ln])
                if exts:
                    allowed_exts = list(set(exts))

        if find_source_files:
            try:
                src_files = find_source_files(
                    Path(self.current_dir),
                    allowed_extensions=allowed_exts,
                    ignore_patterns=self.ignore_patterns,
                )
            except Exception:
                src_files = []
            for p in src_files:
                try:
                    rel_unix = os.path.relpath(str(p), self.current_dir).replace(
                        "\\", "/"
                    )
                    st = p.stat()
                    sha1 = None
                    try:
                        sha1 = _sha1_of_file(p)
                    except Exception:
                        sha1 = None
                    files_meta.append(
                        FileMeta(
                            path=rel_unix, size=st.st_size, mtime=st.st_mtime, sha1=sha1
                        )
                    )
                except Exception:
                    continue
        else:
            # fallback: walk the tree directly when the scanner is unavailable
            for root, dirs, files in os.walk(self.current_dir):
                for fn in files:
                    fpath = os.path.join(root, fn)
                    rel = os.path.relpath(fpath, self.current_dir)
                    # apply ignore patterns from profile: test against relative path (unix-style) and filename
                    rel_unix = rel.replace("\\", "/")
                    ignored = False
                    for pat in self.ignore_patterns or []:
                        # patterns are normalized and lower-cased by the scanner helper
                        if fnmatch.fnmatch(rel_unix.lower(), pat) or fnmatch.fnmatch(
                            fn.lower(), pat
                        ):
                            ignored = True
                            break
                    if ignored:
                        continue
                    try:
                        st = os.stat(fpath)
                    except OSError:
                        continue
                    sha1 = None
                    # Compute SHA1 for all files, including 0-byte files
                    try:
                        sha1 = _sha1_of_file(Path(fpath))
                    except Exception:
                        sha1 = None
                    files_meta.append(
                        FileMeta(
                            path=rel.replace("\\", "/"),
                            size=st.st_size,
                            mtime=st.st_mtime,
                            sha1=sha1,
                        )
                    )

        # Run per-file analyzers (countings + metrics) and attach results to each FileMeta
        # This ensures current files have the same data as baseline files for comparison
        try:
            from ..core.countings_impl import (
                analyze_file_counts as _analyze_file_counts,
            )
            from ..core.metrics import analyze_file_metrics as _analyze_metrics

            for fm in files_meta:
                abs_path = os.path.join(self.current_dir, fm.path)
                # per-file counts
                try:
                    c = _analyze_file_counts(Path(abs_path))
                    fm.countings = {
                        "physical_lines": int(c.get("physical_lines", 0)),
                        "code_lines": int(c.get("code_lines", 0)),
                        "comment_lines": int(c.get("comment_lines", 0)),
                        "blank_lines": int(c.get("blank_lines", 0)),
                        # UCC extended metrics
                        "comment_whole": int(c.get("comment_whole", 0)),
                        "comment_embedded": int(c.get("comment_embedded", 0)),
                        "compiler_directives": int(c.get("compiler_directives", 0)),
                        "data_declarations": int(c.get("data_declarations", 0)),
                        "exec_instructions": int(c.get("exec_instructions", 0)),
                        "logical_sloc": int(c.get("logical_sloc", 0)),
                        "physical_sloc": int(c.get("physical_sloc", 0)),
                    }
                except Exception:
                    fm.countings = None
                # per-file metrics
                try:
                    m = _analyze_metrics(abs_path)
                    fm.metrics = {
                        "avg_cc": float(m.get("avg_cc", 0.0)),
                        "max_cc": int(m.get("max_cc", 0)),
                        "func_count": int(m.get("func_count", 0)),
                        "mi": float(m.get("mi", 0.0)),
                    }
                except Exception:
                    fm.metrics = None
        except Exception:
            pass

        # Cache the result to avoid recomputation
        self._current_files_cache = files_meta
        return files_meta
|
|
|
|
@staticmethod
|
|
def _index_by_name(files: List[FileMeta]) -> Dict[str, List[FileMeta]]:
|
|
idx: Dict[str, List[FileMeta]] = {}
|
|
for f in files:
|
|
name = os.path.basename(f.path)
|
|
idx.setdefault(name, []).append(f)
|
|
return idx
|
|
|
|
@staticmethod
|
|
def _levenshtein(a: str, b: str) -> int:
|
|
# simple DP implementation
|
|
la, lb = len(a), len(b)
|
|
if la == 0:
|
|
return lb
|
|
if lb == 0:
|
|
return la
|
|
prev = list(range(lb + 1))
|
|
for i, ca in enumerate(a, start=1):
|
|
cur = [i] + [0] * lb
|
|
for j, cb in enumerate(b, start=1):
|
|
add = prev[j] + 1
|
|
delete = cur[j - 1] + 1
|
|
change = prev[j - 1] + (0 if ca == cb else 1)
|
|
cur[j] = min(add, delete, change)
|
|
prev = cur
|
|
return prev[lb]
|
|
|
|
def match_files(
    self, baseline_files: List[FileMeta], current_files: List[FileMeta]
) -> List[Tuple[Optional[FileMeta], Optional[FileMeta]]]:
    """Pair baseline files with current files via stable matching.

    Only files sharing the same basename are candidates for pairing.
    Among same-named candidates, each side ranks the other by the
    Levenshtein distance between parent directories (smaller distance
    = stronger preference), then Gale-Shapley produces a stable
    assignment, mirroring UCC's DiffTool pairing.

    Returns:
        A list of ``(baseline_meta, current_meta)`` tuples where either
        element may be ``None``: ``(a, None)`` means the baseline file
        has no counterpart (deleted), ``(None, b)`` means the current
        file is new (added).
    """
    # Implement Gale-Shapley stable matching inspired by UCC logic.
    # Build maps by filename only (candidates must share the same filename)
    mapA_by_name = self._index_by_name(baseline_files)
    mapB_by_name = self._index_by_name(current_files)

    # Build preference lists (for A: list of B candidates sorted by path distance)
    prefsA: Dict[str, List[FileMeta]] = {}  # key: a.path
    prefsB: Dict[str, List[FileMeta]] = {}

    # helper: compute preference value between two file paths (parent dirs)
    def pref_val(pa: str, pb: str) -> int:
        parent_a = os.path.dirname(pa)
        parent_b = os.path.dirname(pb)
        return self._levenshtein(parent_a, parent_b)

    # populate preferences A -> Bs
    for a in baseline_files:
        candidates = mapB_by_name.get(os.path.basename(a.path), [])
        # compute scores and sort (ascending distance; ties keep index order,
        # so list.sort's stability makes the result deterministic)
        scored = [(pref_val(a.path, b.path), b) for b in candidates]
        scored.sort(key=lambda x: x[0])
        prefsA[a.path] = [b for (_s, b) in scored]

    # populate preferences B -> As
    for b in current_files:
        candidates = mapA_by_name.get(os.path.basename(b.path), [])
        scored = [(pref_val(a.path, b.path), a) for a in candidates]
        scored.sort(key=lambda x: x[0])
        prefsB[b.path] = [a for (_s, a) in scored]

    # Prepare Gale-Shapley structures.
    # freeA: baseline entries still proposing; next_proposal_index tracks
    # how far down its preference list each proposer has gone.
    freeA = [a for a in baseline_files]
    next_proposal_index: Dict[str, int] = {a.path: 0 for a in baseline_files}
    matchA: Dict[str, Optional[FileMeta]] = {a.path: None for a in baseline_files}
    matchB: Dict[str, Optional[FileMeta]] = {b.path: None for b in current_files}

    # For quick comparison, build rank maps for B preferences
    # (lower rank index = more preferred partner).
    rankB: Dict[str, Dict[str, int]] = {}
    for b in current_files:
        rank = {}
        plist = prefsB.get(b.path, [])
        for idx, a in enumerate(plist):
            rank[a.path] = idx
        rankB[b.path] = rank

    while freeA:
        a = freeA.pop(0)
        a_key = a.path
        plist = prefsA.get(a_key, [])
        if not plist:
            # no candidates
            matchA[a_key] = None
            continue
        # propose to next candidate
        i = next_proposal_index[a_key]
        if i >= len(plist):
            # preference list exhausted: this entry stays unmatched,
            # which also guarantees the loop terminates.
            matchA[a_key] = None
            continue
        b = plist[i]
        next_proposal_index[a_key] = i + 1
        b_key = b.path
        current = matchB.get(b_key)
        if current is None:
            # b accepts
            matchA[a_key] = b
            matchB[b_key] = a
        else:
            # b decides preference between current and proposer;
            # unknown proposers rank as +inf and therefore lose.
            rank_map = rankB.get(b_key, {})
            r_current = rank_map.get(current.path, float("inf"))
            r_proposer = rank_map.get(a_key, float("inf"))
            if r_proposer < r_current:
                # b prefers new proposer
                matchA[a_key] = b
                matchB[b_key] = a
                # previous current becomes free again
                matchA[current.path] = None
                freeA.append(current)
            else:
                # b rejects proposer -> proposer remains free (if more prefs)
                freeA.append(a)

    # Build results list: pairs for matched A entries
    results: List[Tuple[Optional[FileMeta], Optional[FileMeta]]] = []
    usedB = set()
    for a in baseline_files:
        b = matchA.get(a.path)
        if b is None:
            results.append((a, None))
        else:
            results.append((a, b))
            usedB.add(b.path)

    # Any B not matched are added as (None, b)
    for b in current_files:
        if b.path not in usedB:
            results.append((None, b))

    return results
|
|
|
|
@staticmethod
|
|
def _diff_file_pair(fileA_path: Optional[str], fileB_path: Optional[str]) -> Dict:
|
|
res = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
|
|
if fileA_path is None and fileB_path is None:
|
|
return res
|
|
if fileA_path is None:
|
|
# all lines are added
|
|
try:
|
|
with open(fileB_path, "r", encoding="utf-8", errors="ignore") as f:
|
|
lines = f.readlines()
|
|
res["added"] = len(lines)
|
|
except Exception:
|
|
res["added"] = 0
|
|
return res
|
|
if fileB_path is None:
|
|
try:
|
|
with open(fileA_path, "r", encoding="utf-8", errors="ignore") as f:
|
|
lines = f.readlines()
|
|
res["deleted"] = len(lines)
|
|
except Exception:
|
|
res["deleted"] = 0
|
|
return res
|
|
|
|
# both exist; line-based diff
|
|
try:
|
|
with open(fileA_path, "r", encoding="utf-8", errors="ignore") as fa:
|
|
a_lines = fa.readlines()
|
|
except Exception:
|
|
a_lines = []
|
|
try:
|
|
with open(fileB_path, "r", encoding="utf-8", errors="ignore") as fb:
|
|
b_lines = fb.readlines()
|
|
except Exception:
|
|
b_lines = []
|
|
|
|
sm = difflib.SequenceMatcher(a=a_lines, b=b_lines)
|
|
|
|
# DEBUG: Log if files are identical but difflib finds differences
|
|
has_differences = False
|
|
for tag, i1, i2, j1, j2 in sm.get_opcodes():
|
|
if tag != "equal":
|
|
has_differences = True
|
|
break
|
|
|
|
if has_differences and len(a_lines) == len(b_lines):
|
|
# Files have same line count but difflib sees differences
|
|
print(f"[DIFFER] ANOMALY DETECTED:")
|
|
print(f" FileA: {fileA_path}")
|
|
print(f" FileB: {fileB_path}")
|
|
print(f" Lines: {len(a_lines)} vs {len(b_lines)}")
|
|
# Check first differing line
|
|
for i, (line_a, line_b) in enumerate(zip(a_lines, b_lines)):
|
|
if line_a != line_b:
|
|
print(f" First diff at line {i+1}:")
|
|
print(f" A: {repr(line_a[:80])}")
|
|
print(f" B: {repr(line_b[:80])}")
|
|
break
|
|
|
|
for tag, i1, i2, j1, j2 in sm.get_opcodes():
|
|
if tag == "equal":
|
|
res["unmodified"] += i2 - i1
|
|
elif tag == "delete":
|
|
res["deleted"] += i2 - i1
|
|
elif tag == "insert":
|
|
res["added"] += j2 - j1
|
|
elif tag == "replace":
|
|
la = i2 - i1
|
|
lb = j2 - j1
|
|
res["modified"] += min(la, lb)
|
|
if la > lb:
|
|
res["deleted"] += la - lb
|
|
elif lb > la:
|
|
res["added"] += lb - la
|
|
return res
|
|
|
|
def diff(self) -> Dict:
    """Compare the stored baseline against the current directory.

    Matches baseline files to current files, diffs each pair (in a
    thread pool, since the work is I/O-bound) and aggregates results.

    Returns:
        A JSON-serializable dict with:
          - ``baseline_id`` / ``compared_at``: identification and timestamp
          - ``total``: added/deleted/modified/unmodified line counts
          - ``pairs``: one entry per file pair with per-file countings,
            metrics and their deltas (None where data is missing)
          - ``summary``: aggregate countings/metrics for baseline,
            current and their delta (best effort; omitted on failure)
    """
    baseline_files = self.baseline.files
    current_files = self.build_current_file_list()
    pairs = self.match_files(baseline_files, current_files)

    total = {"added": 0, "deleted": 0, "modified": 0, "unmodified": 0}
    matched_results = []

    # Keys copied verbatim from the analyzers; used for per-pair deltas.
    counting_keys = (
        "physical_lines",
        "code_lines",
        "comment_lines",
        "blank_lines",
        # UCC extended metrics
        "comment_whole",
        "comment_embedded",
        "compiler_directives",
        "data_declarations",
        "exec_instructions",
        "logical_sloc",
        "physical_sloc",
    )

    # Diff pairs in parallel: baseline content lives under the stored
    # baseline files dir, current content under the working directory.
    with ThreadPoolExecutor(max_workers=self.max_workers) as ex:
        futures = []
        for a, b in pairs:
            fa = (
                os.path.join(self.baseline_files_dir, a.path)
                if a is not None
                else None
            )
            fb = os.path.join(self.current_dir, b.path) if b is not None else None
            futures.append(ex.submit(self._diff_file_pair, fa, fb))
        for (a, b), fut in zip(pairs, futures):
            res = fut.result()
            for key in total:
                total[key] += res[key]

            # Per-file analyzer data embedded in the FileMeta entries.
            pair_base_countings = a.countings if a is not None else None
            pair_base_metrics = a.metrics if a is not None else None
            pair_cur_countings = b.countings if b is not None else None
            pair_cur_metrics = b.metrics if b is not None else None

            # Deltas only make sense when both sides carry data.
            countings_delta = None
            if pair_base_countings and pair_cur_countings:
                countings_delta = {
                    k: pair_cur_countings.get(k, 0) - pair_base_countings.get(k, 0)
                    for k in counting_keys
                }

            metrics_delta = None
            if pair_base_metrics and pair_cur_metrics:
                metrics_delta = {
                    "func_count": pair_cur_metrics.get("func_count", 0)
                    - pair_base_metrics.get("func_count", 0),
                    "avg_cc": pair_cur_metrics.get("avg_cc", 0.0)
                    - pair_base_metrics.get("avg_cc", 0.0),
                    "max_cc": pair_cur_metrics.get("max_cc", 0)
                    - pair_base_metrics.get("max_cc", 0),
                    "mi": pair_cur_metrics.get("mi", 0.0)
                    - pair_base_metrics.get("mi", 0.0),
                }

            matched_results.append(
                {
                    "fileA": a.path if a is not None else None,
                    "fileB": b.path if b is not None else None,
                    "counts": res,
                    "baseline_countings": pair_base_countings,
                    "current_countings": pair_cur_countings,
                    "countings_delta": countings_delta,
                    "baseline_metrics": pair_base_metrics,
                    "current_metrics": pair_cur_metrics,
                    "metrics_delta": metrics_delta,
                }
            )

    result = {
        "baseline_id": self.baseline.baseline_id,
        "compared_at": time.time(),
        "total": total,
        "pairs": matched_results,
    }

    def _aggregate(files_list):
        # Sum countings and average metrics over one side's FileMeta list.
        counts = {
            "physical_lines": 0,
            "code_lines": 0,
            "comment_lines": 0,
            "blank_lines": 0,
            "file_count": 0,
        }
        metrics = {
            "file_count": 0,
            "total_func_count": 0,
            "avg_avg_cc": 0.0,
            "avg_mi": 0.0,
        }
        n_metrics = 0
        for fm in files_list:
            if fm.countings:
                counts["physical_lines"] += fm.countings.get("physical_lines", 0)
                counts["code_lines"] += fm.countings.get("code_lines", 0)
                counts["comment_lines"] += fm.countings.get("comment_lines", 0)
                counts["blank_lines"] += fm.countings.get("blank_lines", 0)
                counts["file_count"] += 1
            if fm.metrics:
                metrics["total_func_count"] += fm.metrics.get("func_count", 0)
                metrics["avg_avg_cc"] += fm.metrics.get("avg_cc", 0.0)
                metrics["avg_mi"] += fm.metrics.get("mi", 0.0)
                n_metrics += 1
        if n_metrics > 0:
            metrics["avg_avg_cc"] /= n_metrics
            metrics["avg_mi"] /= n_metrics
        metrics["file_count"] = n_metrics
        return counts, metrics

    # Compute summary statistics from baseline and current file metadata.
    # Best effort: a failure here must not invalidate the pair results.
    try:
        base_counts, base_metrics = _aggregate(baseline_files)
        cur_counts, cur_metrics = _aggregate(current_files)

        delta_counts = {
            k: cur_counts[k] - base_counts[k]
            for k in (
                "physical_lines",
                "code_lines",
                "comment_lines",
                "blank_lines",
                "file_count",
            )
        }
        delta_metrics = {
            "total_func_count": cur_metrics["total_func_count"]
            - base_metrics["total_func_count"],
            "avg_avg_cc": cur_metrics["avg_avg_cc"] - base_metrics["avg_avg_cc"],
            "avg_mi": cur_metrics["avg_mi"] - base_metrics["avg_mi"],
        }

        result["summary"] = {
            "baseline": {"countings": base_counts, "metrics": base_metrics},
            "current": {"countings": cur_counts, "metrics": cur_metrics},
            "delta": {"countings": delta_counts, "metrics": delta_metrics},
        }
    except Exception:
        pass

    return result
|