SXXXXXXX_GitUtility/git_commands.py
2025-04-07 14:01:15 +02:00

750 lines
32 KiB
Python

# git_commands.py
import os
import subprocess
import logging
import re # Ensure re is imported
class GitCommandError(Exception):
"""
Custom exception for handling Git command errors.
Includes the original command and error details if available.
"""
def __init__(self, message, command=None, stderr=None):
"""
Initialize the GitCommandError.
Args:
message (str): The error message.
command (list, optional): The command that caused the error. Defaults to None.
stderr (str, optional): The standard error output. Defaults to None.
"""
super().__init__(message)
self.command = command
self.stderr = stderr
def __str__(self):
"""Return a formatted string representation of the error."""
base_message = super().__str__()
details = []
if self.command:
# Ensure command is list of strings for join
safe_command = [str(part) for part in self.command]
command_str = ' '.join(safe_command)
details.append(f"Command: '{command_str}'")
if self.stderr:
stderr_str = self.stderr.strip()
details.append(f"Stderr: {stderr_str}")
if details:
details_str = '; '.join(details)
return f"{base_message} ({details_str})"
else:
return base_message
class GitCommands:
"""
Manages Git commands execution, logging, and error handling.
Includes tag management functionalities.
"""
def __init__(self, logger):
"""
Initializes the GitCommands with a logger.
Args:
logger (logging.Logger): Logger instance for logging messages.
"""
if not isinstance(logger, logging.Logger):
# Raise error if logger is not a valid logger instance
raise ValueError("A valid logging.Logger instance is required.")
self.logger = logger
def log_and_execute(self, command, working_directory, check=True):
"""
Executes a command within a specific working directory, logs it,
and handles errors.
Args:
command (list): List representing the command and its arguments.
working_directory (str): Path to the directory for command execution.
check (bool, optional): Raise error on non-zero exit code if True.
Defaults to True.
Returns:
subprocess.CompletedProcess: The result object from subprocess.run.
Raises:
GitCommandError: If path invalid, command fails (and check=True), etc.
ValueError: If working_directory is None or empty.
"""
# Ensure all parts of command are strings for logging and execution
safe_command = [str(part) for part in command]
command_str = ' '.join(safe_command)
log_message = f"Executing: {command_str}"
self.logger.debug(log_message)
# Validate working directory
if not working_directory:
msg = "Working directory cannot be None or empty."
self.logger.error(msg)
raise ValueError(msg)
abs_path = os.path.abspath(working_directory)
if not os.path.isdir(abs_path):
msg = f"Working directory does not exist or is not a directory: {abs_path}"
self.logger.error(msg)
# Include command context in the error
raise GitCommandError(msg, command=safe_command)
cwd = abs_path
self.logger.debug(f"Working directory: {cwd}")
try:
# Platform-specific setup to hide console window on Windows
startupinfo = None
creationflags = 0 # Default creation flags
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
# Optional: Use CREATE_NO_WINDOW if std I/O redirection not needed
# creationflags = subprocess.CREATE_NO_WINDOW
# Execute the command
result = subprocess.run(
safe_command, # Use the validated command list
cwd=cwd,
capture_output=True,
text=True,
check=check,
encoding='utf-8',
errors='replace', # Handle potential decoding errors
startupinfo=startupinfo,
creationflags=creationflags
)
# Log stdout and stderr clearly
stdout_log = result.stdout.strip() if result.stdout else "<no stdout>"
stderr_log = result.stderr.strip() if result.stderr else "<no stderr>"
# Log success with output details
self.logger.info(
f"Command successful. Output:\n"
f"--- stdout ---\n{stdout_log}\n"
f"--- stderr ---\n{stderr_log}\n---"
)
return result
except subprocess.CalledProcessError as e:
# Log detailed error information from the failed process
stderr_err = e.stderr.strip() if e.stderr else "<no stderr>"
stdout_err = e.stdout.strip() if e.stdout else "<no stdout>"
error_log_msg = (
f"Command failed with return code {e.returncode} in '{cwd}'.\n"
f"--- command ---\n{command_str}\n"
f"--- stderr ---\n{stderr_err}\n"
f"--- stdout ---\n{stdout_err}\n"
f"---"
)
self.logger.error(error_log_msg)
# Wrap the original exception for consistent error handling
raise GitCommandError(
f"Git command failed in '{cwd}'.",
command=safe_command,
stderr=e.stderr
) from e
except FileNotFoundError as e:
# Log error if the command itself (e.g., 'git') is not found
error_msg = (
f"Command not found: '{safe_command[0]}'. Is Git installed "
f"and in system PATH? (WD: '{cwd}')"
)
self.logger.error(error_msg)
# Log exception details at debug level for more info if needed
self.logger.debug(f"FileNotFoundError details: {e}")
raise GitCommandError(error_msg, command=safe_command) from e
except PermissionError as e:
# Handle errors related to execution permissions
error_msg = f"Permission denied executing command in '{cwd}'."
self.logger.error(error_msg)
self.logger.debug(f"PermissionError details: {e}")
raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e
except Exception as e:
# Catch any other unexpected errors during execution
self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}")
raise GitCommandError(
f"Unexpected error during command execution: {e}",
command=safe_command
) from e
def create_git_bundle(self, working_directory, bundle_path):
"""
Creates a Git bundle file from the repository in working_directory.
Args:
working_directory (str): Path to the local Git repository.
bundle_path (str): Full path where the bundle file should be saved.
Raises:
GitCommandError: If command fails or path invalid.
ValueError: If working_directory is None or empty.
"""
# Normalize bundle path for consistency across OS
normalized_bundle_path = os.path.normpath(bundle_path)
normalized_bundle_path = normalized_bundle_path.replace("\\", "/")
command = ["git", "bundle", "create", normalized_bundle_path, "--all"]
self.logger.info(f"Attempting to create Git bundle: {normalized_bundle_path}")
try:
# Execute command, allow non-zero exit for specific warning
# Use check=False and manually check return code and stderr
result = self.log_and_execute(
command,
working_directory,
check=False
)
# Check for non-fatal warnings vs actual errors
if result.returncode != 0:
stderr_lower = result.stderr.lower() if result.stderr else ""
if "refusing to create empty bundle" in stderr_lower:
self.logger.warning(
f"Bundle creation skipped: Repository at "
f"'{working_directory}' has no commits to bundle."
)
# This is not treated as an error, do not raise
else:
# An actual error occurred during bundle creation
error_msg = (
f"Git bundle command failed with return code "
f"{result.returncode}."
)
# log_and_execute already logged details in case of failure
# Raise specific error here to signal failure
raise GitCommandError(
error_msg,
command=command,
stderr=result.stderr
)
# Check if file exists and has size even if return code was 0
elif not os.path.exists(normalized_bundle_path) or \
os.path.getsize(normalized_bundle_path) == 0:
self.logger.warning(
f"Bundle command returned success, but file "
f"'{normalized_bundle_path}' is missing or empty."
)
# Consider if this should be an error depending on requirements
else:
self.logger.info(
f"Git bundle created successfully: '{normalized_bundle_path}'."
)
except (GitCommandError, ValueError) as e:
# Log context and re-raise known errors
self.logger.error(
f"Failed to create Git bundle for repo "
f"'{working_directory}'. Reason: {e}"
)
raise
except Exception as e:
# Catch unexpected errors
self.logger.exception(
f"Unexpected error during Git bundle creation for "
f"'{working_directory}': {e}"
)
raise GitCommandError(
f"Unexpected error creating bundle: {e}",
command=command
) from e
def fetch_from_git_bundle(self, working_directory, bundle_path):
"""
Fetches changes from a Git bundle file into the specified local
repository and merges them.
Args:
working_directory (str): Path to the local Git repository.
bundle_path (str): Path to the Git bundle file to fetch from.
Raises:
GitCommandError: If fetch or merge fails, or paths are invalid.
ValueError: If working_directory is None or empty.
"""
# Normalize bundle path
normalized_bundle_path = os.path.normpath(bundle_path)
normalized_bundle_path = normalized_bundle_path.replace("\\", "/")
self.logger.info(
f"Attempting to fetch from bundle '{normalized_bundle_path}' "
f"into '{working_directory}'"
)
# Define commands
fetch_command = ["git", "fetch", normalized_bundle_path]
# Merge strategy: No fast-forward, create merge commit if needed
merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"]
try:
# 1. Fetch changes from the bundle
self.logger.debug(f"Executing fetch command...")
# Use check=True to ensure fetch succeeds or raises error
self.log_and_execute(fetch_command, working_directory, check=True)
self.logger.info("Successfully fetched from Git bundle.")
# 2. Merge the fetched changes
self.logger.debug(f"Executing merge command...")
# Use check=False for merge, as conflicts return non-zero code
merge_result = self.log_and_execute(
merge_command,
working_directory,
check=False
)
# Analyze merge result
stdout_log = merge_result.stdout.strip() if merge_result.stdout else ""
stderr_log = merge_result.stderr.strip() if merge_result.stderr else ""
if merge_result.returncode == 0:
# Merge was successful or nothing to merge
if "already up to date" in stdout_log.lower():
self.logger.info("Repository was already up-to-date.")
else:
self.logger.info("Successfully merged fetched changes.")
else:
# Merge likely failed due to conflicts or other issues
output_lower = (stderr_log + stdout_log).lower()
if "conflict" in output_lower:
conflict_msg = (
f"Merge conflict occurred after fetching from bundle "
f"'{os.path.basename(bundle_path)}'. Please resolve "
f"conflicts manually in '{working_directory}' and commit."
)
self.logger.error(conflict_msg)
# Raise a specific error indicating conflict for the caller
raise GitCommandError(
conflict_msg,
command=merge_command,
stderr=merge_result.stderr
)
else:
# Other merge error
error_msg = (
f"Merge command failed unexpectedly after fetch "
f"(return code {merge_result.returncode})."
)
self.logger.error(error_msg)
# Raise specific error, details logged by log_and_execute
raise GitCommandError(
error_msg,
command=merge_command,
stderr=merge_result.stderr
)
except (GitCommandError, ValueError) as e:
# Log context and re-raise known errors
self.logger.error(
f"Error during fetch/merge for repo '{working_directory}' "
f"from bundle '{bundle_path}'. Reason: {e}"
)
raise
except Exception as e:
# Catch unexpected errors
self.logger.exception(
f"Unexpected error during fetch/merge for '{working_directory}': {e}"
)
raise GitCommandError(f"Unexpected error during fetch/merge: {e}") from e
def prepare_svn_for_git(self, working_directory):
"""
Prepares a directory for Git: initializes repo (if needed) and
ensures .gitignore ignores '.svn'.
Args:
working_directory (str): The path to the directory to prepare.
Raises:
GitCommandError: If command fails, file ops fail, path invalid.
ValueError: If working_directory is None or empty.
"""
self.logger.info(f"Preparing directory for Git: '{working_directory}'")
# Basic path validation
if not working_directory:
raise ValueError("Working directory cannot be None or empty.")
if not os.path.isdir(working_directory):
raise GitCommandError(f"Directory does not exist: {working_directory}")
gitignore_path = os.path.join(working_directory, ".gitignore")
git_dir_path = os.path.join(working_directory, ".git")
# 1. Initialize Git repository if it doesn't exist
if not os.path.exists(git_dir_path):
self.logger.info("No existing Git repository found. Initializing...")
try:
init_command = ["git", "init"]
self.log_and_execute(init_command, working_directory, check=True)
self.logger.info("Git repository initialized successfully.")
except (GitCommandError, ValueError) as e:
self.logger.error(f"Failed to initialize Git repository: {e}")
raise # Re-raise to signal preparation failure
except Exception as e:
self.logger.exception(f"Unexpected error initializing repo: {e}")
raise GitCommandError(f"Unexpected init error: {e}") from e
else:
self.logger.info("Git repository already exists. Skipping init.")
# 2. Ensure .gitignore exists and ignores .svn
self.logger.debug(f"Checking/updating .gitignore: {gitignore_path}")
try:
svn_ignore_entry = ".svn"
needs_write = False
content_to_write = ""
if not os.path.exists(gitignore_path):
self.logger.info("'.gitignore' not found. Creating.")
content_to_write = f"{svn_ignore_entry}\n"
needs_write = True
else:
# Check if .svn is already ignored
try:
with open(gitignore_path, "r", encoding='utf-8') as f:
lines = f.readlines()
# Check ignores .svn directory or files within it
is_ignored = any(
line.strip() == svn_ignore_entry or \
line.strip().startswith(svn_ignore_entry + '/')
for line in lines
)
if not is_ignored:
self.logger.info(f"'{svn_ignore_entry}' not found. Appending.")
# Prepare content to append, ensuring newline separation
current_content = "".join(lines)
if not current_content.endswith('\n'):
content_to_write = f"\n{svn_ignore_entry}\n"
else:
content_to_write = f"{svn_ignore_entry}\n"
needs_write = True # Need to append
else:
self.logger.info(f"'{svn_ignore_entry}' already ignored.")
except IOError as e:
self.logger.warning(f"Could not read '.gitignore': {e}.")
# Cannot verify, maybe attempt write? Safer to leave alone.
# needs_write = True # Or False, depending on desired safety
# Write to file only if necessary
if needs_write:
# Append or write new file
mode = 'a' if os.path.exists(gitignore_path) else 'w'
try:
# Use newline='\n' for consistent line endings
with open(gitignore_path, mode, encoding='utf-8', newline='\n') as f:
f.write(content_to_write)
self.logger.info("Updated '.gitignore' file.")
except IOError as e:
self.logger.error(f"Error writing to '.gitignore': {e}")
raise GitCommandError(f"Failed update .gitignore: {e}") from e
except IOError as e: # Catch errors from os.path.exists or final write
self.logger.error(f"Error accessing/writing '.gitignore': {e}")
raise GitCommandError(f"File I/O error for .gitignore: {e}") from e
except Exception as e: # Catch other unexpected errors
self.logger.exception(f"Unexpected error managing '.gitignore': {e}")
raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e
self.logger.info(f"Directory preparation complete for '{working_directory}'.")
def git_commit(self, working_directory, message="Autocommit"):
"""
Stages all changes (git add .) and commits them.
Args:
working_directory (str): Path to the local Git repository.
message (str): The commit message. Defaults to "Autocommit".
Returns:
bool: True if a commit was made, False if no changes.
Raises:
GitCommandError: If staging or commit fails unexpectedly.
ValueError: If working_directory is None or empty.
"""
self.logger.info(
f"Attempting commit in '{working_directory}' with msg: '{message}'"
)
try:
# 1. Stage all changes
add_command = ["git", "add", "."]
self.logger.debug("Staging all changes (git add .)...")
self.log_and_execute(add_command, working_directory, check=True)
self.logger.debug("Staging successful.")
# 2. Commit staged changes
commit_command = ["git", "commit", "-m", message]
self.logger.debug("Attempting commit...")
# check=False because 'nothing to commit' returns non-zero
result = self.log_and_execute(
commit_command,
working_directory,
check=False
)
# Analyze commit result carefully
stdout_lower = result.stdout.lower() if result.stdout else ""
stderr_lower = result.stderr.lower() if result.stderr else ""
if result.returncode == 0:
# Commit successful, new commit created
self.logger.info(f"Commit successful in '{working_directory}'.")
return True
# Check variations of 'nothing to commit' messages
elif "nothing to commit" in stdout_lower or \
"no changes added to commit" in stdout_lower or \
"nothing added to commit" in stdout_lower or \
(result.returncode == 1 and not stderr_lower and not stdout_lower):
# Common non-error case: no changes found
self.logger.info("No changes to commit.")
return False
else:
# An unexpected error occurred during the commit attempt
error_msg = (
f"Commit command failed unexpectedly "
f"(return code {result.returncode})."
)
# Details were logged by log_and_execute if check=False
raise GitCommandError(
error_msg,
command=commit_command,
stderr=result.stderr
)
except (GitCommandError, ValueError) as e:
self.logger.error(f"Error during staging or commit process: {e}")
raise # Re-raise the specific error caught
except Exception as e:
# Catch any other unexpected error during the process
self.logger.exception(f"Unexpected error during staging/commit: {e}")
raise GitCommandError(f"Unexpected commit error: {e}") from e
def git_status_has_changes(self, working_directory):
"""
Checks if the Git repository has uncommitted changes.
Args:
working_directory (str): Path to the local Git repository.
Returns:
bool: True if changes exist, False otherwise.
Raises:
GitCommandError: If git status fails.
ValueError: If working_directory is invalid.
"""
self.logger.debug(f"Checking Git status in '{working_directory}'...")
try:
# Use '--porcelain' for script-friendly output
# Empty output means no changes.
status_command = ["git", "status", "--porcelain"]
result = self.log_and_execute(
status_command,
working_directory,
check=True # Ensure status command itself runs ok
)
# Check if the porcelain output has any content after stripping
has_changes = bool(result.stdout.strip())
self.logger.debug(f"Status check complete. Has changes: {has_changes}")
return has_changes
except (GitCommandError, ValueError) as e:
self.logger.error(f"Error checking Git status: {e}")
raise # Re-raise specific error
except Exception as e:
# Catch unexpected errors
self.logger.exception(f"Unexpected error checking Git status: {e}")
raise GitCommandError(f"Unexpected status error: {e}") from e
def list_tags(self, working_directory):
"""
Lists Git tags with subjects, sorted by creation date (desc).
Args:
working_directory (str): Path to the local Git repository.
Returns:
list: List of tuples `(tag_name, tag_subject)`. Empty on error.
"""
self.logger.info(f"Listing tags with subjects in '{working_directory}'...")
# Format: <tagname><TAB><subjectline>
format_string = "%(refname:short)%09%(contents:subject)"
command = ["git", "tag", "--list", f"--format={format_string}", "--sort=-creatordate"]
tags_with_subjects = []
try:
result = self.log_and_execute(command, working_directory, check=True)
output_lines = result.stdout.splitlines()
for line in output_lines:
line_stripped = line.strip()
if line_stripped:
# Split only on the first tab
parts = line_stripped.split('\t', 1)
tag_name = parts[0].strip()
# Handle cases where subject might be missing
tag_subject = parts[1].strip() if len(parts) > 1 else "(No subject)"
tags_with_subjects.append((tag_name, tag_subject))
self.logger.info(f"Found {len(tags_with_subjects)} tags with subjects.")
self.logger.debug(f"Tags found: {tags_with_subjects}")
return tags_with_subjects
except (GitCommandError, ValueError) as e:
# Log specific error but return empty list for GUI handling
self.logger.error(f"Error listing tags with subjects: {e}")
return [] # Return empty list on known errors
except Exception as e:
# Log unexpected errors and return empty list
self.logger.exception(f"Unexpected error listing tags with subjects: {e}")
return []
def create_tag(self, working_directory, tag_name, message):
"""
Creates a new annotated Git tag.
Args:
working_directory (str): Path to the local Git repository.
tag_name (str): The name for the new tag (e.g., "v1.0").
message (str): The annotation message for the tag.
Raises:
GitCommandError: If tag invalid, exists, or command fails.
ValueError: If args invalid.
"""
self.logger.info(
f"Creating tag '{tag_name}' in '{working_directory}' "
f"with message: '{message}'"
)
# Argument validation
if not tag_name:
raise ValueError("Tag name cannot be empty.")
if not message:
raise ValueError("Tag message cannot be empty.")
# Git tag name validation regex (based on git check-ref-format)
# Avoids leading/trailing '.', '..', '/.', '.lock', invalid chars
invalid_tag_pattern = \
r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?<!\.)$"
if not re.match(invalid_tag_pattern, tag_name):
raise ValueError(f"Invalid tag name format: '{tag_name}'.")
command = ["git", "tag", "-a", tag_name, "-m", message]
try:
# Execute command, check=True will raise CalledProcessError on failure
self.log_and_execute(command, working_directory, check=True)
self.logger.info(f"Tag '{tag_name}' created successfully.")
except GitCommandError as e:
# Check the original exception's stderr if available for specific errors
stderr_content = e.stderr.lower() if e.stderr else ""
if "already exists" in stderr_content:
exists_msg = f"Tag '{tag_name}' already exists."
self.logger.error(exists_msg)
# Re-raise with a more specific message
raise GitCommandError(
exists_msg,
command=command,
stderr=e.stderr
) from e
else:
# Log and re-raise other Git command errors
self.logger.error(f"Failed to create tag '{tag_name}': {e}")
raise
except ValueError as ve:
# Re-raise validation errors directly
self.logger.error(f"Validation error creating tag: {ve}")
raise ve
except Exception as e:
# Catch unexpected errors during tag creation
self.logger.exception(f"Unexpected error creating tag '{tag_name}': {e}")
raise GitCommandError(
f"Unexpected error creating tag: {e}",
command=command
) from e
def checkout_tag(self, working_directory, tag_name):
"""
Checks out a specific Git tag (detached HEAD state).
Args:
working_directory (str): Path to the local Git repository.
tag_name (str): The name of the tag to check out.
Returns:
bool: True if checkout was successful.
Raises:
GitCommandError: If tag not found or checkout fails.
ValueError: If arguments invalid.
"""
self.logger.info(f"Checking out tag '{tag_name}' in '{working_directory}'...")
if not tag_name:
raise ValueError("Tag name cannot be empty for checkout.")
command = ["git", "checkout", tag_name]
try:
# check=True: raises CalledProcessError if tag not found or other error
result = self.log_and_execute(
command,
working_directory,
check=True
)
self.logger.info(f"Successfully checked out tag '{tag_name}'.")
# Log detached HEAD warning based on stderr/stdout
output_lower = (result.stderr + result.stdout).lower()
if "detached head" in output_lower:
self.logger.warning(
"Repository is now in a 'detached HEAD' state. "
"New commits will not belong to any branch unless "
"one is created."
)
return True # Indicate success
except GitCommandError as e:
# Analyze the error further if needed (e.g., specific message for tag not found)
stderr_content = e.stderr.lower() if e.stderr else ""
not_found_patterns = [
"did not match any file(s)",
f"pathspec '{tag_name.lower()}' did not match any file(s)" # Check lowercase too
]
if any(p in stderr_content for p in not_found_patterns):
not_found_msg = f"Tag '{tag_name}' not found or is not a valid reference."
self.logger.error(not_found_msg)
# Re-raise with specific message
raise GitCommandError(
not_found_msg,
command=command,
stderr=e.stderr
) from e
else:
# Re-raise other Git command errors
self.logger.error(f"Failed to checkout tag '{tag_name}': {e}")
raise
except ValueError as ve:
# Re-raise validation errors
self.logger.error(f"Validation error checking out tag: {ve}")
raise ve
except Exception as e:
# Catch unexpected errors
self.logger.exception(f"Unexpected error checking out tag '{tag_name}': {e}")
raise GitCommandError(
f"Unexpected error checking out tag: {e}",
command=command
) from e