# git_commands.py import os import subprocess import logging import re # Modulo per espressioni regolari (usato per validazione nomi) # --- Definizione Eccezione Personalizzata --- class GitCommandError(Exception): """ Custom exception for handling Git command errors. Includes the original command and stderr details if available. """ def __init__(self, message, command=None, stderr=None): """ Initialize the GitCommandError. Args: message (str): The main error message. command (list, optional): The command list that caused the error. stderr (str, optional): The standard error output from the command. """ super().__init__(message) self.command = command self.stderr = stderr def __str__(self): """Return a formatted string representation including command details.""" base_message = super().__str__() details = [] if self.command: # Safely convert all command parts to string for joining safe_command = [str(part) for part in self.command] command_str = " ".join(safe_command) details.append(f"Command: '{command_str}'") if self.stderr: # Add stripped stderr if available stderr_str = self.stderr.strip() if stderr_str: # Only add if stderr has content details.append(f"Stderr: {stderr_str}") # Combine base message with details if details exist if details: details_str = "; ".join(details) return f"{base_message} ({details_str})" else: return base_message # --- Classe Principale per Comandi Git --- class GitCommands: """ Manages Git command execution, logging, and error handling. Decoupled from the GUI, operates on provided directory paths. Includes functionalities for core operations, tags, and branches. """ def __init__(self, logger): """ Initializes the GitCommands class with a logger instance. Args: logger (logging.Logger): Instance for logging messages. Raises: ValueError: If the provided logger is not a valid logging.Logger. """ if not isinstance(logger, logging.Logger): raise ValueError("A valid logging.Logger instance is required.") self.logger = logger def log_and_execute(self, command, working_directory, check=True): """ Executes a shell command in a specific directory, logs details, and handles potential errors. Args: command (list): Command and arguments as a list of strings. working_directory (str): The directory to execute the command in. check (bool, optional): If True, raises CalledProcessError (wrapped in GitCommandError) if the command returns a non-zero exit code. Defaults to True. Returns: subprocess.CompletedProcess: Result object containing stdout, stderr, etc. Raises: GitCommandError: For Git-specific errors or execution issues. ValueError: If working_directory is invalid. FileNotFoundError: If the command (e.g., 'git') is not found. PermissionError: If execution permission is denied. """ # Ensure all parts of command are strings for reliable execution and logging safe_command = [str(part) for part in command] command_str = " ".join(safe_command) log_message = f"Executing: {command_str}" # Log command execution at debug level for less verbosity in standard logs self.logger.debug(log_message) # --- Validate Working Directory --- if not working_directory: msg = "Working directory cannot be None or empty." self.logger.error(msg) raise ValueError(msg) abs_path = os.path.abspath(working_directory) if not os.path.isdir(abs_path): msg = f"Working directory does not exist or is not a directory: {abs_path}" self.logger.error(msg) # Raise GitCommandError to signal issue related to command execution context raise GitCommandError(msg, command=safe_command) cwd = abs_path self.logger.debug(f"Effective Working Directory: {cwd}") # --- Execute Command --- try: # Platform-specific setup to hide console window on Windows startupinfo = None # creationflags are used to control process creation (e.g., hide window) creationflags = 0 if os.name == "nt": # Windows specific settings startupinfo = subprocess.STARTUPINFO() # Prevent console window from showing startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE # Alternative flag to completely detach from console (might affect stdio) # creationflags = subprocess.CREATE_NO_WINDOW # Run the command using subprocess.run result = subprocess.run( safe_command, # Use the command list with string parts cwd=cwd, # Set working directory capture_output=True, # Capture stdout and stderr text=True, # Decode output as text (UTF-8 default) check=check, # Raise exception on non-zero exit code if True encoding="utf-8", # Specify encoding explicitly errors="replace", # Handle potential decoding errors gracefully startupinfo=startupinfo, # Windows: hide console window creationflags=creationflags, # Windows: additional process flags ) # Log command output for debugging and info stdout_log = result.stdout.strip() if result.stdout else "" stderr_log = result.stderr.strip() if result.stderr else "" # Log success differently based on whether check=True was used # If check=True, CalledProcessError would have been raised on failure # If check=False, we log success only if return code is 0 if check or result.returncode == 0: self.logger.info( f"Command successful. Output:\n" f"--- stdout ---\n{stdout_log}\n" f"--- stderr ---\n{stderr_log}\n---" ) # Note: If check=False and returncode != 0, errors are typically # handled by the calling method that analyzes the result. return result except subprocess.CalledProcessError as e: # This block runs only if check=True and the command failed stderr_err = e.stderr.strip() if e.stderr else "" stdout_err = e.stdout.strip() if e.stdout else "" error_log_msg = ( f"Command failed with return code {e.returncode} in '{cwd}'.\n" f"--- command ---\n{command_str}\n" f"--- stderr ---\n{stderr_err}\n" f"--- stdout ---\n{stdout_err}\n---" ) self.logger.error(error_log_msg) # Wrap the original exception in our custom GitCommandError raise GitCommandError( f"Git command failed in '{cwd}'.", command=safe_command, stderr=e.stderr ) from e # Preserve original exception context except FileNotFoundError as e: # Handle error if 'git' command (or another part) is not found error_msg = ( f"Command not found: '{safe_command[0]}'. Is Git installed " f"and in system PATH? (Working directory: '{cwd}')" ) self.logger.error(error_msg) self.logger.debug( f"FileNotFoundError details: {e}" ) # Log full details only at debug level raise GitCommandError(error_msg, command=safe_command) from e except PermissionError as e: # Handle errors due to lack of execution permissions error_msg = f"Permission denied executing command in '{cwd}'." self.logger.error(error_msg) self.logger.debug(f"PermissionError details: {e}") raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e except Exception as e: # Catch any other unexpected errors during subprocess execution self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}") raise GitCommandError( f"Unexpected command execution error: {e}", command=safe_command ) from e # --- Core Repo Operations --- def prepare_svn_for_git(self, working_directory): """ Prepares a directory for Git use: initializes if needed, ensures .gitignore ignores '.svn'. Args: working_directory (str): Path to the target directory. Raises: GitCommandError, ValueError: If operations fail. """ self.logger.info(f"Preparing directory for Git: '{working_directory}'") # Basic validation if not working_directory: raise ValueError("Working directory cannot be None or empty.") if not os.path.isdir(working_directory): raise GitCommandError(f"Directory does not exist: {working_directory}") # Define relevant paths gitignore_path = os.path.join(working_directory, ".gitignore") git_dir_path = os.path.join(working_directory, ".git") # 1. Initialize Git repository if '.git' directory doesn't exist if not os.path.exists(git_dir_path): self.logger.info("No existing Git repository found. Initializing...") try: init_command = ["git", "init"] # Use check=True to ensure init succeeds self.log_and_execute(init_command, working_directory, check=True) self.logger.info("Git repository initialized successfully.") except (GitCommandError, ValueError) as e: # Handle specific errors from git init self.logger.error(f"Failed to initialize Git repository: {e}") raise # Re-raise to signal preparation failure except Exception as e: # Handle unexpected errors during init self.logger.exception(f"Unexpected error initializing repository: {e}") raise GitCommandError(f"Unexpected init error: {e}") from e else: # Repository already exists self.logger.info("Git repository already exists. Skipping initialization.") # 2. Ensure .gitignore handles '.svn' self.logger.debug(f"Checking/updating .gitignore file: {gitignore_path}") try: svn_ignore_entry = ".svn" # Entry to ensure is ignored needs_write = False # Flag to track if file needs modification content_to_write = "" # Content to add if needed if not os.path.exists(gitignore_path): # .gitignore doesn't exist, create it with the entry self.logger.info( "'.gitignore' file not found. Creating with .svn entry." ) content_to_write = f"{svn_ignore_entry}\n" needs_write = True else: # .gitignore exists, check if '.svn' is already ignored try: with open(gitignore_path, "r", encoding="utf-8") as f: lines = f.readlines() # Check ignores .svn directory or files within it is_ignored = any( line.strip() == svn_ignore_entry or line.strip().startswith(svn_ignore_entry + "/") for line in lines ) if not is_ignored: # Not ignored, prepare to append the entry self.logger.info( f"'{svn_ignore_entry}' not found. Appending..." ) current_content = "".join(lines) # Ensure it's added on a new line if not current_content.endswith("\n"): content_to_write = f"\n{svn_ignore_entry}\n" else: content_to_write = f"{svn_ignore_entry}\n" needs_write = True # Mark file for update else: # Already ignored, no action needed self.logger.info(f"'{svn_ignore_entry}' entry already present.") except IOError as e: # Handle error reading the existing file self.logger.warning( f"Could not read existing '.gitignore': {e}. " f"Cannot verify {svn_ignore_entry} entry." ) # Decide on recovery: maybe try appending anyway? Risky. # For safety, let's not modify if we can't read it. needs_write = False # Write to file only if modification is necessary if needs_write: # Use 'a'ppend mode if file exists, 'w'rite mode if new mode = "a" if os.path.exists(gitignore_path) else "w" try: # Write with consistent newline handling with open( gitignore_path, mode, encoding="utf-8", newline="\n" ) as f: f.write(content_to_write) self.logger.info("Updated '.gitignore' file successfully.") except IOError as e: # Handle error during write/append self.logger.error(f"Error writing to '.gitignore': {e}") raise GitCommandError(f"Failed to update .gitignore: {e}") from e except IOError as e: # Catch errors from os.path.exists or final write attempt self.logger.error(f"Error accessing or writing '.gitignore': {e}") raise GitCommandError(f"File I/O error for .gitignore: {e}") from e except Exception as e: # Catch other unexpected errors self.logger.exception(f"Unexpected error managing '.gitignore': {e}") raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e self.logger.info(f"Directory preparation complete for '{working_directory}'.") def create_git_bundle(self, working_directory, bundle_path): """Creates a Git bundle file containing all refs.""" # Normalize path for cross-OS compatibility and command usage normalized_bundle_path = os.path.normpath(bundle_path) normalized_bundle_path = normalized_bundle_path.replace("\\", "/") command = ["git", "bundle", "create", normalized_bundle_path, "--all"] self.logger.info(f"Attempting to create bundle: {normalized_bundle_path}") try: # Use check=False to manually handle 'empty bundle' warning result = self.log_and_execute(command, working_directory, check=False) if result.returncode != 0: # Command failed, check stderr for specific non-fatal cases stderr_lower = result.stderr.lower() if result.stderr else "" if "refusing to create empty bundle" in stderr_lower: # This is a warning, not an error for our purposes self.logger.warning( f"Bundle creation skipped: No commits in '{working_directory}'." ) # Do not raise error, let caller handle info else: # An actual error occurred error_msg = f"Bundle command failed (code {result.returncode})." raise GitCommandError( error_msg, command=command, stderr=result.stderr ) # Check file existence and size even on success (paranoid check) elif ( not os.path.exists(normalized_bundle_path) or os.path.getsize(normalized_bundle_path) == 0 ): self.logger.warning(f"Bundle cmd success, but file missing/empty.") else: self.logger.info( f"Bundle created successfully: '{normalized_bundle_path}'." ) except (GitCommandError, ValueError) as e: self.logger.error(f"Failed create bundle for '{working_directory}': {e}") raise except Exception as e: self.logger.exception(f"Unexpected bundle creation error: {e}") raise GitCommandError( f"Unexpected bundle error: {e}", command=command ) from e def fetch_from_git_bundle(self, working_directory, bundle_path): """Fetches from a bundle file and merges fetched refs.""" normalized_bundle_path = os.path.normpath(bundle_path).replace("\\", "/") self.logger.info( f"Fetching from '{normalized_bundle_path}' into '{working_directory}'" ) fetch_command = ["git", "fetch", normalized_bundle_path] # Merge FETCH_HEAD, creating merge commit if necessary (--no-ff) merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"] try: # 1. Fetch self.logger.debug("Executing fetch...") self.log_and_execute(fetch_command, working_directory, check=True) self.logger.info("Fetch successful.") # 2. Merge self.logger.debug("Executing merge...") # Use check=False as conflicts are expected failures merge_result = self.log_and_execute( merge_command, working_directory, check=False ) # Analyze merge result stdout_log = merge_result.stdout.strip() if merge_result.stdout else "" stderr_log = merge_result.stderr.strip() if merge_result.stderr else "" if merge_result.returncode == 0: # Success or already up-to-date if "already up to date" in stdout_log.lower(): self.logger.info("Repository already up-to-date.") else: self.logger.info("Merge successful.") else: # Merge failed, likely conflicts output_lower = (stderr_log + stdout_log).lower() if "conflict" in output_lower: conflict_msg = ( f"Merge conflict occurred. Resolve manually in " f"'{working_directory}' and commit." ) self.logger.error(conflict_msg) # Raise specific error for caller to handle raise GitCommandError( conflict_msg, command=merge_command, stderr=merge_result.stderr ) else: # Other merge error error_msg = f"Merge failed (code {merge_result.returncode})." self.logger.error(error_msg) raise GitCommandError( error_msg, command=merge_command, stderr=merge_result.stderr ) except (GitCommandError, ValueError) as e: self.logger.error(f"Fetch/merge error for '{working_directory}': {e}") raise except Exception as e: self.logger.exception(f"Unexpected fetch/merge error: {e}") raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e # --- Commit and Status --- def git_commit(self, working_directory, message): """Stages all changes and commits them with the given message.""" self.logger.info( f"Attempting commit in '{working_directory}' with msg: '{message}'" ) if not message: # Disallow empty commit messages programmatically raise ValueError("Commit message cannot be empty.") try: # 1. Stage all changes using 'git add .' add_command = ["git", "add", "."] self.logger.debug("Staging all changes...") # Use check=True to ensure staging succeeds self.log_and_execute(add_command, working_directory, check=True) self.logger.debug("Staging successful.") # 2. Commit staged changes commit_command = ["git", "commit", "-m", message] self.logger.debug("Attempting commit...") # Use check=False to handle 'nothing to commit' case gracefully result = self.log_and_execute( commit_command, working_directory, check=False ) # Analyze commit result stdout_lower = result.stdout.lower() if result.stdout else "" stderr_lower = result.stderr.lower() if result.stderr else "" if result.returncode == 0: # Standard successful commit self.logger.info("Commit successful.") return True # Check for various 'nothing to commit' messages elif ( "nothing to commit" in stdout_lower or "no changes added to commit" in stdout_lower or "nothing added to commit" in stdout_lower or (result.returncode == 1 and not stderr_lower and not stdout_lower) ): # Common non-error case: repo was clean or add didn't stage anything new self.logger.info("No changes were available to commit.") return False # Indicate no commit was made else: # An unexpected error occurred during the commit error_msg = ( f"Commit cmd failed unexpectedly (code {result.returncode})." ) # Details likely logged by log_and_execute already raise GitCommandError( error_msg, command=commit_command, stderr=result.stderr ) except (GitCommandError, ValueError) as e: # Catch errors from staging or commit steps, or validation self.logger.error(f"Error during staging or commit: {e}") raise # Re-raise the specific error except Exception as e: # Catch any other unexpected error self.logger.exception(f"Unexpected error during commit process: {e}") raise GitCommandError(f"Unexpected commit error: {e}") from e def git_status_has_changes(self, working_directory): """Checks if the Git repository has uncommitted changes.""" self.logger.debug(f"Checking Git status in '{working_directory}'...") try: # Use '--porcelain' for reliable script parsing status_command = ["git", "status", "--porcelain"] # Use check=True to ensure the status command itself runs correctly result = self.log_and_execute(status_command, working_directory, check=True) # If porcelain output is non-empty, there are changes has_changes = bool(result.stdout.strip()) self.logger.debug(f"Status check complete. Has changes: {has_changes}") return has_changes except (GitCommandError, ValueError) as e: self.logger.error(f"Status check error: {e}") raise # Re-raise specific error except Exception as e: self.logger.exception(f"Unexpected status check error: {e}") raise GitCommandError(f"Unexpected status error: {e}") from e # --- Tag Management --- def list_tags(self, working_directory): """Lists tags with subjects, sorted by creation date (desc).""" self.logger.info(f"Listing tags with subjects in '{working_directory}'...") # Format: TagnameSubjectLine fmt = "%(refname:short)%09%(contents:subject)" cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"] tags_data = [] # List to store (name, subject) tuples try: result = self.log_and_execute(cmd, working_directory, check=True) for line in result.stdout.splitlines(): line_strip = line.strip() if line_strip: # Split only on the first tab to handle subjects with tabs parts = line_strip.split("\t", 1) name = parts[0].strip() # Provide default if subject is missing subject = parts[1].strip() if len(parts) > 1 else "(No subject)" tags_data.append((name, subject)) self.logger.info(f"Found {len(tags_data)} tags.") self.logger.debug(f"Tags found: {tags_data}") return tags_data except (GitCommandError, ValueError) as e: # Log error but return empty list for graceful GUI handling self.logger.error(f"Error listing tags: {e}") return [] except Exception as e: # Log unexpected errors self.logger.exception(f"Unexpected error listing tags: {e}") return [] def create_tag(self, working_directory, tag_name, message): """Creates a new annotated Git tag.""" self.logger.info(f"Creating tag '{tag_name}' in '{working_directory}'") # Validate inputs if not tag_name: raise ValueError("Tag name cannot be empty.") if not message: raise ValueError("Tag message cannot be empty.") # Validate tag name format using regex pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?