# --- FILE: gitutility/core/history_cleaner.py ---
import os
import shutil
import tempfile
import subprocess
from typing import Dict, List, Any, Tuple, Optional
# Import using the absolute path from the package
from gitutility.logging_setup import log_handler
from gitutility.commands.git_commands import GitCommands, GitCommandError
class HistoryCleaner:
"""
Handles the analysis and purging of unwanted files from a Git repository's history.
This class orchestrates the use of 'git-filter-repo' for safe history rewriting.
"""
def __init__(self, git_commands: GitCommands):
"""
Initializes the HistoryCleaner.
Args:
git_commands (GitCommands): An instance for executing Git commands.
Raises:
TypeError: If git_commands is not a valid GitCommands instance.
"""
if not isinstance(git_commands, GitCommands):
raise TypeError("HistoryCleaner requires a GitCommands instance.")
self.git_commands: GitCommands = git_commands
log_handler.log_debug("HistoryCleaner initialized.", func_name="__init__")
@staticmethod
def _check_filter_repo_installed() -> bool:
"""
Checks if 'git-filter-repo' is installed and accessible in the system's PATH.
Returns:
bool: True if git-filter-repo is found, False otherwise.
"""
func_name = "_check_filter_repo_installed"
try:
# Execute with --version, which is a lightweight command.
# Use subprocess.run directly to avoid circular dependencies or complex setups.
subprocess.run(
["git-filter-repo", "--version"],
check=True,
capture_output=True,
text=True,
# On Windows, prevent console window from flashing
startupinfo=(
subprocess.STARTUPINFO(dwFlags=subprocess.STARTF_USESHOWWINDOW)
if os.name == "nt"
else None
),
)
log_handler.log_info(
"'git-filter-repo' is installed and accessible.", func_name=func_name
)
return True
except FileNotFoundError:
log_handler.log_error(
"'git-filter-repo' command not found. It must be installed and in the system's PATH.",
func_name=func_name,
)
return False
        except Exception as e:  # covers subprocess.CalledProcessError and anything unexpected
log_handler.log_error(
f"Error checking for 'git-filter-repo': {e}", func_name=func_name
)
return False
def analyze_repo_for_purgeable_files(
self, repo_path: str
) -> List[Dict[str, Any]]:
"""
Analyzes the entire repository history to find committed files that
are now covered by .gitignore rules.
Args:
repo_path (str): The absolute path to the Git repository.
Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary
represents a file to be purged and contains
'path' and 'size' keys.
"""
func_name = "analyze_repo_for_purgeable_files"
log_handler.log_info(
f"Starting history analysis for purgeable files in '{repo_path}'...",
func_name=func_name,
)
purge_candidates: Dict[str, int] = {} # Use dict to store unique paths
try:
# 1. Get a list of all blobs (file versions) in the repository's history
# Returns a list of (hash, path) tuples
all_blobs = self.git_commands.list_all_historical_blobs(repo_path)
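            # (The wrapper presumably enumerates objects with something like
            # `git rev-list --objects --all` and keeps only blob entries; this is a
            # hedged guess at GitCommands' internals, not a documented contract.)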
if not all_blobs:
log_handler.log_info(
"No historical file blobs found. Analysis complete.",
func_name=func_name,
)
return []
log_handler.log_debug(
f"Found {len(all_blobs)} total blobs. Checking against .gitignore...",
func_name=func_name,
)
# 2. Iterate and find files that are now ignored
for blob_hash, file_path in all_blobs:
# Avoid reprocessing a path we already identified as a candidate
if file_path in purge_candidates:
continue
# Check if the current .gitignore would ignore this path
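                # (Presumably backed by `git check-ignore`, so results reflect the
                # .gitignore rules currently present in the working tree.)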
if self.git_commands.check_if_would_be_ignored(repo_path, file_path):
# It's a candidate for purging. Get its size.
try:
blob_size = self.git_commands.get_blob_size(
repo_path, blob_hash
)
# Store the file path and its size. If a path appears multiple
# times with different hashes, we'll just keep the first one found.
purge_candidates[file_path] = blob_size
log_handler.log_debug(
f"Candidate for purge: '{file_path}' (Size: {blob_size} bytes)",
func_name=func_name,
)
except GitCommandError as size_err:
log_handler.log_warning(
f"Could not get size for blob {blob_hash} ('{file_path}'): {size_err}",
func_name=func_name,
)
# 3. Format the results for the GUI
# Convert dict to the list of dicts format
result_list = [
{"path": path, "size": size}
for path, size in purge_candidates.items()
]
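            # Example shape of result_list (illustrative values only):
            #   [{"path": "build/output.bin", "size": 1048576},
            #    {"path": ".env", "size": 512}]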
# Sort by size descending for better presentation
result_list.sort(key=lambda x: x["size"], reverse=True)
log_handler.log_info(
f"Analysis complete. Found {len(result_list)} unique purgeable file paths.",
func_name=func_name,
)
return result_list
except (GitCommandError, ValueError) as e:
log_handler.log_error(
f"Analysis failed due to a Git command error: {e}", func_name=func_name
)
raise # Re-raise to be handled by the async worker
except Exception as e:
log_handler.log_exception(
f"An unexpected error occurred during repository analysis: {e}",
func_name=func_name,
)
raise
def purge_files_from_history(
self,
repo_path: str,
files_to_remove: List[str],
remote_name: str,
remote_url: str,
) -> Tuple[bool, str]:
"""
Rewrites the repository's history to completely remove the specified files.
This is a DESTRUCTIVE operation.
Args:
repo_path (str): The absolute path to the Git repository.
files_to_remove (List[str]): A list of file paths to purge.
remote_name (str): The name of the remote to force-push to after cleaning.
remote_url (str): The URL of the remote, needed to re-add it after cleaning.
Returns:
Tuple[bool, str]: A tuple of (success_status, message).
"""
func_name = "purge_files_from_history"
log_handler.log_warning(
f"--- DESTRUCTIVE OPERATION STARTED: Purging {len(files_to_remove)} file paths from history in '{repo_path}' ---",
func_name=func_name,
)
# 1. Prerequisite check
if not self._check_filter_repo_installed():
error_msg = "'git-filter-repo' is not installed. This tool is required to safely clean the repository history."
log_handler.log_critical(error_msg, func_name=func_name)
return False, error_msg
if not files_to_remove:
return True, "No files were specified for removal. No action taken."
if not remote_url:
return False, "Remote URL is required to re-configure the remote after cleaning, but it was not provided."
# 2. Use a temporary file to list the paths for git-filter-repo
# This is safer than passing many arguments on the command line.
        tmp_file_path: Optional[str] = None  # initialized so the finally block can check it safely
        try:
with tempfile.NamedTemporaryFile(
mode="w", delete=False, encoding="utf-8", suffix=".txt"
) as tmp_file:
tmp_file_path = tmp_file.name
for file_path in files_to_remove:
# git-filter-repo expects paths to be literals, one per line
tmp_file.write(f"{file_path}\n")
log_handler.log_info(
f"Created temporary file with paths to remove: {tmp_file_path}",
func_name=func_name,
)
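            # The temporary file now holds one literal path per line, e.g.:
            #   build/output.bin
            #   .env
            # (illustrative paths). git-filter-repo can consume such a list via its
            # --invert-paths/--paths-from-file options; the exact flags used are an
            # assumption about GitCommands.run_filter_repo, not verified here.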
# 3. Run git-filter-repo
self.git_commands.run_filter_repo(repo_path, paths_file=tmp_file_path)
log_handler.log_info(
"History rewriting with git-filter-repo completed successfully.",
func_name=func_name,
)
            # 4. ---<<< CORRECTIVE STEP >>>---
            # Re-add the remote that git-filter-repo removed.
log_handler.log_info(
f"Re-adding remote '{remote_name}' with URL '{remote_url}' after filtering...",
func_name=func_name,
)
            # First check whether it already exists (in rare cases it may not have been removed).
            # If it exists, update its URL; otherwise add it.
existing_remotes = self.git_commands.get_remotes(repo_path)
if remote_name in existing_remotes:
self.git_commands.set_remote_url(repo_path, remote_name, remote_url)
else:
self.git_commands.add_remote(repo_path, remote_name, remote_url)
log_handler.log_info(
f"Remote '{remote_name}' successfully re-configured.",
func_name=func_name
)
            # ---<<< END OF CORRECTIVE STEP >>>---
# 5. Force push the rewritten history to the remote
log_handler.log_warning(
f"Force-pushing rewritten history to remote '{remote_name}'...",
func_name=func_name,
)
# --- Get list of local branches before push ---
            # We need this to know which branches to reconfigure afterwards.
local_branches_before_push, _ = self.git_commands.list_branches(repo_path)
# Force push all branches
self.git_commands.force_push_all(repo_path, remote_name)
log_handler.log_info(
f"Force-pushed all branches to remote '{remote_name}'.",
func_name=func_name,
)
# Force push all tags
self.git_commands.force_push_tags(repo_path, remote_name)
log_handler.log_info(
f"Force-pushed all tags to remote '{remote_name}'.", func_name=func_name
)
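            # Roughly equivalent plumbing for the two wrappers above (assumption):
            #   git push --force <remote> --all
            #   git push --force <remote> --tags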
            # 6. ---<<< CORRECTIVE STEP 2 >>>---
# Re-establish upstream tracking for all local branches that were pushed.
log_handler.log_info(
"Re-establishing upstream tracking for local branches...",
func_name=func_name
)
for branch_name in local_branches_before_push:
try:
self.git_commands.set_branch_upstream(repo_path, branch_name, remote_name)
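                    # Roughly `git branch --set-upstream-to=<remote>/<branch> <branch>`
                    # under the hood (assumption about the wrapper).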
log_handler.log_debug(
f"Successfully set upstream for branch '{branch_name}' to '{remote_name}/{branch_name}'.",
func_name=func_name
)
except GitCommandError as upstream_err:
                    # Log a warning but do not fail the whole operation because of this.
                    # It can happen when a local branch has no remote counterpart.
log_handler.log_warning(
f"Could not set upstream for branch '{branch_name}'. It might not exist on the remote. Error: {upstream_err}",
func_name=func_name
)
log_handler.log_info("Upstream tracking re-established.", func_name=func_name)
success_message = (
f"Successfully purged {len(files_to_remove)} file paths from history "
f"and force-pushed to remote '{remote_name}'.\n\n"
"IMPORTANT: Any other clones of this repository are now out of sync."
)
log_handler.log_info(success_message, func_name=func_name)
return True, success_message
except (GitCommandError, ValueError) as e:
error_msg = f"History cleaning failed: {e}"
log_handler.log_error(error_msg, func_name=func_name)
return False, error_msg
except Exception as e:
error_msg = f"An unexpected error occurred during history cleaning: {e}"
log_handler.log_exception(error_msg, func_name=func_name)
return False, error_msg
finally:
# Clean up the temporary file
if "tmp_file_path" in locals() and os.path.exists(tmp_file_path):
try:
os.remove(tmp_file_path)
log_handler.log_debug(
f"Cleaned up temporary file: {tmp_file_path}",
func_name=func_name,
)
except OSError as e:
log_handler.log_warning(
f"Could not remove temporary file {tmp_file_path}: {e}",
func_name=func_name,
)
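

# --- Minimal usage sketch (illustrative only) ---
# Assumes GitCommands() can be constructed without arguments, which may not match the
# real constructor in gitutility.commands.git_commands; adjust to the actual API.
#
#   from gitutility.commands.git_commands import GitCommands
#   from gitutility.core.history_cleaner import HistoryCleaner
#
#   cleaner = HistoryCleaner(GitCommands())
#   candidates = cleaner.analyze_repo_for_purgeable_files("/path/to/repo")
#   for item in candidates:
#       print(f"{item['path']}: {item['size']} bytes")
#
#   # Destructive step; run only after reviewing the candidates:
#   # ok, msg = cleaner.purge_files_from_history(
#   #     "/path/to/repo",
#   #     [item["path"] for item in candidates],
#   #     remote_name="origin",
#   #     remote_url="https://example.com/user/repo.git",
#   # )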