SXXXXXXX_PyUCC/pyucc/core/duplicates.py

"""Minimal duplicate detection utilities inspired by UCC.
This module provides a simple, portable duplicate finder for use from the
CLI. It implements:
- exact duplicate detection via normalized SHA1
- a simple fuzzy duplicate check using line-based SequenceMatcher
The implementation favors clarity over performance and is meant as a
first-step integration. For large projects a winnowing/fingerprinting
approach should be preferred.
"""

from __future__ import annotations

import hashlib
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple

try:
    from pygount import analysis as _pg_analysis  # type: ignore

    _HAS_PYGOUNT = True
except Exception:
    _HAS_PYGOUNT = False


def _normalize_bytes(b: bytes) -> bytes:
    """Strip a UTF-8 BOM and normalize CRLF/CR line endings to LF."""
    if b.startswith(b"\xef\xbb\xbf"):
        b = b[3:]
    b = b.replace(b"\r\n", b"\n")
    b = b.replace(b"\r", b"\n")
    return b
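
# Illustrative example: a UTF-8 BOM is dropped and CR/CRLF become LF, e.g.
#     _normalize_bytes(b"\xef\xbb\xbfa\r\nb\r")  # -> b"a\nb\n"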


def _sha1_of_bytes(b: bytes) -> str:
    """Return the hex SHA-1 digest of ``b``."""
    h = hashlib.sha1()
    h.update(b)
    return h.hexdigest()


def _read_normalized_text(path: Path) -> Optional[str]:
    """Read ``path`` and return its normalized text, or None on read errors."""
    try:
        raw = path.read_bytes()
    except Exception:
        return None
    norm = _normalize_bytes(raw)
    try:
        return norm.decode("utf-8", errors="replace")
    except Exception:
        return None


def _lines_for_similarity(path: Path) -> List[str]:
    """Return a list of normalized lines suitable for sequence comparison.

    Prefer using pygount to strip comment and blank lines when available;
    otherwise a simple fallback (drop blank lines, strip trailing spaces)
    is used.
    """
    text = _read_normalized_text(path)
    if text is None:
        return []
    if _HAS_PYGOUNT:
        try:
            sa = _pg_analysis.SourceAnalysis.from_string(text, filename=str(path))
            lines = []
            # SourceAnalysis keeps per-line categories in .lines (pygount 3.x)
            for ln in getattr(sa, "lines", []):
                # ln is a Line object with .type in {"empty", "documentation", "source", ...}
                if getattr(ln, "type", None) == "empty":
                    continue
                if getattr(ln, "type", None) == "documentation":
                    continue
                # treat source and string lines as code
                lines.append(getattr(ln, "text", "").rstrip())
            if lines:
                return lines
        except Exception:
            # fall back to the simple approach below
            pass
    # fallback: keep non-empty lines and strip trailing spaces
    return [line.rstrip() for line in text.splitlines() if line.strip()]
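
# Illustrative fallback behavior (pygount unavailable or its analysis fails):
# a file whose normalized text is "a = 1\n\n    b = 2   \n" reduces to
# ["a = 1", "    b = 2"], i.e. blank lines dropped, trailing spaces stripped.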


def _normalize_for_fingerprinting(text: str) -> str:
    """Normalize text for fingerprinting: remove leading/trailing whitespace
    and collapse all whitespace to a single space. Keep case (code is
    case-sensitive).
    """
    # Replace all whitespace sequences with a single space
    return " ".join(text.split())


def _compute_kgram_hashes(s: str, k: int) -> List[int]:
    """Compute rolling Rabin-Karp 64-bit hashes for all k-grams in s."""
    if len(s) < k:
        return []
    mask = (1 << 64) - 1
    base = 257
    # initial hash
    h = 0
    for i in range(k):
        h = ((h * base) + ord(s[i])) & mask
    hashes = [h]
    # precompute base^k
    pow_k = 1
    for _ in range(k):
        pow_k = (pow_k * base) & mask
    for i in range(k, len(s)):
        h = (h * base - ord(s[i - k]) * pow_k + ord(s[i])) & mask
        hashes.append(h)
    return hashes
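
# Worked example (illustrative): each value equals the polynomial hash of its
# k-gram computed directly, so for s = "abc", k = 2, base = 257:
#     hash("ab") = 97 * 257 + 98 = 25027
#     hash("bc") = 98 * 257 + 99 = 25285
#     _compute_kgram_hashes("abc", 2)  # -> [25027, 25285]
# The rolling update reproduces these values in O(1) work per position.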


def _winnow_hashes(hashes: List[int], window: int) -> set:
    """Apply winnowing to a list of hashes, returning the set of selected fingerprints."""
    if not hashes:
        return set()
    if window <= 1:
        return set(hashes)
    fingerprints = set()
    # Simple O(n * window) sliding-window minimum; a deque-based O(n)
    # version is possible but not needed at this scale.
    n = len(hashes)
    for i in range(0, n - window + 1):
        window_slice = hashes[i : i + window]
        min_hash = min(window_slice)
        fingerprints.add(min_hash)
    return fingerprints
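
# Illustrative example: with window=3 every length-3 window contributes its
# minimum, so _winnow_hashes([5, 3, 8, 2, 9], 3) returns {2, 3} (the minima
# of [5, 3, 8], [3, 8, 2] and [8, 2, 9]).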


def _fingerprints_for_text(text: str, k: int = 25, window: int = 4) -> set:
    """Return the fingerprint set for the given text using k-gram winnowing.

    Defaults are chosen to be conservative; for shorter inputs the parameters
    adapt automatically.
    """
    norm = _normalize_for_fingerprinting(text)
    if not norm:
        return set()
    # For small content, adapt k
    k_eff = min(k, max(3, len(norm) // 10))
    hashes = _compute_kgram_hashes(norm, k_eff)
    w_eff = min(window, max(1, len(hashes)))
    fps = _winnow_hashes(hashes, w_eff)
    return fps
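

# A minimal sketch (illustrative; not used elsewhere in this module) of how
# two fingerprint sets are meant to be compared: find_duplicates_in_dir below
# inlines this same Jaccard computation. The helper name `_jaccard_similarity`
# is an assumption added here for clarity, not part of the original design.
def _jaccard_similarity(a: set, b: set) -> float:
    """Return |a & b| / |a | b|, or 0.0 when both sets are empty."""
    union = len(a | b)
    return (len(a & b) / union) if union else 0.0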


def find_duplicates_in_dir(
    root: Optional[str] = None,
    extensions: Optional[List[str]] = None,
    dup_threshold: float = 5.0,
    k: int = 25,
    window: int = 4,
    file_list: Optional[List[str]] = None,
) -> Dict[str, List[Tuple[str, str]]]:
    """Scan `root` (or an explicit `file_list`) for duplicate files.

    Returns a dict with two keys, `exact` and `fuzzy`, each listing pairs of
    duplicate paths. `dup_threshold` is the maximum allowed dissimilarity in
    percent (0..100), computed as 100 * (1 - Jaccard similarity of the
    winnowing fingerprints).
    """
    # Normalize the extension list to a set with leading dots, lower-case
    allowed_exts = None
    if extensions:
        allowed_exts = set()
        for e in extensions:
            ee = e.strip().lower()
            if not ee:
                continue
            if not ee.startswith("."):
                ee = "." + ee
            allowed_exts.add(ee)

    files: List[Path] = []
    # If the caller provided an explicit file list, use it (no additional filesystem walk)
    if file_list is not None:
        for f in file_list:
            p = Path(f)
            if not p.is_file():
                continue
            if allowed_exts and p.suffix.lower() not in allowed_exts:
                continue
            files.append(p)
    else:
        if not root:
            return {"exact": [], "fuzzy": []}
        rootp = Path(root)
        for p in rootp.rglob("*"):
            if not p.is_file():
                continue
            if allowed_exts and p.suffix.lower() not in allowed_exts:
                continue
            files.append(p)

    # Normalize & SHA-1 for exact duplicate detection
    idx_by_sha: Dict[str, List[Path]] = {}
    content_cache: Dict[Path, str] = {}
    for p in files:
        try:
            raw = p.read_bytes()
        except Exception:
            continue
        norm = _normalize_bytes(raw)
        sha = _sha1_of_bytes(norm)
        idx_by_sha.setdefault(sha, []).append(p)
        # store the normalized text for the later fuzzy comparison
        try:
            content_cache[p] = norm.decode("utf-8", errors="replace")
        except Exception:
            content_cache[p] = ""

    exact: List[Tuple[str, str]] = []
    fuzzy: List[Tuple[str, str]] = []

    # collect exact duplicates
    for sha, group in idx_by_sha.items():
        if len(group) > 1:
            base = str(group[0])
            for other in group[1:]:
                exact.append((base, str(other)))

    # fuzzy duplicates: winnowing + inverted index to avoid a full O(n^2) scan
    # Compute fingerprints for all files (using the normalized content cache)
    fps_list: List[set] = []
    for p in files:
        txt = content_cache.get(p, None)
        if txt is None:
            txt = _read_normalized_text(p) or ""
        # prefer the lines-only text to reduce noise;
        # join the logical lines for fingerprinting
        lines = _lines_for_similarity(p)
        if lines:
            text_for_fp = "\n".join(lines)
        else:
            text_for_fp = txt
        fps = _fingerprints_for_text(text_for_fp, k=k, window=window)
        fps_list.append(fps)

    # Build an inverted index from fingerprint -> file indices
    inverted = defaultdict(set)  # hash -> set(file_idx)
    for idx, fps in enumerate(fps_list):
        for h in fps:
            inverted[h].add(idx)

    for i, fps in enumerate(fps_list):
        if not fps:
            continue
        # gather candidate files that share at least one fingerprint
        candidate_counts = Counter()
        for h in fps:
            for j in inverted.get(h, []):
                if j == i:
                    continue
                candidate_counts[j] += 1
        # Evaluate each candidate (only those with some shared fingerprints)
        for j, cnt in candidate_counts.items():
            if i >= j:
                continue  # handle each pair only once
            # quick size filter
            try:
                sa = files[i].stat().st_size
                sb = files[j].stat().st_size
            except Exception:
                sa = sb = 0
            size_diff_pct = 100.0 * abs(sa - sb) / max(sa, sb, 1)
            if size_diff_pct > dup_threshold + 20.0:
                continue
            set_i = fps_list[i]
            set_j = fps_list[j]
            if not set_i or not set_j:
                continue
            inter = len(set_i & set_j)
            union = len(set_i | set_j)
            if union == 0:
                continue
            jaccard = inter / union
            pct_change = 100.0 * (1.0 - jaccard)
            if pct_change <= dup_threshold:
                fuzzy.append((str(files[i]), str(files[j])))

    return {"exact": exact, "fuzzy": fuzzy}