SXXXXXXX_GitUtility/git_commands.py
VALLONGOL e3bf97eeb5 Chore: Stop tracking files based on .gitignore update.
Summary:
- Rule "*.ini" untracked 1 file(s).
- Rule "*.log" untracked 1 file(s).
- Rule "_build/" untracked 15 file(s).
- Rule "_dist/" untracked 946 file(s).
2025-04-18 13:13:39 +02:00

1103 lines
52 KiB
Python

# git_commands.py
import os
import subprocess
import logging
import re # Modulo per espressioni regolari (usato per validazione nomi)
# --- Definizione Eccezione Personalizzata ---
class GitCommandError(Exception):
"""
Custom exception for handling Git command errors.
Includes the original command and stderr details if available.
"""
def __init__(self, message, command=None, stderr=None):
"""
Initialize the GitCommandError.
Args:
message (str): The main error message.
command (list, optional): The command list that caused the error.
stderr (str, optional): The standard error output from the command.
"""
super().__init__(message)
self.command = command
self.stderr = stderr
def __str__(self):
"""Return a formatted string representation including command details."""
base_message = super().__str__()
details = []
if self.command:
# Safely convert all command parts to string for joining
safe_command = [str(part) for part in self.command]
command_str = " ".join(safe_command)
details.append(f"Command: '{command_str}'")
if self.stderr:
# Add stripped stderr if available
stderr_str = self.stderr.strip()
if stderr_str: # Only add if stderr has content
details.append(f"Stderr: {stderr_str}")
# Combine base message with details if details exist
if details:
details_str = "; ".join(details)
return f"{base_message} ({details_str})"
else:
return base_message
# --- Classe Principale per Comandi Git ---
class GitCommands:
"""
Manages Git command execution, logging, and error handling.
Decoupled from the GUI, operates on provided directory paths.
Includes functionalities for core operations, tags, and branches.
"""
def __init__(self, logger):
"""
Initializes the GitCommands class with a logger instance.
Args:
logger (logging.Logger): Instance for logging messages.
Raises:
ValueError: If the provided logger is not a valid logging.Logger.
"""
if not isinstance(logger, logging.Logger):
raise ValueError("A valid logging.Logger instance is required.")
self.logger = logger
def log_and_execute(self, command, working_directory, check=True, log_output_level=logging.INFO):
"""
Executes a shell command, logs details, handles errors, and controls output logging level.
Args:
command (list): Command and arguments as a list of strings.
working_directory (str): The directory to execute the command in.
check (bool, optional): If True, raises GitCommandError on non-zero exit. Defaults to True.
log_output_level (int, optional): Logging level for stdout/stderr on success.
Defaults to logging.INFO. Use logging.DEBUG
to hide noisy command output from INFO logs.
Returns:
subprocess.CompletedProcess: Result object.
Raises:
GitCommandError, ValueError, FileNotFoundError, PermissionError.
"""
# --- FINE MODIFICA ---
safe_command = [str(part) for part in command]
command_str = " ".join(safe_command)
# Log command execution always at DEBUG level
self.logger.debug(f"Executing in '{working_directory}': {command_str}")
# --- Validate Working Directory ---
# (Logica validazione working_directory invariata)
if not working_directory:
msg = "Working directory cannot be None or empty."
self.logger.error(msg)
raise ValueError(msg)
# Use '.' as a special case for current working directory if needed by caller
if working_directory == ".":
cwd = os.getcwd()
else:
abs_path = os.path.abspath(working_directory)
if not os.path.isdir(abs_path):
msg = f"Working directory does not exist or is not a directory: {abs_path}"
self.logger.error(msg)
raise GitCommandError(msg, command=safe_command)
cwd = abs_path
# self.logger.debug(f"Effective Working Directory: {cwd}") # Already logged above
# --- Execute Command ---
try:
# (Logica startupinfo/creationflags invariata)
startupinfo = None
creationflags = 0
if os.name == "nt":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
result = subprocess.run(
safe_command,
cwd=cwd,
capture_output=True,
text=True,
check=check,
encoding="utf-8",
errors="replace",
startupinfo=startupinfo,
creationflags=creationflags,
)
# Log command output based on specified level for success
# Error output is always logged at ERROR level below
if check or result.returncode == 0:
# --- MODIFICA: Usa log_output_level per l'output di successo ---
# Only log stdout/stderr if the requested level is met by the logger config
if self.logger.isEnabledFor(log_output_level):
stdout_log = result.stdout.strip() if result.stdout else "<no stdout>"
stderr_log = result.stderr.strip() if result.stderr else "<no stderr>"
# Use the passed level for logging the output
self.logger.log(
log_output_level,
f"Command successful. Output:\n"
f"--- stdout ---\n{stdout_log}\n"
f"--- stderr ---\n{stderr_log}\n---"
)
else:
# Log minimal success message if output level is suppressed
self.logger.debug(f"Command successful (output logging suppressed by level).")
# --- FINE MODIFICA ---
return result
except subprocess.CalledProcessError as e:
# (Gestione CalledProcessError invariata - logga sempre a ERROR)
stderr_err = e.stderr.strip() if e.stderr else "<no stderr>"
stdout_err = e.stdout.strip() if e.stdout else "<no stdout>"
error_log_msg = (
f"Command failed with return code {e.returncode} in '{cwd}'.\n"
f"--- command ---\n{command_str}\n"
f"--- stderr ---\n{stderr_err}\n"
f"--- stdout ---\n{stdout_err}\n---"
)
self.logger.error(error_log_msg)
raise GitCommandError(
f"Git command failed in '{cwd}'.", command=safe_command, stderr=e.stderr
) from e
except FileNotFoundError as e:
# (Gestione FileNotFoundError invariata)
error_msg = f"Command not found: '{safe_command[0]}'. Is Git installed and in system PATH? (Working directory: '{cwd}')"
self.logger.error(error_msg)
self.logger.debug(f"FileNotFoundError details: {e}")
raise GitCommandError(error_msg, command=safe_command) from e
except PermissionError as e:
# (Gestione PermissionError invariata)
error_msg = f"Permission denied executing command in '{cwd}'."
self.logger.error(error_msg)
self.logger.debug(f"PermissionError details: {e}")
raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e
except Exception as e:
# (Gestione Exception generica invariata)
self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}")
raise GitCommandError(
f"Unexpected command execution error: {e}", command=safe_command
) from e
# --- Core Repo Operations ---
def prepare_svn_for_git(self, working_directory):
"""
Prepares a directory for Git use: initializes if needed, ensures
.gitignore ignores '.svn'.
Args:
working_directory (str): Path to the target directory.
Raises:
GitCommandError, ValueError: If operations fail.
"""
self.logger.info(f"Preparing directory for Git: '{working_directory}'")
# Basic validation
if not working_directory:
raise ValueError("Working directory cannot be None or empty.")
if not os.path.isdir(working_directory):
raise GitCommandError(f"Directory does not exist: {working_directory}")
# Define relevant paths
gitignore_path = os.path.join(working_directory, ".gitignore")
git_dir_path = os.path.join(working_directory, ".git")
# 1. Initialize Git repository if '.git' directory doesn't exist
if not os.path.exists(git_dir_path):
self.logger.info("No existing Git repository found. Initializing...")
try:
init_command = ["git", "init"]
# Use check=True to ensure init succeeds
self.log_and_execute(init_command, working_directory, check=True)
self.logger.info("Git repository initialized successfully.")
except (GitCommandError, ValueError) as e:
# Handle specific errors from git init
self.logger.error(f"Failed to initialize Git repository: {e}")
raise # Re-raise to signal preparation failure
except Exception as e:
# Handle unexpected errors during init
self.logger.exception(f"Unexpected error initializing repository: {e}")
raise GitCommandError(f"Unexpected init error: {e}") from e
else:
# Repository already exists
self.logger.info("Git repository already exists. Skipping initialization.")
# 2. Ensure .gitignore handles '.svn'
self.logger.debug(f"Checking/updating .gitignore file: {gitignore_path}")
try:
svn_ignore_entry = ".svn" # Entry to ensure is ignored
needs_write = False # Flag to track if file needs modification
content_to_write = "" # Content to add if needed
if not os.path.exists(gitignore_path):
# .gitignore doesn't exist, create it with the entry
self.logger.info(
"'.gitignore' file not found. Creating with .svn entry."
)
content_to_write = f"{svn_ignore_entry}\n"
needs_write = True
else:
# .gitignore exists, check if '.svn' is already ignored
try:
with open(gitignore_path, "r", encoding="utf-8") as f:
lines = f.readlines()
# Check ignores .svn directory or files within it
is_ignored = any(
line.strip() == svn_ignore_entry
or line.strip().startswith(svn_ignore_entry + "/")
for line in lines
)
if not is_ignored:
# Not ignored, prepare to append the entry
self.logger.info(
f"'{svn_ignore_entry}' not found. Appending..."
)
current_content = "".join(lines)
# Ensure it's added on a new line
if not current_content.endswith("\n"):
content_to_write = f"\n{svn_ignore_entry}\n"
else:
content_to_write = f"{svn_ignore_entry}\n"
needs_write = True # Mark file for update
else:
# Already ignored, no action needed
self.logger.info(f"'{svn_ignore_entry}' entry already present.")
except IOError as e:
# Handle error reading the existing file
self.logger.warning(
f"Could not read existing '.gitignore': {e}. "
f"Cannot verify {svn_ignore_entry} entry."
)
# Decide on recovery: maybe try appending anyway? Risky.
# For safety, let's not modify if we can't read it.
needs_write = False
# Write to file only if modification is necessary
if needs_write:
# Use 'a'ppend mode if file exists, 'w'rite mode if new
mode = "a" if os.path.exists(gitignore_path) else "w"
try:
# Write with consistent newline handling
with open(
gitignore_path, mode, encoding="utf-8", newline="\n"
) as f:
f.write(content_to_write)
self.logger.info("Updated '.gitignore' file successfully.")
except IOError as e:
# Handle error during write/append
self.logger.error(f"Error writing to '.gitignore': {e}")
raise GitCommandError(f"Failed to update .gitignore: {e}") from e
except IOError as e: # Catch errors from os.path.exists or final write attempt
self.logger.error(f"Error accessing or writing '.gitignore': {e}")
raise GitCommandError(f"File I/O error for .gitignore: {e}") from e
except Exception as e: # Catch other unexpected errors
self.logger.exception(f"Unexpected error managing '.gitignore': {e}")
raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e
self.logger.info(f"Directory preparation complete for '{working_directory}'.")
def create_git_bundle(self, working_directory, bundle_path):
"""Creates a Git bundle file containing all refs."""
# Normalize path for cross-OS compatibility and command usage
normalized_bundle_path = os.path.normpath(bundle_path)
normalized_bundle_path = normalized_bundle_path.replace("\\", "/")
command = ["git", "bundle", "create", normalized_bundle_path, "--all"]
self.logger.info(f"Attempting to create bundle: {normalized_bundle_path}")
try:
# Use check=False to manually handle 'empty bundle' warning
result = self.log_and_execute(command, working_directory, check=False)
if result.returncode != 0:
# Command failed, check stderr for specific non-fatal cases
stderr_lower = result.stderr.lower() if result.stderr else ""
if "refusing to create empty bundle" in stderr_lower:
# This is a warning, not an error for our purposes
self.logger.warning(
f"Bundle creation skipped: No commits in '{working_directory}'."
)
# Do not raise error, let caller handle info
else:
# An actual error occurred
error_msg = f"Bundle command failed (code {result.returncode})."
raise GitCommandError(
error_msg, command=command, stderr=result.stderr
)
# Check file existence and size even on success (paranoid check)
elif (
not os.path.exists(normalized_bundle_path)
or os.path.getsize(normalized_bundle_path) == 0
):
self.logger.warning(f"Bundle cmd success, but file missing/empty.")
else:
self.logger.info(
f"Bundle created successfully: '{normalized_bundle_path}'."
)
except (GitCommandError, ValueError) as e:
self.logger.error(f"Failed create bundle for '{working_directory}': {e}")
raise
except Exception as e:
self.logger.exception(f"Unexpected bundle creation error: {e}")
raise GitCommandError(
f"Unexpected bundle error: {e}", command=command
) from e
def fetch_from_git_bundle(self, working_directory, bundle_path):
"""Fetches from a bundle file and merges fetched refs."""
normalized_bundle_path = os.path.normpath(bundle_path).replace("\\", "/")
self.logger.info(
f"Fetching from '{normalized_bundle_path}' into '{working_directory}'"
)
fetch_command = ["git", "fetch", normalized_bundle_path]
# Merge FETCH_HEAD, creating merge commit if necessary (--no-ff)
merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"]
try:
# 1. Fetch
self.logger.debug("Executing fetch...")
self.log_and_execute(fetch_command, working_directory, check=True)
self.logger.info("Fetch successful.")
# 2. Merge
self.logger.debug("Executing merge...")
# Use check=False as conflicts are expected failures
merge_result = self.log_and_execute(
merge_command, working_directory, check=False
)
# Analyze merge result
stdout_log = merge_result.stdout.strip() if merge_result.stdout else ""
stderr_log = merge_result.stderr.strip() if merge_result.stderr else ""
if merge_result.returncode == 0:
# Success or already up-to-date
if "already up to date" in stdout_log.lower():
self.logger.info("Repository already up-to-date.")
else:
self.logger.info("Merge successful.")
else:
# Merge failed, likely conflicts
output_lower = (stderr_log + stdout_log).lower()
if "conflict" in output_lower:
conflict_msg = (
f"Merge conflict occurred. Resolve manually in "
f"'{working_directory}' and commit."
)
self.logger.error(conflict_msg)
# Raise specific error for caller to handle
raise GitCommandError(
conflict_msg, command=merge_command, stderr=merge_result.stderr
)
else:
# Other merge error
error_msg = f"Merge failed (code {merge_result.returncode})."
self.logger.error(error_msg)
raise GitCommandError(
error_msg, command=merge_command, stderr=merge_result.stderr
)
except (GitCommandError, ValueError) as e:
self.logger.error(f"Fetch/merge error for '{working_directory}': {e}")
raise
except Exception as e:
self.logger.exception(f"Unexpected fetch/merge error: {e}")
raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e
# --- Commit and Status ---
def git_commit(self, working_directory, message):
"""Stages all changes and commits them with the given message."""
self.logger.info(
f"Attempting commit in '{working_directory}' with msg: '{message}'"
)
if not message:
# Disallow empty commit messages programmatically
raise ValueError("Commit message cannot be empty.")
try:
# 1. Stage all changes using 'git add .'
add_command = ["git", "add", "."]
self.logger.debug("Staging all changes...")
# Use check=True to ensure staging succeeds
self.log_and_execute(add_command, working_directory, check=True)
self.logger.debug("Staging successful.")
# 2. Commit staged changes
commit_command = ["git", "commit", "-m", message]
self.logger.debug("Attempting commit...")
# Use check=False to handle 'nothing to commit' case gracefully
result = self.log_and_execute(
commit_command, working_directory, check=False
)
# Analyze commit result
stdout_lower = result.stdout.lower() if result.stdout else ""
stderr_lower = result.stderr.lower() if result.stderr else ""
if result.returncode == 0:
# Standard successful commit
self.logger.info("Commit successful.")
return True
# Check for various 'nothing to commit' messages
elif (
"nothing to commit" in stdout_lower
or "no changes added to commit" in stdout_lower
or "nothing added to commit" in stdout_lower
or (result.returncode == 1 and not stderr_lower and not stdout_lower)
):
# Common non-error case: repo was clean or add didn't stage anything new
self.logger.info("No changes were available to commit.")
return False # Indicate no commit was made
else:
# An unexpected error occurred during the commit
error_msg = (
f"Commit cmd failed unexpectedly (code {result.returncode})."
)
# Details likely logged by log_and_execute already
raise GitCommandError(
error_msg, command=commit_command, stderr=result.stderr
)
except (GitCommandError, ValueError) as e:
# Catch errors from staging or commit steps, or validation
self.logger.error(f"Error during staging or commit: {e}")
raise # Re-raise the specific error
except Exception as e:
# Catch any other unexpected error
self.logger.exception(f"Unexpected error during commit process: {e}")
raise GitCommandError(f"Unexpected commit error: {e}") from e
def git_status_has_changes(self, working_directory):
"""Checks if the Git repository has uncommitted changes."""
self.logger.debug(f"Checking Git status in '{working_directory}'...")
try:
# Use '--porcelain' for reliable script parsing
status_command = ["git", "status", "--porcelain"]
# Use check=True to ensure the status command itself runs correctly
result = self.log_and_execute(status_command, working_directory, check=True)
# If porcelain output is non-empty, there are changes
has_changes = bool(result.stdout.strip())
self.logger.debug(f"Status check complete. Has changes: {has_changes}")
return has_changes
except (GitCommandError, ValueError) as e:
self.logger.error(f"Status check error: {e}")
raise # Re-raise specific error
except Exception as e:
self.logger.exception(f"Unexpected status check error: {e}")
raise GitCommandError(f"Unexpected status error: {e}") from e
# --- Tag Management ---
def list_tags(self, working_directory):
"""Lists tags with subjects, sorted by creation date (desc)."""
self.logger.info(f"Listing tags with subjects in '{working_directory}'...")
# Format: Tagname<TAB>SubjectLine
fmt = "%(refname:short)%09%(contents:subject)"
cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"]
tags_data = [] # List to store (name, subject) tuples
try:
result = self.log_and_execute(cmd, working_directory, check=True)
for line in result.stdout.splitlines():
line_strip = line.strip()
if line_strip:
# Split only on the first tab to handle subjects with tabs
parts = line_strip.split("\t", 1)
name = parts[0].strip()
# Provide default if subject is missing
subject = parts[1].strip() if len(parts) > 1 else "(No subject)"
tags_data.append((name, subject))
self.logger.info(f"Found {len(tags_data)} tags.")
self.logger.debug(f"Tags found: {tags_data}")
return tags_data
except (GitCommandError, ValueError) as e:
# Log error but return empty list for graceful GUI handling
self.logger.error(f"Error listing tags: {e}")
return []
except Exception as e:
# Log unexpected errors
self.logger.exception(f"Unexpected error listing tags: {e}")
return []
def create_tag(self, working_directory, tag_name, message):
"""Creates a new annotated Git tag."""
self.logger.info(f"Creating tag '{tag_name}' in '{working_directory}'")
# Validate inputs
if not tag_name:
raise ValueError("Tag name cannot be empty.")
if not message:
raise ValueError("Tag message cannot be empty.")
# Validate tag name format using regex
pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
if not re.match(pattern, tag_name):
raise ValueError(f"Invalid tag name format: '{tag_name}'.")
# Build and execute command
cmd = ["git", "tag", "-a", tag_name, "-m", message]
try:
# Use check=True to catch errors like 'tag already exists'
self.log_and_execute(cmd, working_directory, check=True)
self.logger.info(f"Tag '{tag_name}' created successfully.")
except GitCommandError as e:
# Check stderr for specific common errors
stderr = e.stderr.lower() if e.stderr else ""
if "already exists" in stderr:
msg = f"Tag '{tag_name}' already exists."
self.logger.error(msg)
# Re-raise with specific message
raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
else:
# Re-raise other Git command errors
self.logger.error(f"Failed create tag '{tag_name}': {e}")
raise
except ValueError as ve:
# Re-raise validation errors
self.logger.error(f"Validation error creating tag: {ve}")
raise
except Exception as e:
# Handle unexpected errors
self.logger.exception(f"Unexpected error creating tag: {e}")
raise GitCommandError(f"Unexpected tag error: {e}", command=cmd) from e
def checkout_tag(self, working_directory, tag_name):
"""Checks out a specific Git tag (enters detached HEAD state)."""
self.logger.info(f"Checking out tag '{tag_name}' in '{working_directory}'...")
if not tag_name:
raise ValueError("Tag name cannot be empty for checkout.")
cmd = ["git", "checkout", tag_name]
try:
# Use check=True; will raise error if tag not found
result = self.log_and_execute(cmd, working_directory, check=True)
self.logger.info(f"Successfully checked out tag '{tag_name}'.")
# Check output for detached HEAD warning
output = (result.stderr + result.stdout).lower()
if "detached head" in output:
self.logger.warning("Repository is now in a 'detached HEAD' state.")
return True # Indicate success
except GitCommandError as e:
# Check stderr for specific 'tag not found' errors
stderr = e.stderr.lower() if e.stderr else ""
not_found_patterns = [
"did not match any file(s)",
f"pathspec '{tag_name.lower()}' did not match", # Check lowercase
]
if any(p in stderr for p in not_found_patterns):
msg = f"Tag '{tag_name}' not found or invalid."
self.logger.error(msg)
# Re-raise with specific message
raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e
else:
# Re-raise other Git errors
self.logger.error(f"Failed checkout tag '{tag_name}': {e}")
raise
except ValueError as ve:
# Re-raise validation errors
self.logger.error(f"Validation error checking out tag: {ve}")
raise
except Exception as e:
# Handle unexpected errors
self.logger.exception(f"Unexpected error checking out tag: {e}")
raise GitCommandError(f"Unexpected checkout error: {e}", command=cmd) from e
# --- Branch Management ---
def list_branches(self, working_directory):
"""
Lists local branches, identifying the current one.
Returns:
tuple: (list of all branch names, current branch name or None).
Returns ([], None) on error.
"""
self.logger.info(f"Listing local branches in '{working_directory}'...")
# Format: '*' if current, space otherwise, then branch name
fmt = "%(HEAD) %(refname:short)"
command = ["git", "branch", "--list", f"--format={fmt}"]
branches = []
current_branch = None
try:
result = self.log_and_execute(command, working_directory, check=True)
for line in result.stdout.splitlines():
line_strip = line.strip()
if line_strip:
is_current = line_strip.startswith("*")
# Remove indicator and leading/trailing spaces
branch_name = line_strip.lstrip("* ").strip()
branches.append(branch_name)
if is_current:
current_branch = branch_name
# Sort branches alphabetically for consistent display order
branches.sort()
self.logger.info(
f"Found {len(branches)} branches. Current: {current_branch}"
)
self.logger.debug(f"Branches: {branches}")
return branches, current_branch
except (GitCommandError, ValueError) as e:
self.logger.error(f"Error listing branches: {e}")
return [], None # Return empty data on error
except Exception as e:
self.logger.exception(f"Unexpected error listing branches: {e}")
return [], None
def checkout_branch(self, working_directory, branch_name):
"""Checks out an existing local branch."""
self.logger.info(
f"Checking out branch '{branch_name}' in '{working_directory}'..."
)
if not branch_name:
raise ValueError("Branch name cannot be empty for checkout.")
# Use 'git switch' if available? Might provide cleaner output/errors.
# Sticking with 'checkout' for broader compatibility for now.
command = ["git", "checkout", branch_name]
try:
# Use check=True to catch errors like branch not found
self.log_and_execute(command, working_directory, check=True)
self.logger.info(f"Successfully checked out branch '{branch_name}'.")
return True # Indicate success
except GitCommandError as e:
# Check for specific 'branch not found' errors
stderr = e.stderr.lower() if e.stderr else ""
if (
"did not match any file(s)" in stderr
or f"pathspec '{branch_name.lower()}' did not match" in stderr
):
msg = f"Branch '{branch_name}' not found or invalid."
self.logger.error(msg)
raise GitCommandError(msg, command=command, stderr=e.stderr) from e
else:
# Re-raise other Git errors
self.logger.error(f"Failed checkout branch '{branch_name}': {e}")
raise
except ValueError as ve:
self.logger.error(f"Validation error checking out branch: {ve}")
raise
except Exception as e:
# Handle unexpected errors
self.logger.exception(f"Unexpected error checking out branch: {e}")
raise GitCommandError(
f"Unexpected checkout error: {e}", command=command
) from e
def create_branch(self, working_directory, new_branch_name):
"""Creates a new local branch from the current HEAD position."""
self.logger.info(
f"Creating new branch '{new_branch_name}' in '{working_directory}'..."
)
if not new_branch_name:
raise ValueError("New branch name cannot be empty.")
# Validate branch name format (simplified check for common issues)
# Disallows spaces, consecutive/trailing dots/slashes, control chars, etc.
# Also prevents creating a branch named 'HEAD'.
pattern = (
r"^(?![./]|.*([./]{2,}|[.]$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(?<![/.])$"
)
if not re.match(pattern, new_branch_name) or new_branch_name.lower() == "head":
raise ValueError(f"Invalid branch name format: '{new_branch_name}'.")
command = ["git", "branch", new_branch_name]
try:
# Use check=True to catch errors like 'branch already exists'
self.log_and_execute(command, working_directory, check=True)
self.logger.info(f"Branch '{new_branch_name}' created successfully.")
return True # Indicate success
except GitCommandError as e:
# Check for specific 'already exists' error
stderr = e.stderr.lower() if e.stderr else ""
if "already exists" in stderr:
msg = f"Branch '{new_branch_name}' already exists."
self.logger.error(msg)
raise GitCommandError(msg, command=command, stderr=e.stderr) from e
else:
# Re-raise other Git errors
self.logger.error(f"Failed create branch '{new_branch_name}': {e}")
raise
except ValueError as ve:
# Re-raise validation errors
self.logger.error(f"Validation error creating branch: {ve}")
raise
except Exception as e:
# Handle unexpected errors
self.logger.exception(f"Unexpected error creating branch: {e}")
raise GitCommandError(
f"Unexpected create branch error: {e}", command=command
) from e
# --- History / Log ---
def get_commit_log(self, working_directory, max_count=100, branch=None):
"""
Retrieves formatted commit log, optionally filtered by branch.
Args:
working_directory (str): Path to the Git repository.
max_count (int): Maximum number of log entries to return.
branch (str, optional): Filter history to only this branch/ref.
If None, shows history from all refs.
Returns:
list: List of formatted log strings, newest first. Empty on error.
"""
log_scope = f"branch '{branch}'" if branch else "all branches/tags"
self.logger.info(
f"Retrieving commit log for {log_scope} in "
f"'{working_directory}' (max {max_count})..."
)
# Define log format: AbbrevHash Date | Author | Refs Subject
log_format = "%h %ad | %an |%d %s"
# Define date format (e.g., 'iso-strict', 'relative', 'short')
date_format = "iso-strict"
# Build the base git log command
command = [
"git",
"log",
f"--max-count={max_count}", # Limit number of entries
f"--pretty=format:{log_format}", # Apply custom format
f"--date={date_format}", # Format commit date
"--decorate", # Show ref names (branches, tags)
]
# Add branch filter or --all
if branch:
# Add the specific branch name to filter the log
command.append(branch)
else:
# Show history from all reachable commits if no branch specified
command.append("--all")
try:
# Execute the log command
result = self.log_and_execute(
command,
working_directory,
check=True, # Ensure log command itself succeeds
)
# Process the output lines
log_lines = [
line.strip() for line in result.stdout.splitlines() if line.strip()
] # Remove empty lines
self.logger.info(f"Retrieved {len(log_lines)} log entries for {log_scope}.")
return log_lines
except (GitCommandError, ValueError) as e:
# Log known errors and return empty list
self.logger.error(f"Error retrieving commit log: {e}")
return []
except Exception as e:
# Log unexpected errors and return empty list
self.logger.exception(f"Unexpected error retrieving commit log: {e}")
return []
def clone_from_bundle(self, bundle_path, destination_directory):
"""
Clones a repository from a Git bundle file into the specified directory.
The destination directory should not exist or be empty.
Args:
bundle_path (str): Absolute path to the .bundle file.
destination_directory (str): Path to the directory where the repo
will be cloned.
Returns:
bool: True on successful clone.
Raises:
GitCommandError: If git clone fails.
ValueError: If bundle_path or destination_directory are invalid.
FileNotFoundError: If the bundle file does not exist.
IOError: If the destination directory exists and is not empty.
"""
self.logger.info(
f"Attempting to clone from bundle '{bundle_path}' into '{destination_directory}'"
)
# --- Validate Inputs ---
if not bundle_path:
raise ValueError("Bundle path cannot be empty.")
if not destination_directory:
raise ValueError("Destination directory cannot be empty.")
abs_bundle_path = os.path.abspath(bundle_path)
if not os.path.isfile(abs_bundle_path):
msg = f"Bundle file not found: {abs_bundle_path}"
self.logger.error(msg)
raise FileNotFoundError(msg)
abs_dest_path = os.path.abspath(destination_directory)
# Check destination directory constraints
if os.path.exists(abs_dest_path):
if not os.path.isdir(abs_dest_path):
msg = f"Destination path exists but is not a directory: {abs_dest_path}"
self.logger.error(msg)
raise IOError(msg)
# Check if the directory is empty
if os.listdir(abs_dest_path):
msg = f"Destination directory is not empty: {abs_dest_path}. Cannot clone here."
self.logger.error(msg)
raise IOError(msg)
self.logger.debug("Destination directory exists and is empty. Proceeding.")
else:
# Attempt to create the destination directory if it doesn't exist
try:
os.makedirs(abs_dest_path, exist_ok=True)
self.logger.info(f"Created destination directory: {abs_dest_path}")
except OSError as e:
msg = f"Failed to create destination directory '{abs_dest_path}': {e}"
self.logger.error(msg, exc_info=True)
raise IOError(msg) from e
# --- Prepare and Execute Clone Command ---
# Use --bundle option and specify the bundle file and destination directory.
# Use --no-hardlinks potentially for robustness across filesystems (optional).
command = [
"git",
"clone",
#"--bundle",
abs_bundle_path, # Path to the bundle file
abs_dest_path, # Path to the destination directory
# "--no-hardlinks" # Consider adding if moving between different drives/filesystems
]
try:
# Execute the clone command. Run it from a neutral directory (like parent of dest)
# or let it run from the script's default CWD. check=True ensures failure raises error.
# Cloning doesn't typically need a specific 'working_directory' context like fetch/merge.
self.log_and_execute(command, working_directory=".", check=True) # Use '.' for current dir or script location
self.logger.info(f"Successfully cloned repository into '{abs_dest_path}' from bundle.")
return True
except GitCommandError as clone_error:
# Log specific Git errors and re-raise
self.logger.error(
f"Failed to clone from bundle '{abs_bundle_path}': {clone_error}",
exc_info=True
)
# Attempt cleanup: remove potentially partially created destination directory
if os.path.isdir(abs_dest_path) and not os.listdir(abs_dest_path): # Check if empty after failure
try:
os.rmdir(abs_dest_path)
self.logger.info(f"Removed empty destination directory after failed clone: {abs_dest_path}")
except OSError as rm_e:
self.logger.warning(f"Could not remove destination directory '{abs_dest_path}' after failed clone: {rm_e}")
raise clone_error # Re-raise the original error
except Exception as e:
# Catch any other unexpected errors
self.logger.exception(f"Unexpected error cloning from bundle: {e}")
raise GitCommandError(f"Unexpected clone error: {e}", command=command) from e
def get_tracked_files(self, working_directory):
"""
Retrieves a list of all files currently tracked by Git in the repository.
Logs command output only at DEBUG level.
"""
self.logger.debug(f"Getting tracked files for '{working_directory}'")
command = ["git", "ls-files", "-z"]
try:
# --- MODIFICA: Passa log_output_level=logging.DEBUG ---
result = self.log_and_execute(
command,
working_directory,
check=True,
log_output_level=logging.DEBUG # Log stdout/stderr only if logger level is DEBUG
)
# --- FINE MODIFICA ---
tracked_files = [
f for f in result.stdout.split('\0') if f
]
# Log the *count* at INFO level, the list itself might only appear in DEBUG log now.
self.logger.info(f"Found {len(tracked_files)} tracked files.")
self.logger.debug(f"Tracked file list: {tracked_files}") # Log list at DEBUG
return tracked_files
except (GitCommandError, ValueError) as e:
self.logger.error(f"Failed to list tracked files: {e}")
raise
except Exception as e:
self.logger.exception(f"Unexpected error listing tracked files: {e}")
raise GitCommandError(f"Unexpected error listing files: {e}", command=command) from e
def check_if_would_be_ignored(self, working_directory, path_to_check):
"""
Checks if a given path would be ignored by current .gitignore rules,
regardless of whether it's currently tracked.
Args:
working_directory (str): Path to the Git repository.
path_to_check (str): The relative path within the repo to check.
Returns:
bool: True if the path matches an ignore rule, False otherwise.
Raises:
GitCommandError: If the git command fails unexpectedly.
"""
# Use --no-index to check against .gitignore rules directly,
# not the index (tracked status).
# Use --quiet to suppress output, rely only on exit code.
command = ["git", "check-ignore", "--quiet", "--no-index", "--", path_to_check]
self.logger.debug(f"Checking ignore status for path: '{path_to_check}'")
try:
# check=False because exit code 1 (not ignored) is not an error here
result = self.log_and_execute(command, working_directory, check=False)
if result.returncode == 0:
self.logger.debug(f"Path '{path_to_check}' WOULD be ignored.")
return True # Exit code 0 means it IS ignored
elif result.returncode == 1:
self.logger.debug(f"Path '{path_to_check}' would NOT be ignored.")
return False # Exit code 1 means it is NOT ignored
else:
# Other non-zero exit codes (e.g., 128) indicate an error
error_msg = f"git check-ignore failed with exit code {result.returncode}"
self.logger.error(f"{error_msg}. Stderr: {result.stderr}")
raise GitCommandError(error_msg, command=command, stderr=result.stderr)
except (GitCommandError, ValueError) as e:
self.logger.error(f"Failed check_if_would_be_ignored for '{path_to_check}': {e}")
raise
except Exception as e:
self.logger.exception(f"Unexpected error in check_if_would_be_ignored: {e}")
raise GitCommandError(f"Unexpected check-ignore error: {e}", command=command) from e
def remove_from_tracking(self, working_directory, files_to_untrack):
"""
Removes specified files/directories from Git tracking (index)
without deleting them from the working directory. Processes files in batches
to avoid command line length limits.
Args:
working_directory (str): Path to the Git repository.
files_to_untrack (list): List of relative paths to untrack.
Returns:
bool: True if all batches executed successfully.
Raises:
GitCommandError: If any git rm batch command fails.
ValueError: If the list of files is empty.
"""
if not files_to_untrack:
# Non è un errore se la lista è vuota, semplicemente non c'è nulla da fare.
self.logger.debug("No files provided to remove_from_tracking.")
return True # Considera successo non fare nulla
self.logger.info(f"Removing {len(files_to_untrack)} items from Git tracking in batches...")
# --- MODIFICA: Logica Batch ---
batch_size = 100 # Processa N file alla volta (regola se necessario)
all_batches_succeeded = True
for i in range(0, len(files_to_untrack), batch_size):
batch = files_to_untrack[i : i + batch_size]
if not batch: # Salta batch vuoti (non dovrebbe succedere con range)
continue
self.logger.info(f"Processing untrack batch {i // batch_size + 1}: {len(batch)} items...")
self.logger.debug(f"Batch items: {batch}")
# Costruisci il comando per questo batch
command = ["git", "rm", "--cached", "-r", "--ignore-unmatch", "--"] # Aggiunto -r e --ignore-unmatch
# -r: Necessario se alcuni 'file' sono in realtà directory (come _build/, _dist/)
# --ignore-unmatch: Evita errori se un file è già stato rimosso o non trovato per qualche motivo strano
command.extend(batch)
try:
# Esegui il comando per il batch corrente
# Logga l'output a DEBUG per non inondare il log INFO
self.log_and_execute(command, working_directory, check=True, log_output_level=logging.DEBUG)
self.logger.info(f"Batch {i // batch_size + 1} untracked successfully.")
except (GitCommandError, ValueError) as e:
self.logger.error(f"Failed to process untrack batch {i // batch_size + 1}: {e}")
# Puoi decidere se interrompere o continuare con gli altri batch.
# Interrompere è generalmente più sicuro.
all_batches_succeeded = False
raise GitCommandError(f"Failed untracking batch {i // batch_size + 1}. Error: {e}", command=command, stderr=getattr(e, 'stderr', None)) from e
except Exception as e:
self.logger.exception(f"Unexpected error processing untrack batch {i // batch_size + 1}: {e}")
all_batches_succeeded = False
raise GitCommandError(f"Unexpected untrack batch error: {e}", command=command) from e
# Se arriviamo qui, il batch ha avuto successo
# Ritorna True solo se TUTTI i batch hanno avuto successo
if all_batches_succeeded:
self.logger.info("All untracking batches completed successfully.")
return True
else:
# Questo caso non dovrebbe essere raggiunto se solleviamo eccezioni sopra,
# ma lo teniamo per sicurezza.
self.logger.error("Untracking process completed with errors in some batches.")
return False
def get_matching_gitignore_rule(self, working_directory, path_to_check):
"""
Checks if a given path matches a .gitignore rule and returns the matching pattern.
Args:
working_directory (str): Path to the Git repository.
path_to_check (str): The relative path within the repo to check.
Returns:
str or None: The pattern from .gitignore that matched the path,
or None if the path is not ignored.
Raises:
GitCommandError: If the git command fails unexpectedly.
"""
# Use -v (--verbose) to get the matching rule details.
# Use --no-index to check against .gitignore rules directly.
command = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check]
self.logger.debug(f"Getting matching ignore rule for path: '{path_to_check}'")
try:
# check=False because exit code 1 (not ignored) is not an error here
result = self.log_and_execute(command, working_directory, check=False)
if result.returncode == 0:
# Output format: <source>:<line_num>:<pattern>\t<pathname>
# Example: .gitignore:5:*.log logs/latest.log
output_line = result.stdout.strip()
if output_line and '\t' in output_line:
rule_part = output_line.split('\t', 1)[0]
# Extract pattern (part after the second colon)
parts = rule_part.split(':', 2)
if len(parts) == 3:
pattern = parts[2]
self.logger.debug(f"Path '{path_to_check}' matched rule: '{pattern}'")
return pattern
else:
self.logger.warning(f"Could not parse pattern from check-ignore output: {output_line}")
return None # Indicate parsing failure rather than no match
else:
self.logger.warning(f"Unexpected output format from check-ignore -v: {output_line}")
return None # Indicate unexpected output
elif result.returncode == 1:
# Exit code 1 means the path is NOT ignored
self.logger.debug(f"Path '{path_to_check}' is not ignored by any rule.")
return None
else:
# Other non-zero exit codes (e.g., 128) indicate an error
error_msg = f"git check-ignore -v failed with exit code {result.returncode}"
self.logger.error(f"{error_msg}. Stderr: {result.stderr}")
raise GitCommandError(error_msg, command=command, stderr=result.stderr)
except (GitCommandError, ValueError) as e:
self.logger.error(f"Failed get_matching_gitignore_rule for '{path_to_check}': {e}")
raise
except Exception as e:
self.logger.exception(f"Unexpected error in get_matching_gitignore_rule: {e}")
raise GitCommandError(f"Unexpected check-ignore -v error: {e}", command=command) from e