# git_commands.py
import os
import subprocess
import logging
import re  # Regular-expression module (used for name validation)


# --- Custom Exception Definition ---
class GitCommandError(Exception):
    """
    Custom exception for handling Git command errors.

    Includes the original command and stderr details if available.
    """

    def __init__(self, message, command=None, stderr=None):
        """
        Initialize the GitCommandError.

        Args:
            message (str): The main error message.
            command (list, optional): The command list that caused the error.
            stderr (str, optional): The standard error output from the command.
        """
        super().__init__(message)
        self.command = command
        self.stderr = stderr

    def __str__(self):
        """Return a formatted string representation including command details."""
        base_message = super().__str__()
        details = []
        if self.command:
            # Command parts may not all be strings; convert before joining.
            command_str = " ".join(str(part) for part in self.command)
            details.append(f"Command: '{command_str}'")
        if self.stderr:
            stderr_str = self.stderr.strip()
            if stderr_str:  # Whitespace-only stderr adds nothing useful
                details.append(f"Stderr: {stderr_str}")
        if details:
            details_str = "; ".join(details)
            return f"{base_message} ({details_str})"
        return base_message


# --- Main Class for Git Commands ---
class GitCommands:
    """
    Manages Git command execution, logging, and error handling.

    Decoupled from the GUI, operates on provided directory paths.
    Includes functionalities for core operations, tags, and branches.
    """

    def __init__(self, logger):
        """
        Initializes the GitCommands class with a logger instance.

        Args:
            logger (logging.Logger): Instance for logging messages.

        Raises:
            ValueError: If the provided logger is not a valid logging.Logger.
        """
        if not isinstance(logger, logging.Logger):
            raise ValueError("A valid logging.Logger instance is required.")
        self.logger = logger

    def log_and_execute(self, command, working_directory, check=True,
                        log_output_level=logging.INFO):
        """
        Executes a command, logs details, handles errors, and controls the
        level at which command output is logged.

        Args:
            command (list): Command and arguments; each part is converted
                with str() before execution.
            working_directory (str): The directory to execute the command in.
                "." means the current working directory of the process.
            check (bool, optional): If True, raises GitCommandError on
                non-zero exit. Defaults to True.
            log_output_level (int, optional): Logging level for stdout/stderr
                on success. Defaults to logging.INFO. Use logging.DEBUG to
                hide noisy command output from INFO logs.

        Returns:
            subprocess.CompletedProcess: Result object. With check=False the
            caller must inspect `returncode` itself.

        Raises:
            ValueError: If working_directory is None or empty.
            GitCommandError: If the directory does not exist, the command
                exits non-zero while check=True, the executable is not found,
                permission is denied, or any other error occurs while running
                the command (the original exception is chained as the cause).
        """
        safe_command = [str(part) for part in command]
        command_str = " ".join(safe_command)
        # The command line is always logged at DEBUG level
        self.logger.debug(f"Executing in '{working_directory}': {command_str}")

        # --- Validate Working Directory ---
        if not working_directory:
            msg = "Working directory cannot be None or empty."
            self.logger.error(msg)
            raise ValueError(msg)
        # '.' is a special case for the current working directory
        if working_directory == ".":
            cwd = os.getcwd()
        else:
            abs_path = os.path.abspath(working_directory)
            if not os.path.isdir(abs_path):
                msg = f"Working directory does not exist or is not a directory: {abs_path}"
                self.logger.error(msg)
                raise GitCommandError(msg, command=safe_command)
            cwd = abs_path

        # --- Execute Command ---
        try:
            startupinfo = None
            creationflags = 0
            if os.name == "nt":
                # On Windows, ask for the child's window to be hidden
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE

            result = subprocess.run(
                safe_command,
                cwd=cwd,
                capture_output=True,
                text=True,
                check=check,
                encoding="utf-8",
                errors="replace",
                startupinfo=startupinfo,
                creationflags=creationflags,
            )

            if check or result.returncode == 0:
                # Success path. With check=True a non-zero exit has already
                # raised CalledProcessError, so reaching here means success.
                if self.logger.isEnabledFor(log_output_level):
                    stdout_log = result.stdout.strip() if result.stdout else ""
                    stderr_log = result.stderr.strip() if result.stderr else ""
                    self.logger.log(
                        log_output_level,
                        f"Command successful. Output:\n"
                        f"--- stdout ---\n{stdout_log}\n"
                        f"--- stderr ---\n{stderr_log}\n---"
                    )
                else:
                    # Minimal success message if the output level is suppressed
                    self.logger.debug(
                        "Command successful (output logging suppressed by level)."
                    )
            else:
                # check=False and a non-zero exit: the caller decides whether
                # this is an error, so record it at DEBUG instead of dropping
                # the output silently.
                stdout_log = result.stdout.strip() if result.stdout else ""
                stderr_log = result.stderr.strip() if result.stderr else ""
                self.logger.debug(
                    f"Command exited with code {result.returncode} "
                    f"(check disabled, caller handles it).\n"
                    f"--- stdout ---\n{stdout_log}\n"
                    f"--- stderr ---\n{stderr_log}\n---"
                )
            return result

        except subprocess.CalledProcessError as e:
            # Only reachable with check=True; always logged at ERROR
            stderr_err = e.stderr.strip() if e.stderr else ""
            stdout_err = e.stdout.strip() if e.stdout else ""
            error_log_msg = (
                f"Command failed with return code {e.returncode} in '{cwd}'.\n"
                f"--- command ---\n{command_str}\n"
                f"--- stderr ---\n{stderr_err}\n"
                f"--- stdout ---\n{stdout_err}\n---"
            )
            self.logger.error(error_log_msg)
            raise GitCommandError(
                f"Git command failed in '{cwd}'.",
                command=safe_command,
                stderr=e.stderr
            ) from e
        except FileNotFoundError as e:
            error_msg = f"Command not found: '{safe_command[0]}'. Is Git installed and in system PATH? (Working directory: '{cwd}')"
            self.logger.error(error_msg)
            self.logger.debug(f"FileNotFoundError details: {e}")
            raise GitCommandError(error_msg, command=safe_command) from e
        except PermissionError as e:
            error_msg = f"Permission denied executing command in '{cwd}'."
            self.logger.error(error_msg)
            self.logger.debug(f"PermissionError details: {e}")
            raise GitCommandError(error_msg, command=safe_command, stderr=str(e)) from e
        except Exception as e:
            self.logger.exception(f"Unexpected error executing command in '{cwd}': {e}")
            raise GitCommandError(
                f"Unexpected command execution error: {e}", command=safe_command
            ) from e

    # --- Core Repo Operations ---
    def prepare_svn_for_git(self, working_directory):
        """
        Prepares a directory for Git use: initializes if needed, ensures
        .gitignore ignores '.svn'.

        Args:
            working_directory (str): Path to the target directory.

        Raises:
            ValueError: If working_directory is None or empty.
            GitCommandError: If the directory does not exist, `git init`
                fails, or .gitignore cannot be written.
        """
        self.logger.info(f"Preparing directory for Git: '{working_directory}'")

        # Basic validation
        if not working_directory:
            raise ValueError("Working directory cannot be None or empty.")
        if not os.path.isdir(working_directory):
            raise GitCommandError(f"Directory does not exist: {working_directory}")

        gitignore_path = os.path.join(working_directory, ".gitignore")
        git_dir_path = os.path.join(working_directory, ".git")

        # 1. Initialize Git repository if '.git' doesn't exist
        if not os.path.exists(git_dir_path):
            self.logger.info("No existing Git repository found. Initializing...")
            try:
                init_command = ["git", "init"]
                # check=True so a failed init raises
                self.log_and_execute(init_command, working_directory, check=True)
                self.logger.info("Git repository initialized successfully.")
            except (GitCommandError, ValueError) as e:
                self.logger.error(f"Failed to initialize Git repository: {e}")
                raise  # Re-raise to signal preparation failure
            except Exception as e:
                self.logger.exception(f"Unexpected error initializing repository: {e}")
                raise GitCommandError(f"Unexpected init error: {e}") from e
        else:
            self.logger.info("Git repository already exists. Skipping initialization.")

        # 2. Ensure .gitignore handles '.svn'
        self.logger.debug(f"Checking/updating .gitignore file: {gitignore_path}")
        try:
            svn_ignore_entry = ".svn"  # Entry that must be ignored
            needs_write = False  # Whether the file needs modification
            content_to_write = ""  # Text to write/append if needed

            if not os.path.exists(gitignore_path):
                # .gitignore doesn't exist, create it with the entry
                self.logger.info(
                    "'.gitignore' file not found. Creating with .svn entry."
                )
                content_to_write = f"{svn_ignore_entry}\n"
                needs_write = True
            else:
                # .gitignore exists, check if '.svn' is already ignored
                try:
                    with open(gitignore_path, "r", encoding="utf-8") as f:
                        lines = f.readlines()
                    # Accept either the bare entry or a '.svn/...' pattern
                    is_ignored = any(
                        line.strip() == svn_ignore_entry
                        or line.strip().startswith(svn_ignore_entry + "/")
                        for line in lines
                    )
                    if not is_ignored:
                        self.logger.info(
                            f"'{svn_ignore_entry}' not found. Appending..."
                        )
                        current_content = "".join(lines)
                        # Make sure the entry starts on its own line
                        if not current_content.endswith("\n"):
                            content_to_write = f"\n{svn_ignore_entry}\n"
                        else:
                            content_to_write = f"{svn_ignore_entry}\n"
                        needs_write = True
                    else:
                        self.logger.info(f"'{svn_ignore_entry}' entry already present.")
                except IOError as e:
                    self.logger.warning(
                        f"Could not read existing '.gitignore': {e}. "
                        f"Cannot verify {svn_ignore_entry} entry."
                    )
                    # For safety, do not modify a file that cannot be read
                    needs_write = False

            # Write to file only if modification is necessary
            if needs_write:
                # Append if the file exists, create it otherwise
                mode = "a" if os.path.exists(gitignore_path) else "w"
                try:
                    # newline="\n" keeps line endings consistent
                    with open(
                        gitignore_path, mode, encoding="utf-8", newline="\n"
                    ) as f:
                        f.write(content_to_write)
                    self.logger.info("Updated '.gitignore' file successfully.")
                except IOError as e:
                    self.logger.error(f"Error writing to '.gitignore': {e}")
                    raise GitCommandError(f"Failed to update .gitignore: {e}") from e
        except GitCommandError:
            # Already logged and wrapped by the write handler above; let it
            # through instead of reporting it again as an "unexpected" error.
            raise
        except IOError as e:
            self.logger.error(f"Error accessing or writing '.gitignore': {e}")
            raise GitCommandError(f"File I/O error for .gitignore: {e}") from e
        except Exception as e:
            self.logger.exception(f"Unexpected error managing '.gitignore': {e}")
            raise GitCommandError(f"Unexpected error with .gitignore: {e}") from e

        self.logger.info(f"Directory preparation complete for '{working_directory}'.")

    def create_git_bundle(self, working_directory, bundle_path):
        """
        Creates a Git bundle file containing all refs.

        A repository without commits is not treated as an error: git's
        "refusing to create empty bundle" message is logged as a warning and
        the method returns normally.

        Args:
            working_directory (str): Path to the Git repository.
            bundle_path (str): Destination bundle file. A relative path is
                resolved against working_directory, which is where git runs.

        Raises:
            ValueError: If working_directory is None or empty.
            GitCommandError: If the bundle command fails for any other reason.
        """
        # Normalize path and use forward slashes for the command line
        normalized_bundle_path = os.path.normpath(bundle_path)
        normalized_bundle_path = normalized_bundle_path.replace("\\", "/")
        command = ["git", "bundle", "create", normalized_bundle_path, "--all"]
        self.logger.info(f"Attempting to create bundle: {normalized_bundle_path}")
        try:
            # check=False to handle the 'empty bundle' case manually
            result = self.log_and_execute(command, working_directory, check=False)
            if result.returncode != 0:
                stderr_lower = result.stderr.lower() if result.stderr else ""
                if "refusing to create empty bundle" in stderr_lower:
                    # A warning, not an error, for our purposes
                    self.logger.warning(
                        f"Bundle creation skipped: No commits in '{working_directory}'."
                    )
                else:
                    error_msg = f"Bundle command failed (code {result.returncode})."
                    raise GitCommandError(
                        error_msg, command=command, stderr=result.stderr
                    )
            else:
                # Paranoid check of the produced file. The command ran with
                # cwd=working_directory, so a relative bundle path must be
                # looked up there rather than in this process's own cwd.
                if os.path.isabs(normalized_bundle_path):
                    bundle_file = normalized_bundle_path
                else:
                    bundle_file = os.path.join(
                        working_directory, normalized_bundle_path
                    )
                if (
                    not os.path.exists(bundle_file)
                    or os.path.getsize(bundle_file) == 0
                ):
                    self.logger.warning("Bundle cmd success, but file missing/empty.")
                else:
                    self.logger.info(
                        f"Bundle created successfully: '{normalized_bundle_path}'."
                    )
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Failed create bundle for '{working_directory}': {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected bundle creation error: {e}")
            raise GitCommandError(
                f"Unexpected bundle error: {e}", command=command
            ) from e

    def fetch_from_git_bundle(self, working_directory, bundle_path):
        """
        Fetches from a bundle file and merges FETCH_HEAD with --no-ff.

        Args:
            working_directory (str): Path to the Git repository.
            bundle_path (str): Bundle file to fetch from.

        Raises:
            ValueError: If working_directory is None or empty.
            GitCommandError: If the fetch fails, the merge reports a
                conflict, or the merge fails for another reason.
        """
        normalized_bundle_path = os.path.normpath(bundle_path).replace("\\", "/")
        self.logger.info(
            f"Fetching from '{normalized_bundle_path}' into '{working_directory}'"
        )
        fetch_command = ["git", "fetch", normalized_bundle_path]
        # Merge FETCH_HEAD, creating a merge commit if necessary (--no-ff)
        merge_command = ["git", "merge", "FETCH_HEAD", "--no-ff"]
        try:
            # 1. Fetch
            self.logger.debug("Executing fetch...")
            self.log_and_execute(fetch_command, working_directory, check=True)
            self.logger.info("Fetch successful.")

            # 2. Merge (check=False because conflicts are expected failures)
            self.logger.debug("Executing merge...")
            merge_result = self.log_and_execute(
                merge_command, working_directory, check=False
            )

            stdout_log = merge_result.stdout.strip() if merge_result.stdout else ""
            stderr_log = merge_result.stderr.strip() if merge_result.stderr else ""

            if merge_result.returncode == 0:
                # Success or already up-to-date
                if "already up to date" in stdout_log.lower():
                    self.logger.info("Repository already up-to-date.")
                else:
                    self.logger.info("Merge successful.")
            else:
                # Merge failed, likely conflicts
                output_lower = (stderr_log + stdout_log).lower()
                if "conflict" in output_lower:
                    conflict_msg = (
                        f"Merge conflict occurred. Resolve manually in "
                        f"'{working_directory}' and commit."
                    )
                    self.logger.error(conflict_msg)
                    raise GitCommandError(
                        conflict_msg,
                        command=merge_command,
                        stderr=merge_result.stderr
                    )
                else:
                    error_msg = f"Merge failed (code {merge_result.returncode})."
                    self.logger.error(error_msg)
                    raise GitCommandError(
                        error_msg,
                        command=merge_command,
                        stderr=merge_result.stderr
                    )
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Fetch/merge error for '{working_directory}': {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected fetch/merge error: {e}")
            raise GitCommandError(f"Unexpected fetch/merge error: {e}") from e

    # --- Commit and Status ---
    def git_commit(self, working_directory, message):
        """
        Stages all changes and commits them with the given message.

        Args:
            working_directory (str): Path to the Git repository.
            message (str): Commit message; must not be empty.

        Returns:
            bool: True if a commit was created, False if there was nothing
            to commit.

        Raises:
            ValueError: If message is empty or working_directory is invalid.
            GitCommandError: If staging or the commit fails unexpectedly.
        """
        self.logger.info(
            f"Attempting commit in '{working_directory}' with msg: '{message}'"
        )
        if not message:
            # Disallow empty commit messages programmatically
            raise ValueError("Commit message cannot be empty.")
        try:
            # 1. Stage all changes using 'git add .'
            add_command = ["git", "add", "."]
            self.logger.debug("Staging all changes...")
            self.log_and_execute(add_command, working_directory, check=True)
            self.logger.debug("Staging successful.")

            # 2. Commit staged changes
            commit_command = ["git", "commit", "-m", message]
            self.logger.debug("Attempting commit...")
            # check=False to handle the 'nothing to commit' case gracefully
            result = self.log_and_execute(
                commit_command, working_directory, check=False
            )

            stdout_lower = result.stdout.lower() if result.stdout else ""
            stderr_lower = result.stderr.lower() if result.stderr else ""

            if result.returncode == 0:
                self.logger.info("Commit successful.")
                return True
            # Various 'nothing to commit' messages, or a silent exit code 1
            elif (
                "nothing to commit" in stdout_lower
                or "no changes added to commit" in stdout_lower
                or "nothing added to commit" in stdout_lower
                or (result.returncode == 1 and not stderr_lower and not stdout_lower)
            ):
                self.logger.info("No changes were available to commit.")
                return False  # Indicate no commit was made
            else:
                error_msg = (
                    f"Commit cmd failed unexpectedly (code {result.returncode})."
                )
                raise GitCommandError(
                    error_msg, command=commit_command, stderr=result.stderr
                )
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Error during staging or commit: {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected error during commit process: {e}")
            raise GitCommandError(f"Unexpected commit error: {e}") from e

    def git_status_has_changes(self, working_directory):
        """
        Checks if the Git repository has uncommitted changes.

        Args:
            working_directory (str): Path to the Git repository.

        Returns:
            bool: True if `git status --porcelain` prints anything.

        Raises:
            ValueError, GitCommandError: If the status command cannot be run.
        """
        self.logger.debug(f"Checking Git status in '{working_directory}'...")
        try:
            # '--porcelain' gives output meant for script parsing
            status_command = ["git", "status", "--porcelain"]
            result = self.log_and_execute(status_command, working_directory, check=True)
            # Non-empty porcelain output means there are changes
            has_changes = bool(result.stdout.strip())
            self.logger.debug(f"Status check complete. Has changes: {has_changes}")
            return has_changes
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Status check error: {e}")
            raise
        except Exception as e:
            self.logger.exception(f"Unexpected status check error: {e}")
            raise GitCommandError(f"Unexpected status error: {e}") from e

    # --- Tag Management ---
    def list_tags(self, working_directory):
        """
        Lists tags with subjects, sorted by creation date (desc).

        Args:
            working_directory (str): Path to the Git repository.

        Returns:
            list: (name, subject) tuples; subject is "(No subject)" when git
            prints none. Returns an empty list on any error, so that callers
            can degrade gracefully.
        """
        self.logger.info(f"Listing tags with subjects in '{working_directory}'...")
        # Output format per line: tag name, a TAB (%09), then the subject line
        fmt = "%(refname:short)%09%(contents:subject)"
        cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"]
        tags_data = []  # (name, subject) tuples
        try:
            result = self.log_and_execute(cmd, working_directory, check=True)
            for line in result.stdout.splitlines():
                line_strip = line.strip()
                if line_strip:
                    # Split only on the first tab so subjects may contain tabs
                    parts = line_strip.split("\t", 1)
                    name = parts[0].strip()
                    subject = parts[1].strip() if len(parts) > 1 else "(No subject)"
                    tags_data.append((name, subject))
            self.logger.info(f"Found {len(tags_data)} tags.")
            self.logger.debug(f"Tags found: {tags_data}")
            return tags_data
        except (GitCommandError, ValueError) as e:
            # Log the error but return an empty list for graceful handling
            self.logger.error(f"Error listing tags: {e}")
            return []
        except Exception as e:
            self.logger.exception(f"Unexpected error listing tags: {e}")
            return []

    # ------------------------------------------------------------------
    # NOTE(review): the text of this file is damaged from here. `create_tag`
    # breaks off in the middle of its tag-name regex literal, and everything
    # up to the tail of `get_matching_gitignore_rule` is missing (presumably
    # angle-bracketed text was stripped by an extraction step - confirm).
    # The two surviving fragments are preserved verbatim below, commented
    # out, instead of being reconstructed by guesswork. Restore the missing
    # span from version control before relying on either method.
    #
    # def create_tag(self, working_directory, tag_name, message):
    #     """Creates a new annotated Git tag."""
    #     self.logger.info(f"Creating tag '{tag_name}' in '{working_directory}'")
    #     # Validate inputs
    #     if not tag_name:
    #         raise ValueError("Tag name cannot be empty.")
    #     if not message:
    #         raise ValueError("Tag message cannot be empty.")
    #     # Validate tag name format using regex
    #     pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$))[^ \t\n\r\f\v~^:?*[\\]+(?
    #     ... <missing span> ...
    #
    # (tail of get_matching_gitignore_rule; its signature, command
    #  construction and the opening `try:` / `if result.returncode == 0:`
    #  are part of the missing span)
    #             ... ::\t
    #             # Example: .gitignore:5:*.log logs/latest.log
    #             output_line = result.stdout.strip()
    #             if output_line and '\t' in output_line:
    #                 rule_part = output_line.split('\t', 1)[0]
    #                 # Extract pattern (part after the second colon)
    #                 parts = rule_part.split(':', 2)
    #                 if len(parts) == 3:
    #                     pattern = parts[2]
    #                     self.logger.debug(f"Path '{path_to_check}' matched rule: '{pattern}'")
    #                     return pattern
    #                 else:
    #                     self.logger.warning(f"Could not parse pattern from check-ignore output: {output_line}")
    #                     return None  # Indicate parsing failure rather than no match
    #             else:
    #                 self.logger.warning(f"Unexpected output format from check-ignore -v: {output_line}")
    #                 return None  # Indicate unexpected output
    #         elif result.returncode == 1:
    #             # Exit code 1 means the path is NOT ignored
    #             self.logger.debug(f"Path '{path_to_check}' is not ignored by any rule.")
    #             return None
    #         else:
    #             # Other non-zero exit codes (e.g., 128) indicate an error
    #             error_msg = f"git check-ignore -v failed with exit code {result.returncode}"
    #             self.logger.error(f"{error_msg}. Stderr: {result.stderr}")
    #             raise GitCommandError(error_msg, command=command, stderr=result.stderr)
    #     except (GitCommandError, ValueError) as e:
    #         self.logger.error(f"Failed get_matching_gitignore_rule for '{path_to_check}': {e}")
    #         raise
    #     except Exception as e:
    #         self.logger.exception(f"Unexpected error in get_matching_gitignore_rule: {e}")
    #         raise GitCommandError(f"Unexpected check-ignore -v error: {e}", command=command) from e
    # ------------------------------------------------------------------

    def get_status_short(self, working_directory):
        """
        Gets the repository status in short format (-z for null termination).

        Args:
            working_directory (str): Path to the Git repository.

        Returns:
            list: The non-empty NUL-separated fields of the command output,
            e.g. " M filename", "?? newfile". Returns empty list on error.
        """
        self.logger.debug(f"Getting short status for '{working_directory}'")
        # -z NUL-terminates entries, safer for paths with spaces/special chars
        # --ignored=no: do not show ignored files (usually the default, but explicit)
        command = ["git", "status", "--short", "-z", "--ignored=no"]
        try:
            # Status output can be useful but noisy, so log it at DEBUG
            result = self.log_and_execute(
                command, working_directory, check=True, log_output_level=logging.DEBUG
            )
            # Split on the NUL terminator and drop empty strings
            status_lines = [line for line in result.stdout.split('\0') if line]
            self.logger.info(f"Git status check returned {len(status_lines)} changed/untracked items.")
            return status_lines
        except (GitCommandError, ValueError) as e:
            self.logger.error(f"Failed to get git status: {e}")
            return []  # Empty list on error
        except Exception as e:
            self.logger.exception(f"Unexpected error getting git status: {e}")
            return []

    def get_file_content_from_ref(self, working_directory, file_path, ref="HEAD"):
        """
        Retrieves the content of a file from a specific Git reference
        (e.g., HEAD, index).

        Args:
            working_directory (str): Path to the Git repository.
            file_path (str): Relative path to the file within the repo.
            ref (str): The Git reference (e.g., "HEAD", "master", a commit
                hash). An empty value, None, or ":" selects the index.

        Returns:
            str or None: The file content as a string, or None if the file
            doesn't exist in that ref or an error occurs.
        """
        # Git always uses '/' as the path separator
        git_style_path = file_path.replace(os.path.sep, '/')
        # ":<path>" addresses the index. Treat ":" like an empty ref so the
        # documented ":" value does not produce a malformed "::<path>".
        ref_prefix = f"{ref}:" if ref and ref != ":" else ":"
        ref_path_arg = f"{ref_prefix}{git_style_path}"

        self.logger.debug(f"Getting file content for '{ref_path_arg}' in '{working_directory}'")
        command = ["git", "show", ref_path_arg]
        try:
            # check=False because a file missing from HEAD/index is not a
            # fatal error; it only means the file is new or deleted.
            # Output goes to DEBUG.
            result = self.log_and_execute(
                command, working_directory, check=False, log_output_level=logging.DEBUG
            )
            if result.returncode == 0:
                # Success: the content is the command's stdout
                return result.stdout
            elif result.returncode == 128 and (
                "exists on disk, but not in" in result.stderr
                or "does not exist in" in result.stderr
                or "did not match any file" in result.stderr
            ):
                # Code 128 with one of these messages: file not in the ref
                self.logger.debug(f"File '{git_style_path}' not found in ref '{ref}'.")
                return None
            else:
                # Any other git show failure
                self.logger.error(f"git show command failed for '{ref_path_arg}' with code {result.returncode}. Stderr: {result.stderr}")
                return None
        except (GitCommandError, ValueError) as e:
            # Errors from running the command itself (rare with check=False)
            self.logger.error(f"Error executing git show for '{ref_path_arg}': {e}")
            return None
        except Exception as e:
            self.logger.exception(f"Unexpected error in get_file_content_from_ref for '{ref_path_arg}': {e}")
            return None