# git_commands.py import os import subprocess import logging import re # Ensure re is imported class GitCommandError(Exception): """ Custom exception for handling Git command errors. Includes the original command and error details if available. """ def __init__(self, message, command=None, stderr=None): """ Initialize the GitCommandError. Args: message (str): The error message. command (list, optional): The command that caused the error. Defaults to None. stderr (str, optional): The standard error output. Defaults to None. """ super().__init__(message) self.command = command self.stderr = stderr def __str__(self): """Return a formatted string representation of the error.""" base_message = super().__str__() details = [] if self.command: # Ensure command is list of strings for join safe_command = [str(part) for part in self.command] command_str = ' '.join(safe_command) details.append(f"Command: '{command_str}'") if self.stderr: stderr_str = self.stderr.strip() details.append(f"Stderr: {stderr_str}") if details: details_str = '; '.join(details) return f"{base_message} ({details_str})" else: return base_message class GitCommands: """ Manages Git commands execution, logging, and error handling. Includes tag and branch management functionalities. """ def __init__(self, logger): """ Initializes the GitCommands with a logger. Args: logger (logging.Logger): Logger instance for logging messages. """ if not isinstance(logger, logging.Logger): # Raise error if logger is not a valid logger instance raise ValueError("A valid logging.Logger instance is required.") self.logger = logger def log_and_execute(self, command, working_directory, check=True): """ Executes a command within a specific working directory, logs it, and handles errors. Args: command (list): List representing the command and its arguments. working_directory (str): Path to the directory for command execution. check (bool, optional): Raise error on non-zero exit code if True. Defaults to True. Returns: subprocess.CompletedProcess: The result object from subprocess.run. Raises: GitCommandError: If path invalid, command fails (and check=True), etc. ValueError: If working_directory is None or empty. """ # Ensure all parts of command are strings for logging and execution safe_command = [str(part) for part in command] command_str = ' '.join(safe_command) log_message = f"Executing: {command_str}" self.logger.debug(log_message) # Validate working directory if not working_directory: msg = "Working directory cannot be None or empty." self.logger.error(msg) raise ValueError(msg) abs_path = os.path.abspath(working_directory) if not os.path.isdir(abs_path): msg = f"Working directory does not exist or is not a directory: {abs_path}" self.logger.error(msg) # Include command context in the error raise GitCommandError(msg, command=safe_command) cwd = abs_path self.logger.debug(f"Working directory: {cwd}") try: # Platform-specific setup to hide console window on Windows startupinfo = None creationflags = 0 # Default creation flags if os.name == 'nt': startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE # Optional: Use CREATE_NO_WINDOW if std I/O redirection not needed # creationflags = subprocess.CREATE_NO_WINDOW # Execute the command result = subprocess.run( safe_command, # Use the validated command list cwd=cwd, capture_output=True, text=True, check=check, encoding='utf-8', errors='replace', # Handle potential decoding errors startupinfo=startupinfo, creationflags=creationflags ) # Log stdout and stderr clearly stdout_log = result.stdout.strip() if result.stdout else "" stderr_log = result.stderr.strip() if result.stderr else "" # Log success with output details # Use DEBUG level for full output, INFO for summary? Let's use INFO for now. self.logger.info( f"Command successful. Output:\n" f"--- stdout ---\n{stdout_log}\n" f"--- stderr ---\n{stderr_log}\n---" ) return result except subprocess.CalledProcessError as e: # Log detailed error information from the failed process stderr_err = e.stderr.strip() if e.stderr else "" stdout_err = e.stdout.strip() if e.stdout else "" error_log_msg = ( f"Command failed with return code {e.returncode} in '{cwd}'.\n" f"--- command ---\n{command_str}\n" f"--- stderr ---\n{stderr_err}\n" f"--- stdout ---\n{stdout_err}\n" f"---" ) self.logger.error(error_log_msg) # Wrap the original exception for consistent error handling raise GitCommandError( f"Git command failed in '{cwd}'.", command=safe_command, stderr=e.stderr ) from e except FileNotFoundError as e: # Log error if the command itself (e.g., 'git') is not found error_msg = ( f"Command not found: '{safe_command[0]}'. Is Git installed " f"and in system PATH? (WD: '{cwd}')" ) self.logger.error(error_msg) # Log exception details at debug level for more info if needed self.logger.debug(f"FileNotFoundError details: {e}") raise GitCommandError(error_msg, command=safe_command) from e except PermissionError as e: # Handle errors related to execution permissions error_msg = f"Permission denied executing command in '{cwd}'." self.logger.error(error_msg) self.logger.debug(f"PermissionError details: {e}") raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e except Exception as e: # Catch any other unexpected errors during execution self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}") raise GitCommandError( f"Unexpected error during command execution: {e}", command=safe_command ) from e def create_git_bundle(self, working_directory, bundle_path): """ Creates a Git bundle file from the repository in working_directory. Args: working_directory (str): Path to the local Git repository. bundle_path (str): Full path where the bundle file should be saved. Raises: GitCommandError: If command fails or path invalid. ValueError: If working_directory is None or empty. """ normalized_bundle_path = os.path.normpath(bundle_path) normalized_bundle_path = normalized_bundle_path.replace("\\", "/") command = ["git", "bundle", "create", normalized_bundle_path, "--all"] self.logger.info(f"Attempting to create Git bundle: {normalized_bundle_path}") try: result = self.log_and_execute( command, working_directory, check=False # Check result manually ) if result.returncode != 0: stderr_lower = result.stderr.lower() if result.stderr else "" if "refusing to create empty bundle" in stderr_lower: self.logger.warning(f"Bundle creation skipped: Empty repo at '{working_directory}'.") # Not an error, do not raise else: error_msg = f"Git bundle command failed (code {result.returncode})." raise GitCommandError(error_msg, command, result.stderr) elif not os.path.exists(normalized_bundle_path) or \ os.path.getsize(normalized_bundle_path) == 0: self.logger.warning(f"Bundle success, but file '{normalized_bundle_path}' missing/empty.") # Consider if this is an error case else: self.logger.info(f"Git bundle created successfully: '{normalized_bundle_path}'.") except (GitCommandError, ValueError) as e: self.logger.error(f"Failed create bundle for '{working_directory}': {e}") raise except Exception as e: self.logger.exception(f"Unexpected bundle error for '{working_directory}': {e}") raise GitCommandError(f"Unexpected bundle error: {e}", command) from e def fetch_from_git_bundle(self, working_directory, bundle_path): """ Fetches changes from a Git bundle file and merges them. Args: working_directory (str): Path to the local Git repository. bundle_path (str): Path to the Git bundle file. Raises: GitCommandError: If fetch or merge fails. ValueError: If arguments invalid. """ normalized_bundle_path = os.path.normpath(bundle_path).replace("\\", "/") self.logger.info(f"Fetching from '{normalized_bundle_path}' into '{working_directory}'") fetch_command = ["git", "fetch", normalized_bundle_path] merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"] # No fast-forward merge try: # 1. Fetch changes self.logger.debug("Executing fetch command...") self.log_and_execute(fetch_command, working_directory, check=True) # Error if fetch fails self.logger.info("Successfully fetched from Git bundle.") # 2. Merge fetched changes self.logger.debug("Executing merge command...") merge_result = self.log_and_execute(merge_command, working_directory, check=False) # Check manually # Analyze merge result stdout_log = merge_result.stdout.strip() if merge_result.stdout else "" stderr_log = merge_result.stderr.strip() if merge_result.stderr else "" if merge_result.returncode == 0: if "already up to date" in stdout_log.lower(): self.logger.info("Repository already up-to-date.") else: self.logger.info("Successfully merged fetched changes.") else: # Merge failed, likely conflicts output_lower = (stderr_log + stdout_log).lower() if "conflict" in output_lower: conflict_msg = (f"Merge conflict after fetching. Resolve manually " f"in '{working_directory}' and commit.") self.logger.error(conflict_msg) # Raise specific error for caller raise GitCommandError(conflict_msg, merge_command, merge_result.stderr) else: # Other merge error error_msg = f"Merge command failed (code {merge_result.returncode})." self.logger.error(error_msg) raise GitCommandError(error_msg, merge_command, merge_result.stderr) except (GitCommandError, ValueError) as e: self.logger.error(f"Fetch/merge error for '{working_directory}': {e}") raise except Exception as e: self.logger.exception(f"Unexpected fetch/merge error for '{working_directory}': {e}") raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e def prepare_svn_for_git(self, working_directory): """ Prepares a directory for Git: initializes repo and ensures .gitignore. Args: working_directory (str): Path to the directory. Raises: GitCommandError/ValueError/IOError: On failure. """ self.logger.info(f"Preparing directory for Git: '{working_directory}'") # Validate path if not working_directory: raise ValueError("Working directory cannot be empty.") if not os.path.isdir(working_directory): raise GitCommandError(f"Directory does not exist: {working_directory}") # Define paths gitignore_path = os.path.join(working_directory, ".gitignore") git_dir_path = os.path.join(working_directory, ".git") # 1. Initialize Git repository if needed if not os.path.exists(git_dir_path): self.logger.info("No existing Git repo found. Initializing...") try: init_command = ["git", "init"] self.log_and_execute(init_command, working_directory, check=True) self.logger.info("Git repository initialized successfully.") except (GitCommandError, ValueError) as e: self.logger.error(f"Failed to initialize Git repository: {e}") raise # Re-raise to signal failure except Exception as e: self.logger.exception(f"Unexpected error initializing repo: {e}") raise GitCommandError(f"Unexpected init error: {e}") from e else: self.logger.info("Git repository already exists. Skipping init.") # 2. Ensure .gitignore exists and ignores .svn self.logger.debug(f"Checking/updating .gitignore: {gitignore_path}") try: svn_ignore_entry = ".svn" needs_write = False content_to_write = "" if not os.path.exists(gitignore_path): # File doesn't exist, create it with the entry self.logger.info("'.gitignore' not found. Creating with .svn entry.") content_to_write = f"{svn_ignore_entry}\n" needs_write = True else: # File exists, check content try: with open(gitignore_path, "r", encoding='utf-8') as f: lines = f.readlines() # Check if entry or entry/ exists is_ignored = any( line.strip() == svn_ignore_entry or \ line.strip().startswith(svn_ignore_entry + '/') for line in lines ) if not is_ignored: # Entry not found, need to append self.logger.info(f"'{svn_ignore_entry}' not found. Appending.") current_content = "".join(lines) # Add newline before entry if file doesn't end with one if not current_content.endswith('\n'): content_to_write = f"\n{svn_ignore_entry}\n" else: content_to_write = f"{svn_ignore_entry}\n" needs_write = True else: self.logger.info(f"'{svn_ignore_entry}' already ignored.") except IOError as e: # Cannot read existing file, log warning, don't modify self.logger.warning(f"Could not read existing '.gitignore': {e}.") # Write to file only if necessary if needs_write: # Use 'a'ppend mode if appending, 'w'rite if creating new mode = 'a' if os.path.exists(gitignore_path) and content_to_write.startswith("\n") else 'w' # If creating new, content shouldn't start with newline if mode == 'w': content_to_write = content_to_write.lstrip() try: with open(gitignore_path, mode, encoding='utf-8', newline='\n') as f: f.write(content_to_write) self.logger.info("Updated '.gitignore' file.") except IOError as e: self.logger.error(f"Error writing to '.gitignore': {e}") raise GitCommandError(f"Failed update .gitignore: {e}") from e except IOError as e: # Catch errors from os.path.exists or final write self.logger.error(f"Error accessing/writing '.gitignore': {e}") raise GitCommandError(f"File I/O error for .gitignore: {e}") from e except Exception as e: # Catch other unexpected errors self.logger.exception(f"Unexpected error managing '.gitignore': {e}") raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e self.logger.info(f"Directory preparation complete for '{working_directory}'.") def git_commit(self, working_directory, message="Autocommit"): """ Stages all changes and commits them. """ self.logger.info(f"Attempting commit in '{working_directory}': '{message}'") try: # 1. Stage all changes add_command = ["git", "add", "."] self.logger.debug("Staging all changes (git add .)...") self.log_and_execute(add_command, working_directory, check=True) self.logger.debug("Staging successful.") # 2. Commit staged changes commit_command = ["git", "commit", "-m", message] self.logger.debug("Attempting commit...") result = self.log_and_execute(commit_command, working_directory, check=False) # Analyze result stdout_lower = result.stdout.lower() if result.stdout else "" stderr_lower = result.stderr.lower() if result.stderr else "" nothing_to_commit = False if "nothing to commit" in stdout_lower: nothing_to_commit = True if "no changes added to commit" in stdout_lower: nothing_to_commit = True if "nothing added to commit" in stdout_lower: nothing_to_commit = True # Handle Git returning 1 with no output for no changes case if result.returncode == 1 and not stderr_lower and not stdout_lower: nothing_to_commit = True if result.returncode == 0: self.logger.info("Commit successful.") return True elif nothing_to_commit: self.logger.info("No changes to commit.") return False else: # Unexpected error error_msg = f"Commit command failed (code {result.returncode})." raise GitCommandError(error_msg, commit_command, result.stderr) except (GitCommandError, ValueError) as e: self.logger.error(f"Commit process error: {e}") raise except Exception as e: self.logger.exception(f"Unexpected commit error: {e}") raise GitCommandError(f"Unexpected commit error: {e}") from e def git_status_has_changes(self, working_directory): """ Checks if the Git repository has uncommitted changes. """ self.logger.debug(f"Checking Git status in '{working_directory}'...") try: status_command = ["git", "status", "--porcelain"] result = self.log_and_execute(status_command, working_directory, check=True) # Any output from porcelain means changes exist has_changes = bool(result.stdout.strip()) self.logger.debug(f"Status check complete. Has changes: {has_changes}") return has_changes except (GitCommandError, ValueError) as e: self.logger.error(f"Status check error: {e}") raise except Exception as e: self.logger.exception(f"Unexpected status error: {e}") raise GitCommandError(f"Unexpected status error: {e}") from e def list_tags(self, working_directory): """ Lists tags with subjects, sorted newest first. """ self.logger.info(f"Listing tags with subjects in '{working_directory}'...") format_string = "%(refname:short)%09%(contents:subject)" command = ["git", "tag", "--list", f"--format={format_string}", "--sort=-creatordate"] tags_with_subjects = [] try: result = self.log_and_execute(command, working_directory, check=True) output_lines = result.stdout.splitlines() for line in output_lines: line_stripped = line.strip() if line_stripped: parts = line_stripped.split('\t', 1) # Split only on first tab tag_name = parts[0].strip() tag_subject = parts[1].strip() if len(parts) > 1 else "(No subject)" tags_with_subjects.append((tag_name, tag_subject)) count = len(tags_with_subjects) self.logger.info(f"Found {count} tags with subjects.") self.logger.debug(f"Tags found: {tags_with_subjects}") return tags_with_subjects except (GitCommandError, ValueError) as e: self.logger.error(f"Error listing tags: {e}") return [] # Return empty list on known errors except Exception as e: self.logger.exception(f"Unexpected error listing tags: {e}") return [] def create_tag(self, working_directory, tag_name, message): """ Creates an annotated tag. """ self.logger.info(f"Creating tag '{tag_name}' in '{working_directory}'") # Validate inputs if not tag_name: raise ValueError("Tag name cannot be empty.") if not message: raise ValueError("Tag message cannot be empty.") # Validate tag name format pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?" except (GitCommandError, ValueError) as e: self.logger.error(f"Error getting current branch: {e}") return "" except Exception as e: self.logger.exception(f"Unexpected error getting branch: {e}") return "" def list_branches(self, working_directory): """ Lists local Git branches. """ self.logger.info(f"Listing local branches in '{working_directory}'...") cmd = ["git", "branch", "--list", "--no-color"] branches = [] try: result = self.log_and_execute(cmd, working_directory, check=True) for line in result.stdout.splitlines(): # Remove decoration ('*' ) and whitespace name = line.lstrip('* ').strip() # Filter out potential detached HEAD message if present if name and "HEAD detached" not in name: branches.append(name) count = len(branches) self.logger.info(f"Found {count} local branches.") self.logger.debug(f"Branches: {branches}") return branches except (GitCommandError, ValueError) as e: self.logger.error(f"Error listing branches: {e}") return [] except Exception as e: self.logger.exception(f"Unexpected error listing branches: {e}") return [] def create_branch(self, working_directory, branch_name, start_point=None): """ Creates a new local Git branch. """ self.logger.info(f"Creating branch '{branch_name}' in '{working_directory}'...") if not branch_name: raise ValueError("Branch name cannot be empty.") # Branch name validation pattern = r"^(?![./]|.*([./]{2,}|[.]$|[/]$|@\{))[^ \t\n\r\f\v~^:?*[\\]+(?