# --- FILE: gitsync_tool/commands/git_commands.py --- import os import subprocess import logging # Usato solo per i livelli di logging (es. logging.INFO) import re from typing import Tuple, Dict, List, Optional, Union, Any # Aggiunto Union from gitutility.logging_setup import log_handler # --- Custom Exception Definition --- class GitCommandError(Exception): """ Custom exception for handling Git command execution errors. Includes the original command and stderr details if available. """ def __init__( self, message: str, command: Optional[List[str]] = None, stderr: Optional[str] = None, ): super().__init__(str(message)) self.command: Optional[List[str]] = command self.stderr: str = stderr if stderr is not None else "" def __str__(self) -> str: base_message: str = super().__str__() details: List[str] = [] if self.command: safe_command_parts: List[str] = [str(part) for part in self.command] command_str: str = " ".join(safe_command_parts) details.append(f"Command: '{command_str}'") stderr_str: str = self.stderr.strip() if stderr_str: max_stderr_len = 150 truncated_stderr = ( (stderr_str[:max_stderr_len] + "...") if len(stderr_str) > max_stderr_len else stderr_str ) details.append(f"Stderr: {truncated_stderr}") if details: details_str: str = "; ".join(details) return f"{base_message} ({details_str})" else: return base_message # --- Main Git Commands Class --- class GitCommands: """ Manages Git command execution, logging (via log_handler), error handling, and parsing of command output. """ def __init__( self, logger_ignored: Optional[Any] = None ): # Accetta argomento ma lo ignora """Initializes the GitCommands class.""" # self.logger non è più usato log_handler.log_debug("GitCommands initialized.", func_name="__init__") def log_and_execute( self, command: List[str], working_directory: str, check: bool = True, log_output_level: int = logging.INFO, capture: bool = True, hide_console: bool = True, timeout_seconds: int = 3600, ) -> subprocess.CompletedProcess: """ Executes a shell command, logs details, handles errors, with options for capturing output and hiding the console window. """ func_name: str = "log_and_execute" safe_command_parts: List[str] = [str(part) for part in command] command_str: str = " ".join(safe_command_parts) log_handler.log_debug( f"Executing in '{working_directory}': {command_str} " f"(Capture={capture}, HideConsole={hide_console}, Timeout={timeout_seconds})", func_name=func_name, ) if not working_directory: msg: str = "Working directory cannot be None or empty." log_handler.log_error(msg, func_name=func_name) raise ValueError(msg) if working_directory == ".": effective_cwd: str = os.getcwd() else: effective_cwd = os.path.abspath(working_directory) if not os.path.isdir(effective_cwd): msg = f"Working directory does not exist or is not a directory: {effective_cwd}" log_handler.log_error(msg, func_name=func_name) raise GitCommandError(msg, command=safe_command_parts) try: startupinfo: Optional[subprocess.STARTUPINFO] = None creationflags: int = 0 # Prepare environment to prevent interactive prompts when console is hidden custom_env = os.environ.copy() if hide_console: custom_env["GIT_TERMINAL_PROMPT"] = "0" if hide_console and os.name == "nt": startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE elif not hide_console and os.name == "nt": creationflags = subprocess.CREATE_NEW_CONSOLE result: subprocess.CompletedProcess = subprocess.run( safe_command_parts, cwd=effective_cwd, capture_output=capture, text=True, check=False, encoding="utf-8", errors="replace", timeout=timeout_seconds, startupinfo=startupinfo, creationflags=creationflags, env=custom_env, # Use the custom environment ) log_handler.log_debug( f"Command '{command_str}' finished. RC={result.returncode}", func_name=func_name, ) if capture and (result.returncode == 0 or not check): stdout_repr = repr(result.stdout) if result.stdout else "" stderr_repr = repr(result.stderr) if result.stderr else "" log_handler.log_debug( f"Command RC={result.returncode}. RAW Output:\n" f"--- stdout (repr) ---\n{stdout_repr}\n" f"--- stderr (repr) ---\n{stderr_repr}\n" f"--- End RAW Output ---", func_name=func_name, ) if log_output_level > logging.DEBUG: stdout_log_brief = ( result.stdout.strip()[:200] + "..." if result.stdout and len(result.stdout) > 200 else (result.stdout.strip() if result.stdout else "") ) stderr_log_brief = ( result.stderr.strip()[:200] + "..." if result.stderr and len(result.stderr) > 200 else (result.stderr.strip() if result.stderr else "") ) log_handler.log_message( log_output_level, f"Command RC={result.returncode}. Output(brief): stdout='{stdout_log_brief}', stderr='{stderr_log_brief}'", func_name=func_name, ) if check and result.returncode != 0: raise subprocess.CalledProcessError( returncode=result.returncode, cmd=safe_command_parts, output=result.stdout if capture else None, stderr=result.stderr if capture else None, ) return result except subprocess.TimeoutExpired as e: log_handler.log_error( f"Command timed out after {timeout_seconds}s: {command_str}", func_name=func_name, ) stderr_out = ( repr(e.stderr) if capture and e.stderr else "" ) stdout_out = ( repr(e.stdout) if capture and e.stdout else "" ) log_handler.log_error(f"Timeout stdout: {stdout_out}", func_name=func_name) log_handler.log_error(f"Timeout stderr: {stderr_out}", func_name=func_name) raise GitCommandError( f"Timeout after {timeout_seconds}s.", command=safe_command_parts, stderr=e.stderr if capture else None, ) from e except subprocess.CalledProcessError as e: stderr_repr = ( repr(e.stderr) if capture and e.stderr else "" ) stdout_repr = ( repr(e.stdout) if capture and e.stdout else "" ) err_msg: str = ( f"Command failed (RC {e.returncode}) in '{effective_cwd}'.\n" f"CMD: {command_str}\nRAW_STDERR: {stderr_repr}\nRAW_STDOUT: {stdout_repr}" ) log_handler.log_error(err_msg, func_name=func_name) raise GitCommandError( f"Git command failed in '{effective_cwd}'. RC: {e.returncode}", command=safe_command_parts, stderr=e.stderr if capture else None, ) from e except FileNotFoundError as e: error_msg: str = ( f"Command '{safe_command_parts[0]}' not found. Is Git installed and in your system's PATH?" ) log_handler.log_error( f"FileNotFoundError for command: {safe_command_parts[0]} - {e}", func_name=func_name, ) raise GitCommandError(error_msg, command=safe_command_parts) from e except PermissionError as e: error_msg: str = ( f"Permission denied executing command in '{effective_cwd}'." ) log_handler.log_error( f"PermissionError executing in '{effective_cwd}': {e}", func_name=func_name, ) raise GitCommandError( error_msg, command=safe_command_parts, stderr=str(e) ) from e except Exception as e: log_handler.log_exception( f"Unexpected Exception executing command {command_str}: {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected execution error: {e}", command=safe_command_parts ) from e # --- Core Repo Operations --- def prepare_svn_for_git(self, working_directory: str) -> None: """ Initializes a Git repository if needed and ensures .svn is in .gitignore. Raises errors on failure. """ func_name: str = "prepare_svn_for_git" log_handler.log_info( f"Preparing directory for Git: '{working_directory}'", func_name=func_name ) # Basic path validation if not working_directory or not os.path.isdir(working_directory): raise ValueError(f"Invalid target directory: {working_directory}") gitignore_path: str = os.path.join(working_directory, ".gitignore") git_dir_path: str = os.path.join(working_directory, ".git") svn_ignore_pattern: str = ".svn" # Pattern to ignore # 1. Initialize Git Repo if it doesn't exist if not os.path.exists(git_dir_path): log_handler.log_info( "No existing Git repo found. Initializing...", func_name=func_name ) try: init_cmd: List[str] = ["git", "init"] # Use check=True as init failure is critical here self.log_and_execute(init_cmd, working_directory, check=True) log_handler.log_info( "Git repository initialized successfully.", func_name=func_name ) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Failed to initialize Git repository: {e}", func_name=func_name ) raise # Re-raise critical error else: log_handler.log_info( "Existing Git repository found. Skipping initialization.", func_name=func_name, ) # 2. Check and Update .gitignore log_handler.log_debug( f"Checking/updating .gitignore file: {gitignore_path}", func_name=func_name ) try: needs_write: bool = False current_content: str = "" new_content_to_append: str = "" if not os.path.exists(gitignore_path): # File doesn't exist, create it with the pattern log_handler.log_info( "'.gitignore' not found. Creating file with '.svn' entry.", func_name=func_name, ) new_content_to_append = f"{svn_ignore_pattern}\n" needs_write = True else: # File exists, read its content try: with open(gitignore_path, "r", encoding="utf-8") as f: lines: List[str] = f.readlines() current_content = "".join(lines) # Store full original content # Check if pattern (exactly or starting a dir) is already present is_ignored: bool = any( line.strip() == svn_ignore_pattern or line.strip().startswith(svn_ignore_pattern + "/") for line in lines ) if not is_ignored: log_handler.log_info( f"Pattern '{svn_ignore_pattern}' not found in .gitignore. Appending...", func_name=func_name, ) # Add newline before appending if file doesn't end with one prefix = ( "\n" if current_content and not current_content.endswith("\n") else "" ) new_content_to_append = f"{prefix}{svn_ignore_pattern}\n" needs_write = True else: log_handler.log_info( f"Pattern '{svn_ignore_pattern}' already present in .gitignore.", func_name=func_name, ) except IOError as read_err: log_handler.log_error( f"Could not read existing .gitignore: {read_err}", func_name=func_name, ) raise IOError( f"Failed to read .gitignore: {read_err}" ) from read_err # Write to file if necessary if needs_write: # Use 'a' (append) mode if adding to existing content, 'w' (write) if creating new write_mode = "a" if os.path.exists(gitignore_path) else "w" log_handler.log_debug( f"Writing to {gitignore_path} (Mode: {write_mode})", func_name=func_name, ) try: # Ensure consistent newline handling with open( gitignore_path, write_mode, encoding="utf-8", newline="\n" ) as f: f.write(new_content_to_append) log_handler.log_info( f"Successfully updated '{gitignore_path}'.", func_name=func_name ) except IOError as write_err: log_handler.log_error( f"Error writing to .gitignore: {write_err}", func_name=func_name ) raise IOError( f"Failed to write .gitignore: {write_err}" ) from write_err except IOError as io_err: # Catch IOErrors from reading/writing and re-raise log_handler.log_error( f"I/O error processing .gitignore: {io_err}", func_name=func_name ) raise io_err except Exception as e: # Catch unexpected errors during gitignore handling log_handler.log_exception( f"Unexpected error managing .gitignore: {e}", func_name=func_name ) raise GitCommandError(f"Unexpected .gitignore error: {e}") from e log_handler.log_info( f"Directory preparation completed successfully for '{working_directory}'.", func_name=func_name, ) def create_git_bundle(self, working_directory: str, bundle_path: str) -> None: """ Creates a Git bundle file containing all refs. Raises error on failure. Note: Git may refuse to create an empty bundle if the repo has no refs. """ func_name: str = "create_git_bundle" # Normalize path for Git command (use forward slashes) norm_path: str = os.path.normpath(bundle_path) git_path: str = norm_path.replace("\\", "/") log_handler.log_info( f"Attempting to create Git bundle file: {git_path}", func_name=func_name ) log_handler.log_debug( f" Source Repository: {working_directory}", func_name=func_name ) # Command: git bundle create --all command: List[str] = ["git", "bundle", "create", git_path, "--all"] try: # Execute command, use check=False to analyze output/stderr manually result: subprocess.CompletedProcess = self.log_and_execute( command, working_directory, check=False, log_output_level=logging.DEBUG ) # Check result if result.returncode != 0: stderr_lower: str = result.stderr.lower() if result.stderr else "" # Check specific Git messages for empty repo case if ( "refusing to create empty bundle" in stderr_lower or "does not contain any references" in stderr_lower ): # This is not a critical error, but the bundle wasn't created log_handler.log_warning( f"Bundle creation skipped by Git: Repository '{os.path.basename(working_directory)}' is empty or has no references.", func_name=func_name, ) # No exception raised, but caller should check if file exists else: # Other Git error during bundle creation err_msg: str = ( f"Git bundle command failed (RC {result.returncode})." ) log_handler.log_error( f"{err_msg} Stderr: {result.stderr.strip()}", func_name=func_name, ) raise GitCommandError( err_msg, command=command, stderr=result.stderr ) else: # Command succeeded (RC=0), verify file existence and size if not os.path.exists(norm_path) or os.path.getsize(norm_path) == 0: log_handler.log_warning( f"Git bundle command succeeded, but the output file '{norm_path}' is missing or empty.", func_name=func_name, ) # Caller should handle this (e.g., by removing the empty file) else: log_handler.log_info( f"Bundle created successfully: '{norm_path}'.", func_name=func_name, ) except GitCommandError: raise # Re-raise GitCommandError directly except Exception as e: log_handler.log_exception( f"Unexpected error during bundle creation: {e}", func_name=func_name ) raise GitCommandError( f"Unexpected bundle error: {e}", command=command ) from e def fetch_from_git_bundle(self, working_directory: str, bundle_path: str) -> None: """ Fetches updates from a bundle file and merges FETCH_HEAD into the current branch. Raises errors on failure (e.g., file not found, fetch/merge errors, conflicts). """ func_name: str = "fetch_from_git_bundle" # Validate bundle file existence first if not os.path.isfile(bundle_path): raise FileNotFoundError(f"Bundle file not found at path: {bundle_path}") # Normalize path for Git norm_path: str = os.path.normpath(bundle_path).replace("\\", "/") log_handler.log_info( f"Fetching from bundle '{norm_path}' into repository '{working_directory}'", func_name=func_name, ) # 1. Fetch from the bundle fetch_cmd: List[str] = [ "git", "fetch", norm_path, "--verbose", ] # Use verbose for more log info try: log_handler.log_debug( "Executing git fetch from bundle...", func_name=func_name ) # Use check=True for fetch, as fetch failure is usually critical self.log_and_execute( fetch_cmd, working_directory, check=True, log_output_level=logging.INFO ) log_handler.log_info( "Fetch from bundle completed successfully.", func_name=func_name ) except (GitCommandError, ValueError) as fetch_error: log_handler.log_error( f"Failed to fetch from bundle '{norm_path}': {fetch_error}", func_name=func_name, ) raise fetch_error # Re-raise fetch errors # 2. Merge FETCH_HEAD (the fetched commits) into the current branch # Use --no-ff to always create a merge commit for clarity, --no-edit to avoid editor merge_cmd: List[str] = ["git", "merge", "FETCH_HEAD", "--no-ff", "--no-edit"] try: log_handler.log_debug( "Executing git merge FETCH_HEAD...", func_name=func_name ) # Use check=False for merge to detect conflicts (RC=1) merge_res: subprocess.CompletedProcess = self.log_and_execute( merge_cmd, working_directory, check=False, log_output_level=logging.INFO ) # Analyze merge result out_log: str = merge_res.stdout.strip() if merge_res.stdout else "" err_log: str = merge_res.stderr.strip() if merge_res.stderr else "" combined_log: str = out_log + err_log combined_lower: str = combined_log.lower() if merge_res.returncode == 0: # Merge succeeded if "already up to date" in combined_lower: log_handler.log_info( "Repository already up-to-date with fetched bundle content.", func_name=func_name, ) else: log_handler.log_info( "Merge of fetched bundle content successful.", func_name=func_name, ) # Log stdout/stderr from merge if needed (at DEBUG level) log_handler.log_debug( f"Merge output:\n{combined_log}", func_name=func_name ) else: # Merge failed (RC != 0) # Check for merge conflict specifically (usually RC=1) if ( "conflict" in combined_lower or "automatic merge failed" in combined_lower ): msg: str = ( f"Merge conflict occurred after fetching from bundle. Please resolve conflicts manually in '{working_directory}' and then commit the result." ) log_handler.log_error(msg, func_name=func_name) # Raise specific error for conflicts raise GitCommandError( msg, command=merge_cmd, stderr=merge_res.stderr ) else: # Other merge error msg = f"Git merge command failed after fetch (RC {merge_res.returncode})." log_handler.log_error( f"{msg} Stderr: {err_log}", func_name=func_name ) raise GitCommandError( msg, command=merge_cmd, stderr=merge_res.stderr ) except GitCommandError as merge_error: # Catch and re-raise merge errors (including conflicts) log_handler.log_error( f"Error merging FETCH_HEAD from bundle: {merge_error}", func_name=func_name, ) raise merge_error except Exception as e: # Catch unexpected errors during merge log_handler.log_exception( f"Unexpected error merging FETCH_HEAD from bundle: {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected merge error: {e}", command=merge_cmd ) from e # --- Commit and Status --- def git_commit(self, working_directory: str, message: str, stage_all_first: bool = True) -> bool: """ Commits changes in the repository. Args: working_directory (str): Path to the repository. message (str): The commit message. stage_all_first (bool): If True, runs 'git add .' before committing. Set to False if changes are already staged. Returns: bool: True if a commit was made, False otherwise. """ func_name = "git_commit" log_handler.log_info( f"Attempting commit in '{working_directory}' msg: '{message[:50]}...'", func_name=func_name, ) if not message or message.isspace(): raise ValueError("Commit message cannot be empty.") try: if stage_all_first: add_cmd = ["git", "add", "."] log_handler.log_debug( "Staging all changes ('git add .')...", func_name=func_name ) self.log_and_execute(add_cmd, working_directory, check=True) log_handler.log_debug("Staging successful.", func_name=func_name) else: log_handler.log_debug( "Skipping 'git add .', assuming changes are already staged.", func_name=func_name ) commit_cmd = ["git", "commit", "-m", message] log_handler.log_debug("Attempting commit...", func_name=func_name) result = self.log_and_execute(commit_cmd, working_directory, check=False) out_low = (result.stdout or "").lower() err_low = (result.stderr or "").lower() combined_low = out_low + err_low if result.returncode == 0: # Se c'è un output e non è un messaggio di "nessun cambiamento", allora è un successo. if "nothing to commit" not in combined_low: log_handler.log_info("Commit successful.", func_name=func_name) return True else: # Questo caso può accadere se `stage_all_first` è False e non c'era nulla in staging log_handler.log_info( "No changes were available to commit.", func_name=func_name ) return False elif ( "nothing to commit" in combined_low or "no changes added to commit" in combined_low ): log_handler.log_info( "No changes available to commit.", func_name=func_name ) return False else: msg = f"Commit command failed unexpectedly (RC {result.returncode})." log_handler.log_error( f"{msg} Stderr: {result.stderr.strip()}", func_name=func_name ) raise GitCommandError(msg, command=commit_cmd, stderr=result.stderr) except (GitCommandError, ValueError) as e: log_handler.log_error( f"Error during staging/commit: {e}", func_name=func_name ) raise e def git_status_has_changes(self, working_directory: str) -> bool: func_name = "git_status_has_changes" log_handler.log_debug( f"Checking status for changes in '{working_directory}'...", func_name=func_name, ) try: cmd = ["git", "status", "--porcelain=v1", "-unormal"] result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) has_changes = bool(result.stdout.strip()) log_handler.log_debug( f"Status check complete. Has changes: {has_changes}", func_name=func_name, ) return has_changes except GitCommandError as e: log_handler.log_error( f"Git status check command failed: {e}", func_name=func_name ) raise e def git_rm(self, working_directory: str, path: str, force: bool = False) -> None: """ Removes a file or directory from the Git index. Args: working_directory (str): Path to the repository. path (str): The relative path to the item to remove. force (bool): If True, uses the '-f' flag to force removal. """ func_name = "git_rm" action = "Forcibly removing" if force else "Removing" log_handler.log_info( f"{action} path from index: '{path}' in '{working_directory}'", func_name=func_name ) cmd = ["git", "rm"] if force: cmd.append("-f") cmd.append(path) try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info(f"Path '{path}' removed from index successfully.", func_name=func_name) except GitCommandError as e: log_handler.log_error(f"Failed to 'git rm {path}': {e}", func_name=func_name) raise # --- Tag Management --- def list_tags(self, working_directory: str) -> List[Tuple[str, str]]: func_name = "list_tags" log_handler.log_info( f"Listing tags in '{working_directory}'...", func_name=func_name ) fmt = "%(refname:short)%09%(contents:subject)" cmd = ["git", "tag", "--list", f"--format={fmt}", "--sort=-creatordate"] tags = [] try: result = self.log_and_execute(cmd, working_directory, check=True) for line in result.stdout.splitlines(): line_strip = line.strip() if line_strip: parts = line_strip.split("\t", 1) name = parts[0].strip() subject = ( parts[1].strip() if len(parts) > 1 and parts[1].strip() else "(No subject/Lightweight)" ) tags.append((name, subject)) log_handler.log_info(f"Found {len(tags)} tags.", func_name=func_name) log_handler.log_debug(f"Tags found: {tags}", func_name=func_name) return tags except GitCommandError as e: log_handler.log_error(f"Error listing tags: {e}", func_name=func_name) return [] except Exception as e: log_handler.log_exception( f"Unexpected error listing tags: {e}", func_name=func_name ) return [] def create_tag(self, working_directory: str, tag_name: str, message: str) -> None: func_name = "create_tag" log_handler.log_info( f"Creating tag '{tag_name}' in '{working_directory}'", func_name=func_name ) if not tag_name or tag_name.isspace(): raise ValueError("Tag name cannot be empty.") if not message or message.isspace(): raise ValueError("Tag message cannot be empty for annotated tag.") pattern = r"^(?![./]|.*([./]{2,}|[.]$|\.lock$|@\{|\\\\))[^ \t\n\r\f\v~^:?*[\\]+(? bool: func_name = "checkout_tag" log_handler.log_info( f"Checking out tag '{tag_name}' in '{working_directory}'...", func_name=func_name, ) if not tag_name or tag_name.isspace(): raise ValueError("Tag name cannot be empty.") cmd = ["git", "checkout", tag_name] try: result = self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"Successfully checked out tag '{tag_name}'.", func_name=func_name ) output = (result.stderr or "" + result.stdout or "").lower() if "detached head" in output: log_handler.log_warning( "Repo is now in 'detached HEAD' state.", func_name=func_name ) return True except GitCommandError as e: stderr_low = (e.stderr or "").lower() not_found = [ "did not match any file(s)", f"pathspec '{tag_name.lower()}' did not match", "reference is not a tree", ] if any(p in stderr_low for p in not_found): msg = f"Tag '{tag_name}' not found or invalid." log_handler.log_error(msg, func_name=func_name) raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e else: log_handler.log_error( f"Failed checkout tag '{tag_name}': {e}", func_name=func_name ) raise e # --- Branch Management --- def list_branches(self, working_directory: str) -> Tuple[List[str], Optional[str]]: func_name = "list_branches" log_handler.log_info( f"Listing local branches in '{working_directory}'...", func_name=func_name ) fmt = "%(HEAD)%(refname:short)" cmd = ["git", "branch", "--list", f"--format={fmt}"] branches = [] current = None try: result = self.log_and_execute(cmd, working_directory, check=True) for line in result.stdout.splitlines(): line_strip = line.strip() if line_strip: is_curr = line_strip.startswith("*") name = line_strip.lstrip("* ").strip() branches.append(name) if is_curr: current = name branches.sort() curr_disp = current if current else "None (Detached?)" log_handler.log_info( f"Found {len(branches)} branches. Current: {curr_disp}", func_name=func_name, ) log_handler.log_debug(f"Branches: {branches}", func_name=func_name) return branches, current except GitCommandError as e: log_handler.log_error(f"Error listing branches: {e}", func_name=func_name) return [], None except Exception as e: log_handler.log_exception( f"Unexpected error listing branches: {e}", func_name=func_name ) return [], None def checkout_branch(self, working_directory: str, branch_name: str) -> bool: func_name = "checkout_branch" log_handler.log_info( f"Checking out branch '{branch_name}' in '{working_directory}'...", func_name=func_name, ) if not branch_name or branch_name.isspace(): raise ValueError("Branch name cannot be empty.") cmd = ["git", "checkout", branch_name] try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"Successfully checked out branch '{branch_name}'.", func_name=func_name ) return True except GitCommandError as e: stderr_low = (e.stderr or "").lower() not_found = [ "did not match any file(s)", f"pathspec '{branch_name.lower()}' did not match", "is not a commit and a branch", ] if any(p in stderr_low for p in not_found): msg = f"Branch '{branch_name}' not found or invalid." log_handler.log_error(msg, func_name=func_name) raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e else: log_handler.log_error( f"Failed checkout branch '{branch_name}': {e}", func_name=func_name ) raise e def create_branch(self, working_directory: str, new_branch_name: str) -> bool: func_name = "create_branch" log_handler.log_info( f"Creating branch '{new_branch_name}' in '{working_directory}'...", func_name=func_name, ) if not new_branch_name or new_branch_name.isspace(): raise ValueError("Branch name cannot be empty.") pattern = ( r"^(?!\.| |.*[/.]\.|\.|.*\\|.*@\{|.*[/]$|.*\.\.)[^ \t\n\r\f\v~^:?*[\\]+$" ) if not re.match(pattern, new_branch_name) or new_branch_name.lower() == "head": raise ValueError(f"Invalid branch name format: '{new_branch_name}'.") cmd = ["git", "branch", new_branch_name] try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"Branch '{new_branch_name}' created.", func_name=func_name ) return True except GitCommandError as e: stderr_low = (e.stderr or "").lower() if "already exists" in stderr_low: msg = f"Branch '{new_branch_name}' already exists." log_handler.log_error(msg, func_name=func_name) raise GitCommandError(msg, command=cmd, stderr=e.stderr) from e else: log_handler.log_error( f"Failed create branch '{new_branch_name}': {e}", func_name=func_name, ) raise e # --- History / Log --- def get_commit_log( self, working_directory: str, max_count: int = 200, branch: Optional[str] = None ) -> List[str]: func_name = "get_commit_log" scope = f"'{branch}'" if branch else "all" log_handler.log_info( f"Retrieving commit log for {scope} in '{working_directory}' (max {max_count})...", func_name=func_name, ) fmt = "%h %ad | %an |%d %s" date_fmt = "format:%Y-%m-%d %H:%M" cmd = [ "git", "log", f"--max-count={max_count}", f"--pretty=format:{fmt}", f"--date={date_fmt}", "--decorate=short", "--source", ] if branch: cmd.append(branch) else: cmd.append("--all") try: result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) lines = [ line.strip() for line in result.stdout.splitlines() if line.strip() ] log_handler.log_info( f"Retrieved {len(lines)} log entries for {scope}.", func_name=func_name ) return lines except GitCommandError as e: log_handler.log_error( f"Error retrieving commit log for {scope}: {e}", func_name=func_name ) return [] except Exception as e: log_handler.log_exception( f"Unexpected error retrieving commit log for {scope}: {e}", func_name=func_name, ) return [] # --- Clone from Bundle --- def clone_from_bundle(self, bundle_path: str, destination_directory: str) -> bool: func_name = "clone_from_bundle" log_handler.log_info( f"Cloning from '{bundle_path}' into '{destination_directory}'", func_name=func_name, ) if not bundle_path: raise ValueError("Bundle path cannot be empty.") if not destination_directory: raise ValueError("Destination directory cannot be empty.") abs_bundle = os.path.abspath(bundle_path) abs_dest = os.path.abspath(destination_directory) if not os.path.isfile(abs_bundle): msg = f"Bundle file not found: {abs_bundle}" log_handler.log_error(msg, func_name=func_name) raise FileNotFoundError(msg) if os.path.exists(abs_dest): if not os.path.isdir(abs_dest): msg = f"Destination exists but is not a directory: {abs_dest}" log_handler.log_error(msg, func_name=func_name) raise IOError(msg) if os.listdir(abs_dest): msg = f"Destination directory is not empty: {abs_dest}" log_handler.log_error(msg, func_name=func_name) raise IOError(msg) log_handler.log_debug( "Destination directory exists and is empty.", func_name=func_name ) else: log_handler.log_debug( f"Destination directory does not exist. Creating: {abs_dest}", func_name=func_name, ) try: os.makedirs(abs_dest) log_handler.log_info( f"Created destination directory: {abs_dest}", func_name=func_name ) except OSError as e: msg = f"Failed to create destination directory '{abs_dest}': {e}" log_handler.log_error(msg, func_name=func_name) raise IOError(msg) from e cmd = ["git", "clone", abs_bundle, abs_dest, "--progress"] try: self.log_and_execute( cmd, working_directory=".", check=True, log_output_level=logging.INFO ) log_handler.log_info( f"Cloned successfully from bundle into '{abs_dest}'.", func_name=func_name, ) return True except GitCommandError as clone_e: log_handler.log_error( f"Failed to clone from bundle '{os.path.basename(abs_bundle)}': {clone_e}", func_name=func_name, ) if os.path.isdir(abs_dest): try: if not os.listdir(abs_dest): os.rmdir(abs_dest) log_handler.log_info( f"Removed empty destination directory after failed clone: {abs_dest}", func_name=func_name, ) else: log_handler.log_warning( f"Destination directory not empty after failed clone: {abs_dest}", func_name=func_name, ) except OSError as rm_e: log_handler.log_warning( f"Could not remove destination directory '{abs_dest}' after failed clone: {rm_e}", func_name=func_name, ) raise clone_e except Exception as e: log_handler.log_exception( f"Unexpected error cloning from bundle: {e}", func_name=func_name ) raise GitCommandError(f"Unexpected clone error: {e}", command=cmd) from e # --- Gitignore and File Tracking --- def get_tracked_files(self, working_directory: str) -> List[str]: func_name = "get_tracked_files" log_handler.log_debug( f"Getting tracked files in '{working_directory}'", func_name=func_name ) cmd = ["git", "ls-files", "-z"] try: result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) files = [f for f in result.stdout.split("\0") if f] log_handler.log_info( f"Found {len(files)} tracked files.", func_name=func_name ) return files except GitCommandError as e: log_handler.log_error( f"Failed list tracked files: {e}", func_name=func_name ) raise e def check_if_would_be_ignored( self, working_directory: str, path_to_check: str ) -> bool: func_name = "check_if_would_be_ignored" cmd = ["git", "check-ignore", "--quiet", "--no-index", "--", path_to_check] log_handler.log_debug( f"Checking ignore status for: '{path_to_check}'", func_name=func_name ) try: result = self.log_and_execute(cmd, working_directory, check=False) if result.returncode == 0: log_handler.log_debug( f"Path '{path_to_check}' WOULD be ignored.", func_name=func_name ) return True elif result.returncode == 1: log_handler.log_debug( f"Path '{path_to_check}' would NOT be ignored.", func_name=func_name ) return False else: msg = f"check-ignore failed (RC {result.returncode})" log_handler.log_error( f"{msg}. Stderr: {result.stderr.strip()}", func_name=func_name ) raise GitCommandError(msg, command=cmd, stderr=result.stderr) except GitCommandError as e: log_handler.log_error( f"Failed check_ignore for '{path_to_check}': {e}", func_name=func_name ) raise e def remove_from_tracking( self, working_directory: str, files_to_untrack: List[str] ) -> bool: func_name = "remove_from_tracking" if not files_to_untrack: log_handler.log_debug("No files provided to untrack.", func_name=func_name) return True num = len(files_to_untrack) log_handler.log_info( f"Removing {num} items from tracking index...", func_name=func_name ) batch_size = 100 succeeded = True for i in range(0, num, batch_size): batch = files_to_untrack[i : i + batch_size] batch_num = i // batch_size + 1 log_handler.log_info( f"Processing untrack batch {batch_num} ({len(batch)} items)...", func_name=func_name, ) log_handler.log_debug( f"Batch {batch_num} items: {batch}", func_name=func_name ) cmd = ["git", "rm", "--cached", "-r", "--ignore-unmatch", "--"] cmd.extend(batch) try: self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) log_handler.log_info( f"Batch {batch_num} untracked successfully.", func_name=func_name ) except GitCommandError as e: log_handler.log_error( f"Failed untrack batch {batch_num}: {e}", func_name=func_name ) succeeded = False raise GitCommandError( f"Failed batch {batch_num}. Error: {e}", command=cmd, stderr=getattr(e, "stderr", None), ) from e log_handler.log_info("All untracking batches completed.", func_name=func_name) return succeeded def get_matching_gitignore_rule( self, working_directory: str, path_to_check: str ) -> Optional[str]: func_name = "get_matching_gitignore_rule" cmd = ["git", "check-ignore", "-v", "--no-index", "--", path_to_check] log_handler.log_debug( f"Getting matching rule for: '{path_to_check}'", func_name=func_name ) try: result = self.log_and_execute(cmd, working_directory, check=False) if result.returncode == 0: line = result.stdout.strip() if line and "\t" in line: rule_part = line.split("\t", 1)[0] parts = rule_part.split(":", 2) if len(parts) == 3: pattern = parts[2] log_handler.log_debug( f"Path '{path_to_check}' matched rule: '{pattern}'", func_name=func_name, ) return pattern else: log_handler.log_warning( f"Could not parse pattern from: {rule_part}", func_name=func_name, ) return None else: log_handler.log_warning( f"Unexpected output from check-ignore -v: {line}", func_name=func_name, ) return None elif result.returncode == 1: log_handler.log_debug( f"Path '{path_to_check}' not ignored.", func_name=func_name ) return None else: msg = f"check-ignore -v failed (RC {result.returncode})" log_handler.log_error( f"{msg}. Stderr: {result.stderr.strip()}", func_name=func_name ) raise GitCommandError(msg, command=cmd, stderr=result.stderr) except GitCommandError as e: log_handler.log_error( f"Failed get_matching_rule for '{path_to_check}': {e}", func_name=func_name, ) raise e def get_status_short(self, working_directory: str) -> List[str]: func_name = "get_status_short" log_handler.log_debug( f"Getting short status for '{working_directory}' (-z)", func_name=func_name ) cmd = ["git", "status", "--short", "-z", "--ignored=no"] try: result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) raw_output = result.stdout log_handler.log_debug( f"Raw stdout length: {len(raw_output)}", func_name=func_name ) log_handler.log_debug( f"Raw stdout repr: {repr(raw_output)}", func_name=func_name ) status_lines = [line for line in raw_output.split("\0") if line] log_handler.log_debug( f"Split resulted in {len(status_lines)} non-empty lines.", func_name=func_name, ) log_handler.log_debug( f"Split lines list: {status_lines}", func_name=func_name ) log_handler.log_info( f"Status check returned {len(status_lines)} items.", func_name=func_name ) return status_lines except GitCommandError as e: log_handler.log_error(f"Failed get status: {e}", func_name=func_name) return [] except Exception as e: log_handler.log_exception( f"Unexpected error getting status: {e}", func_name=func_name ) return [] # --- Remote Interaction Helpers --- def get_file_content_from_ref( self, working_directory: str, file_path: str, ref: str ) -> Optional[str]: # ... (codice invariato) ... func_name = "get_file_content_from_ref" git_file_path = file_path.replace(os.path.sep, "/") git_ref = ref.strip() if ref else "HEAD" if git_ref == ":": ref_path_arg = f":{git_file_path}" else: ref_path_arg = f"{git_ref}:{git_file_path}" log_handler.log_debug( f"Getting content for file='{git_file_path}' at ref='{git_ref}' (using '{ref_path_arg}') in '{working_directory}'", func_name=func_name, ) cmd = ["git", "show", ref_path_arg] try: result = self.log_and_execute( cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if result.returncode == 0: return result.stdout elif result.returncode == 128 and ( "exists on disk, but not in" in (result.stderr or "") or "does not exist in" in (result.stderr or "") or ( "fatal: Path" in (result.stderr or "") and "does not exist" in (result.stderr or "") ) or "did not match any file(s)" in (result.stderr or "") ): log_handler.log_warning( f"File '{git_file_path}' not found in ref '{git_ref}'.", func_name=func_name, ) return None else: log_handler.log_error( f"git show failed for '{ref_path_arg}' (RC {result.returncode}). Stderr: {(result.stderr or 'N/A').strip()}", func_name=func_name, ) return None except Exception as e: log_handler.log_exception( f"Error executing git show for '{ref_path_arg}': {e}", func_name=func_name, ) return None def add_file(self, working_directory: str, file_path: str, renormalize: bool = False) -> bool: """ Adds a path to the staging area. Args: working_directory (str): Path to the repository. file_path (str): Path to stage (e.g., '.', 'a_file.txt'). renormalize (bool): If True, use '--renormalize' to fix line endings. """ func_name = "add_file" if not file_path or file_path.isspace(): raise ValueError("File path cannot be empty.") log_handler.log_info( f"Adding path to staging: '{file_path}' in '{working_directory}' (renormalize={renormalize})", func_name=func_name, ) cmd = ["git", "add"] if renormalize: cmd.append("--renormalize") cmd.extend(["--", file_path]) try: self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) log_handler.log_info( f"Path '{file_path}' added successfully.", func_name=func_name ) return True except GitCommandError as add_e: log_handler.log_error( f"Failed to add path '{file_path}': {add_e}", func_name=func_name ) stderr = (add_e.stderr or "").lower() if "did not match any files" in stderr: raise GitCommandError( f"Pathspec '{file_path}' did not match any files.", command=cmd, stderr=add_e.stderr, ) from add_e else: raise add_e def get_remotes(self, working_directory: str) -> Dict[str, str]: # ... (codice invariato) ... func_name = "get_remotes" log_handler.log_debug( f"Getting remotes for '{working_directory}'", func_name=func_name ) cmd = ["git", "remote", "-v"] remotes = {} try: result = self.log_and_execute( cmd, working_directory, check=True, log_output_level=logging.DEBUG ) lines = result.stdout.strip().splitlines() for line in lines: parts = line.split() if len(parts) == 3 and parts[2] == "(fetch)": name = parts[0] url = parts[1] remotes[name] = url log_handler.log_info( f"Found {len(remotes)} remotes: {list(remotes.keys())}", func_name=func_name, ) return remotes except GitCommandError as e: is_error = e.stderr and "fatal:" in e.stderr.lower() if not is_error and not remotes: log_handler.log_info("No remotes found.", func_name=func_name) return {} else: log_handler.log_error( f"Failed to get remotes: {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected error getting remotes: {e}", func_name=func_name ) raise GitCommandError(f"Unexpected error getting remotes: {e}") from e def add_remote( self, working_directory: str, remote_name: str, remote_url: str ) -> bool: func_name = "add_remote" log_handler.log_info( f"Adding remote '{remote_name}' -> '{remote_url}'", func_name=func_name ) cmd = ["git", "remote", "add", remote_name, remote_url] try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"Remote '{remote_name}' added successfully.", func_name=func_name ) return True except GitCommandError as e: stderr_low = (e.stderr or "").lower() if "already exists" in stderr_low: raise GitCommandError( f"Remote '{remote_name}' already exists.", command=cmd, stderr=e.stderr, ) from e else: log_handler.log_error( f"Failed to add remote '{remote_name}': {e}", func_name=func_name ) raise e except Exception as e: log_handler.log_exception( f"Unexpected error adding remote '{remote_name}': {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected error adding remote: {e}", command=cmd ) from e def set_remote_url( self, working_directory: str, remote_name: str, remote_url: str ) -> bool: func_name = "set_remote_url" log_handler.log_info( f"Setting URL for remote '{remote_name}' to '{remote_url}'", func_name=func_name, ) cmd = ["git", "remote", "set-url", remote_name, remote_url] try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info( f"URL for remote '{remote_name}' set successfully.", func_name=func_name ) return True except GitCommandError as e: stderr_low = (e.stderr or "").lower() if "no such remote" in stderr_low: raise GitCommandError( f"Remote '{remote_name}' does not exist, cannot set URL.", command=cmd, stderr=e.stderr, ) from e else: log_handler.log_error( f"Failed to set URL for remote '{remote_name}': {e}", func_name=func_name, ) raise e except Exception as e: log_handler.log_exception( f"Unexpected error setting remote URL for '{remote_name}': {e}", func_name=func_name, ) raise GitCommandError( f"Unexpected error setting remote URL: {e}", command=cmd ) from e # --- Remote Operations (Fetch, Pull, Push, etc.) --- def git_ls_remote( self, working_directory: str, remote_name: str ) -> subprocess.CompletedProcess: func_name = "git_ls_remote" log_handler.log_debug( f"Running ls-remote for '{remote_name}' in '{working_directory}'", func_name=func_name, ) cmd = ["git", "ls-remote", "--exit-code", remote_name] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) return result def git_fetch_interactive( self, working_directory: str, remote_name: str ) -> subprocess.CompletedProcess: func_name = "git_fetch_interactive" log_handler.log_info( f"Running interactive fetch for '{remote_name}' in '{working_directory}'. User may see a terminal.", func_name=func_name, ) cmd = ["git", "fetch", remote_name] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=False, hide_console=False, ) return result def git_fetch( self, working_directory: str, remote_name: str, prune: bool = True ) -> subprocess.CompletedProcess: func_name = "git_fetch" log_handler.log_info( f"Fetching from remote '{remote_name}' in '{working_directory}' (Prune={prune})", func_name=func_name, ) cmd = ["git", "fetch", remote_name] if prune: cmd.append("--prune") result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result def git_pull( self, working_directory: str, remote_name: str, branch_name: str ) -> subprocess.CompletedProcess: """ Pulls a specific branch from a remote, allowing for unrelated histories. """ func_name = "git_pull" log_handler.log_info( f"Pulling from remote '{remote_name}' branch '{branch_name}' into current branch in '{working_directory}'", func_name=func_name, ) # Comando esplicito: git pull # Aggiungiamo --allow-unrelated-histories per risolvere i conflitti dopo un reset del repo cmd = ["git", "pull", "--allow-unrelated-histories", remote_name, branch_name] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result def git_push( self, working_directory: str, remote_name: str, branch_name: str, set_upstream: bool = False, force: bool = False, ) -> subprocess.CompletedProcess: func_name = "git_push" push_options = [] if set_upstream: push_options.append("--set-upstream") log_handler.log_info( f"Pushing branch '{branch_name}' to '{remote_name}' and setting upstream.", func_name=func_name, ) else: log_handler.log_info( f"Pushing branch '{branch_name}' to '{remote_name}'.", func_name=func_name, ) if force: push_options.append("--force") log_handler.log_warning( f"Executing FORCE PUSH for branch '{branch_name}' to '{remote_name}'!", func_name=func_name, ) cmd = ["git", "push"] + push_options + [remote_name, branch_name] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result def git_push_tags( self, working_directory: str, remote_name: str ) -> subprocess.CompletedProcess: func_name = "git_push_tags" log_handler.log_info( f"Pushing all tags to remote '{remote_name}' from '{working_directory}'", func_name=func_name, ) cmd = ["git", "push", remote_name, "--tags"] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result # --- Branch Info and Status --- def get_current_branch_name(self, working_directory: str) -> Optional[str]: func_name = "get_current_branch_name" log_handler.log_debug( f"Getting current branch name in '{working_directory}'", func_name=func_name ) cmd = ["git", "symbolic-ref", "--short", "-q", "HEAD"] try: result = self.log_and_execute( cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if result.returncode == 0 and result.stdout: branch_name = result.stdout.strip() log_handler.log_info( f"Current branch is '{branch_name}'", func_name=func_name ) return branch_name elif result.returncode == 1: log_handler.log_warning( "Currently in detached HEAD state.", func_name=func_name ) return None else: log_handler.log_error( f"Failed to get current branch (RC={result.returncode}). Stderr: {(result.stderr or 'N/A').strip()}", func_name=func_name, ) return None except Exception as e: log_handler.log_exception( f"Unexpected error getting current branch: {e}", func_name=func_name ) return None def get_branch_upstream( self, working_directory: str, branch_name: str ) -> Optional[str]: func_name = "get_branch_upstream" log_handler.log_debug( f"Getting upstream for branch '{branch_name}' in '{working_directory}'", func_name=func_name, ) cmd = ["git", "rev-parse", "--abbrev-ref", f"{branch_name}@{{upstream}}"] try: result = self.log_and_execute( cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if result.returncode == 0 and result.stdout: upstream_name = result.stdout.strip() remote_cmd = ["git", "config", "--get", f"branch.{branch_name}.remote"] merge_ref_cmd = [ "git", "config", "--get", f"branch.{branch_name}.merge", ] remote_result = self.log_and_execute( remote_cmd, working_directory, check=False, capture=True, hide_console=True, ) merge_ref_result = self.log_and_execute( merge_ref_cmd, working_directory, check=False, capture=True, hide_console=True, ) if remote_result.returncode == 0 and merge_ref_result.returncode == 0: remote = remote_result.stdout.strip() merge_ref = merge_ref_result.stdout.strip() remote_branch_name = merge_ref.split("/")[-1] full_upstream_name = f"{remote}/{remote_branch_name}" log_handler.log_info( f"Upstream for '{branch_name}' is '{full_upstream_name}'", func_name=func_name, ) return full_upstream_name else: log_handler.log_warning( f"Could not determine full upstream name for '{branch_name}'.", func_name=func_name, ) return None elif "no upstream configured" in (result.stderr or "").lower(): log_handler.log_info( f"No upstream configured for branch '{branch_name}'.", func_name=func_name, ) return None else: log_handler.log_error( f"Failed to get upstream for '{branch_name}' (RC={result.returncode}). Stderr: {(result.stderr or 'N/A').strip()}", func_name=func_name, ) return None except Exception as e: log_handler.log_exception( f"Unexpected error getting upstream for '{branch_name}': {e}", func_name=func_name, ) return None def get_ahead_behind_count( self, working_directory: str, local_branch: str, upstream_branch: str ) -> Tuple[Optional[int], Optional[int]]: func_name = "get_ahead_behind_count" log_handler.log_debug( f"Getting ahead/behind count for '{local_branch}' vs '{upstream_branch}' using separate counts.", func_name=func_name, ) ahead_count = None behind_count = None try: ahead_cmd = [ "git", "rev-list", "--count", f"{upstream_branch}..{local_branch}", ] log_handler.log_debug( f"Executing ahead count command: {' '.join(ahead_cmd)}", func_name=func_name, ) ahead_result = self.log_and_execute( ahead_cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if ahead_result.returncode == 0 and ahead_result.stdout: try: ahead_count = int(ahead_result.stdout.strip()) log_handler.log_debug( f"Ahead count: {ahead_count}", func_name=func_name ) except ValueError: log_handler.log_error( f"Failed to parse ahead count output: '{ahead_result.stdout.strip()}'", func_name=func_name, ) return None, None else: log_handler.log_warning( f"Ahead count command failed (RC={ahead_result.returncode}). Stderr: {(ahead_result.stderr or 'N/A').strip()}", func_name=func_name, ) return None, None behind_cmd = [ "git", "rev-list", "--count", f"{local_branch}..{upstream_branch}", ] log_handler.log_debug( f"Executing behind count command: {' '.join(behind_cmd)}", func_name=func_name, ) behind_result = self.log_and_execute( behind_cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) if behind_result.returncode == 0 and behind_result.stdout: try: behind_count = int(behind_result.stdout.strip()) log_handler.log_debug( f"Behind count: {behind_count}", func_name=func_name ) except ValueError: log_handler.log_error( f"Failed to parse behind count output: '{behind_result.stdout.strip()}'", func_name=func_name, ) return None, None else: log_handler.log_warning( f"Behind count command failed (RC={behind_result.returncode}). Stderr: {(behind_result.stderr or 'N/A').strip()}", func_name=func_name, ) return None, None log_handler.log_info( f"Ahead/Behind for '{local_branch}': Ahead={ahead_count}, Behind={behind_count}", func_name=func_name, ) return ahead_count, behind_count except Exception as e: log_handler.log_exception( f"Unexpected error getting ahead/behind count: {e}", func_name=func_name ) return None, None # --- Clone and Remote Branch Listing --- def git_clone( self, remote_url: str, local_directory_path: str ) -> subprocess.CompletedProcess: func_name = "git_clone" log_handler.log_info( f"Cloning repository from '{remote_url}' into '{local_directory_path}'", func_name=func_name, ) cmd = ["git", "clone", "--progress", remote_url, local_directory_path] # --- MODIFICA PER DEBUG --- # Riduciamo il timeout per forzare un errore invece di un blocco infinito debug_timeout = 20 # secondi log_handler.log_warning( f"DEBUG: Using short timeout ({debug_timeout}s) for git clone.", func_name=func_name ) result = self.log_and_execute( command=cmd, working_directory=".", check=False, capture=True, hide_console=True, log_output_level=logging.INFO, timeout_seconds=debug_timeout, # Usa il timeout di debug ) # --- FINE MODIFICA PER DEBUG --- return result def git_list_remote_branches( self, working_directory: str, remote_name: str ) -> List[str]: func_name = "git_list_remote_branches" log_handler.log_debug( f"Listing remote branches for '{remote_name}' via 'git branch -r' in '{working_directory}'", func_name=func_name, ) cmd = ["git", "branch", "-r"] try: result = self.log_and_execute( cmd, working_directory, check=True, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) remote_branches = [] if result.stdout: prefix_to_match = f"{remote_name}/" raw_lines = result.stdout.splitlines() for line in raw_lines: branch_name_raw = line.strip() if "->" in branch_name_raw: continue if branch_name_raw.startswith(prefix_to_match): remote_branches.append(branch_name_raw) log_handler.log_info( f"Found {len(remote_branches)} remote branches for '{remote_name}'.", func_name=func_name, ) log_handler.log_debug( f"Filtered remote branches for '{remote_name}': {remote_branches}", func_name=func_name, ) return sorted(remote_branches) else: log_handler.log_info( "No remote branches found in 'git branch -r' output.", func_name=func_name, ) return [] except GitCommandError as e: log_handler.log_error( f"Failed to list remote branches using 'git branch -r' for '{remote_name}': {e}", func_name=func_name, ) return [] except Exception as e: log_handler.log_exception( f"Unexpected error listing remote branches for '{remote_name}': {e}", func_name=func_name, ) return [] def checkout_new_branch_from_remote( self, working_directory: str, new_local_branch_name: str, remote_tracking_branch_full_name: str, ) -> subprocess.CompletedProcess: func_name = "checkout_new_branch_from_remote" log_handler.log_info( f"Checking out remote branch '{remote_tracking_branch_full_name}' as new local branch '{new_local_branch_name}' in '{working_directory}'", func_name=func_name, ) cmd = [ "git", "checkout", "-b", new_local_branch_name, remote_tracking_branch_full_name, ] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result # --- Local Branch/Tag Deletion and Merging --- def delete_local_branch( self, working_directory: str, branch_name: str, force: bool = False ) -> subprocess.CompletedProcess: func_name = "delete_local_branch" delete_option = "-D" if force else "-d" action_type = "Forcing delete" if force else "Deleting" log_handler.log_info( f"{action_type} local branch '{branch_name}' in '{working_directory}' (Option: {delete_option})", func_name=func_name, ) cmd = ["git", "branch", delete_option, branch_name] result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result def git_merge( self, working_directory: str, branch_to_merge: str, commit_msg: Optional[str] = None, no_ff: bool = False, ) -> subprocess.CompletedProcess: func_name = "git_merge" log_handler.log_info( f"Merging branch '{branch_to_merge}' into current branch in '{working_directory}'", func_name=func_name, ) cmd = ["git", "merge"] if no_ff: cmd.append("--no-ff") log_handler.log_debug( "Using --no-ff option for merge.", func_name=func_name ) if commit_msg: cmd.extend(["-m", commit_msg]) log_handler.log_debug( f"Using custom merge commit message: '{commit_msg[:50]}...'", func_name=func_name, ) else: cmd.append("--no-edit") log_handler.log_debug( "Using --no-edit option (default merge message).", func_name=func_name ) cmd.append(branch_to_merge) result = self.log_and_execute( command=cmd, working_directory=working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.INFO, ) return result # --- Diff Tree --- def git_diff_tree( self, working_directory: str, ref1: str, ref2: str ) -> Tuple[int, List[str]]: func_name = "git_diff_tree" log_handler.log_debug( f"Comparing trees: '{ref1}' vs '{ref2}' in '{working_directory}'", func_name=func_name, ) cmd = ["git", "diff-tree", "--no-commit-id", "--name-status", "-r", ref1, ref2] try: result = self.log_and_execute( cmd, working_directory, check=False, capture=True, hide_console=True, log_output_level=logging.DEBUG, ) changed_files = [] if result.returncode == 0 and result.stdout: lines = result.stdout.strip().splitlines() for line in lines: if line.strip(): changed_files.append(line.strip()) log_handler.log_info( f"Found {len(changed_files)} differences between '{ref1}' and '{ref2}'.", func_name=func_name, ) elif result.returncode == 0 and not result.stdout: log_handler.log_info( f"No differences found between '{ref1}' and '{ref2}'.", func_name=func_name, ) else: log_handler.log_error( f"git diff-tree failed (RC={result.returncode}) between '{ref1}' and '{ref2}'. Stderr: {(result.stderr or 'N/A').strip()}", func_name=func_name, ) return result.returncode, changed_files except Exception as e: log_handler.log_exception( f"Unexpected error running git diff-tree: {e}", func_name=func_name ) return -1, [] def git_reset_hard(self, working_directory: str, reference: str) -> None: """ Performs a hard reset to a given reference and cleans the working directory. WARNING: This is a destructive operation. Args: working_directory (str): The path to the Git repository. reference (str): The tag, branch, or commit hash to reset to. Raises: ValueError: If the reference is empty. GitCommandError: If any of the Git commands fail. """ func_name: str = "git_reset_hard" if not reference or reference.isspace(): raise ValueError("Reference for reset cannot be empty.") log_handler.log_warning( f"Performing DESTRUCTIVE operation: git reset --hard {reference}", func_name=func_name, ) # 1. Esegui git reset --hard reset_cmd: List[str] = ["git", "reset", "--hard", reference] try: self.log_and_execute( reset_cmd, working_directory, check=True, log_output_level=logging.INFO ) log_handler.log_info( f"Hard reset to '{reference}' completed successfully.", func_name=func_name ) except GitCommandError as e: log_handler.log_error( f"git reset --hard to '{reference}' FAILED.", func_name=func_name ) # Rilancia l'eccezione per fermare l'operazione raise e log_handler.log_warning( "Performing DESTRUCTIVE operation: git clean -fdx", func_name=func_name ) # 2. Esegui git clean -fdx per rimuovere file non tracciati clean_cmd: List[str] = ["git", "clean", "-fdx"] try: self.log_and_execute( clean_cmd, working_directory, check=True, log_output_level=logging.INFO ) log_handler.log_info( "Clean of untracked files completed successfully.", func_name=func_name ) except GitCommandError as e: log_handler.log_error("git clean -fdx FAILED.", func_name=func_name) # Rilancia l'eccezione raise e # --- History Cleaner Methods --- def list_all_historical_blobs(self, working_directory: str) -> List[Tuple[str, str]]: """ Lists all blobs (file contents) ever recorded in the repository history, along with their original file paths. Returns: List[Tuple[str, str]]: A list of (blob_hash, file_path) tuples. """ func_name = "list_all_historical_blobs" log_handler.log_debug(f"Listing all historical blobs in '{working_directory}'...", func_name=func_name) cmd = ["git", "rev-list", "--all", "--objects"] try: result = self.log_and_execute(cmd, working_directory, check=True, log_output_level=logging.DEBUG) blobs = [] lines = result.stdout.strip().splitlines() for line in lines: parts = line.split(maxsplit=1) if len(parts) == 2: blob_hash, file_path = parts blobs.append((blob_hash, file_path)) log_handler.log_info(f"Found {len(blobs)} total object entries. These will be filtered.", func_name=func_name) return blobs except GitCommandError as e: log_handler.log_error(f"Failed to list historical objects: {e}", func_name=func_name) raise def get_blob_size(self, working_directory: str, blob_hash: str) -> int: """ Gets the size of a git blob in bytes. Args: blob_hash (str): The SHA-1 hash of the blob. Returns: int: The size of the blob in bytes. """ cmd = ["git", "cat-file", "-s", blob_hash] try: result = self.log_and_execute(cmd, working_directory, check=True, log_output_level=logging.DEBUG) return int(result.stdout.strip()) except (GitCommandError, ValueError) as e: raise GitCommandError(f"Failed to get size for blob {blob_hash}: {e}", command=cmd) from e def run_filter_repo(self, working_directory: str, paths_file: str) -> None: """ Executes 'git-filter-repo' to remove files specified in a file. Args: working_directory (str): Path to the repository. paths_file (str): Path to a file containing one file path per line to remove. """ func_name = "run_filter_repo" log_handler.log_warning(f"--- Running DESTRUCTIVE git-filter-repo in '{working_directory}' ---", func_name=func_name) cmd = [ "git-filter-repo", "--paths-from-file", paths_file, "--invert-paths", "--force" ] self.log_and_execute( command=cmd, working_directory=working_directory, check=True, capture=True, hide_console=False, log_output_level=logging.INFO, timeout_seconds=7200 ) def force_push_all(self, working_directory: str, remote_name: str) -> None: """Force pushes all local branches to the specified remote.""" func_name = "force_push_all" log_handler.log_warning(f"Force pushing all branches to remote '{remote_name}'...", func_name=func_name) cmd = ["git", "push", remote_name, "--all", "--force"] self.log_and_execute( command=cmd, working_directory=working_directory, check=True, capture=True, hide_console=False ) def force_push_tags(self, working_directory: str, remote_name: str) -> None: """Force pushes all local tags to the specified remote.""" func_name = "force_push_tags" log_handler.log_warning(f"Force pushing all tags to remote '{remote_name}'...", func_name=func_name) cmd = ["git", "push", remote_name, "--tags", "--force"] self.log_and_execute( command=cmd, working_directory=working_directory, check=True, capture=True, hide_console=False ) def set_branch_upstream(self, working_directory: str, branch_name: str, remote_name: str) -> None: """ Sets the upstream for a local branch to track a remote branch with the same name. Args: working_directory (str): The path to the Git repository. branch_name (str): The local branch name. remote_name (str): The name of the remote (e.g., 'origin'). Raises: GitCommandError: If the command fails. """ func_name = "set_branch_upstream" upstream_ref = f"{remote_name}/{branch_name}" log_handler.log_debug( f"Setting upstream for branch '{branch_name}' to '{upstream_ref}'", func_name=func_name ) cmd = ["git", "branch", f"--set-upstream-to={upstream_ref}", branch_name] self.log_and_execute( command=cmd, working_directory=working_directory, check=True ) # --- Submodule Management --- def submodule_add(self, working_directory: str, repo_url: str, path: str, branch: Optional[str] = None) -> None: """Adds a new submodule to the repository, optionally tracking a specific branch.""" func_name = "submodule_add" log_handler.log_info(f"Adding submodule from '{repo_url}' into '{path}'", func_name=func_name) cmd = ["git", "submodule", "add"] if branch: # L'opzione -b dice a git di aggiungere la configurazione del branch in .gitmodules log_handler.log_info(f"Configuring submodule to track branch: '{branch}'", func_name=func_name) cmd.extend(["-b", branch]) cmd.extend(["--", repo_url, path]) self.log_and_execute(cmd, working_directory, check=True) def submodule_status(self, working_directory: str) -> List[Dict[str, str]]: """ Gets the status of all submodules, including details about local modifications. """ func_name = "submodule_status" log_handler.log_debug(f"Getting submodule status in '{working_directory}'", func_name=func_name) cmd = ["git", "submodule", "status", "--recursive"] result = self.log_and_execute(cmd, working_directory, check=True, log_output_level=logging.DEBUG) submodules = [] status_regex = re.compile( r"^\s*([U +-])?" r"([0-9a-fA-F]+)\s" r"(.*?)" r"(?:\s\((.*?)\))?\s*$" ) for line in result.stdout.strip().splitlines(): line = line.strip() if not line: continue match = status_regex.match(line) if match: status_char, commit, path, description = match.groups() final_status_char = status_char.strip() if status_char else ' ' tracked_branch = "N/A" try: # Diciamo esplicitamente a git config di leggere SOLO dal file .gitmodules branch_cmd = [ "git", "config", "--file", ".gitmodules", f"submodule.{path.strip()}.branch" ] branch_result = self.log_and_execute(branch_cmd, working_directory, check=False) # check=False, quindi dobbiamo controllare il returncode. # Se il comando ha successo (RC=0) e c'è un output, abbiamo trovato il branch. if branch_result.returncode == 0 and branch_result.stdout: tracked_branch = branch_result.stdout.strip() except GitCommandError: # Se il comando stesso fallisce per un'altra ragione, ignoriamo. pass # --- NUOVA LOGICA: ARRICCHIMENTO DATI --- dirty_details = "" # Se lo status indica una modifica ('+' per nuovo commit o 'U' per conflitto), # o se il carattere è spazio (potrebbe avere modifiche non committate) if final_status_char in ['+', 'U', ' ']: submodule_full_path = os.path.join(working_directory, path) if os.path.isdir(submodule_full_path): try: # Controlla se ci sono modifiche locali nel submodule if self.git_status_has_changes(submodule_full_path): dirty_details = "modified content" except GitCommandError: # Potrebbe fallire se la cartella non è un repo git (stato corrotto) dirty_details = "error checking status" # ------------------------------------ submodules.append({ "status_char": final_status_char, "commit": commit.strip(), "path": path.strip(), "description": (description or "N/A").strip(), "dirty_details": dirty_details , "tracked_branch": tracked_branch }) else: log_handler.log_warning(f"Could not parse submodule status line: '{line}'", func_name=func_name) return submodules def submodule_update( self, working_directory: str, init: bool, remote: bool, recursive: bool, merge: bool ) -> None: """Updates submodules with flexible options.""" func_name = "submodule_update" log_handler.log_info(f"Updating submodules in '{working_directory}' (init={init}, remote={remote}, recursive={recursive})", func_name=func_name) cmd = ["git", "submodule", "update"] if init: cmd.append("--init") if remote: cmd.append("--remote") if recursive: cmd.append("--recursive") if merge: cmd.append("--merge") self.log_and_execute(cmd, working_directory, check=True) def submodule_deinit(self, working_directory: str, path: str) -> None: """De-initializes a submodule, a step in removing it.""" func_name = "submodule_deinit" log_handler.log_info(f"De-initializing submodule at '{path}'", func_name=func_name) # Use -f to force deinit even with local changes cmd = ["git", "submodule", "deinit", "-f", "--", path] self.log_and_execute(cmd, working_directory, check=True) def remove_from_index(self, working_directory: str, path: str) -> None: """ Forcibly removes a path from the Git index (staging area) only. This does not delete the file from the working directory. Args: working_directory (str): Path to the repository. path (str): The relative path to the item to remove from the index. Raises: GitCommandError: If the 'git rm --cached' command fails. """ func_name = "remove_from_index" log_handler.log_info( f"Forcibly removing path from index (cached): '{path}' in '{working_directory}'", func_name=func_name ) # Use -f to force removal even if there are local modifications cmd = ["git", "rm", "--cached", "-f", path] try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info(f"Path '{path}' removed from index successfully.", func_name=func_name) except GitCommandError as e: log_handler.log_error(f"Failed to 'git rm --cached {path}': {e}", func_name=func_name) raise def remove_config_section(self, working_directory: str, section_name: str) -> None: """ Removes a section from the local .git/config file. Args: working_directory (str): Path to the repository. section_name (str): The full name of the section to remove (e.g., 'submodule.libs/path'). Raises: GitCommandError: If the command fails with an unexpected error. """ func_name = "remove_config_section" log_handler.log_info( f"Removing config section '{section_name}' from local .git/config", func_name=func_name ) cmd = ["git", "config", "-f", ".git/config", "--remove-section", section_name] try: result = self.log_and_execute(cmd, working_directory, check=False) # Git returns non-zero if the section doesn't exist. This is not an error for a clean-up operation. # We only raise an error if the command fails for a different reason. if result.returncode != 0 and "no such section" not in (result.stderr or "").lower(): raise GitCommandError( f"Git config command failed with an unexpected error (RC {result.returncode})", command=cmd, stderr=result.stderr ) log_handler.log_info(f"Config section '{section_name}' removed (if it existed).", func_name=func_name) except GitCommandError as e: log_handler.log_error(f"Failed to remove config section '{section_name}': {e}", func_name=func_name) raise def get_registered_submodules(self, working_directory: str) -> Dict[str, str]: """ Retrieves the list of registered submodules and their URLs from the .gitmodules file. This reads the configuration, not the on-disk status. Args: working_directory (str): Path to the repository. Returns: A dictionary mapping submodule paths to their URLs. """ func_name = "get_registered_submodules" log_handler.log_debug( f"Getting registered submodules from config in '{working_directory}'", func_name=func_name ) submodules = {} # The command 'git config --file .gitmodules --get-regexp path' is perfect for this. # It reads .gitmodules and finds all entries that have a 'path' property. cmd = ["git", "config", "--file", ".gitmodules", "--get-regexp", r"submodule\..*\.path"] try: result = self.log_and_execute(cmd, working_directory, check=False) # If .gitmodules doesn't exist, the command will fail, which is okay. if result.returncode != 0 or not result.stdout: log_handler.log_info("No .gitmodules file found or it's empty. No registered submodules.", func_name=func_name) return {} # Output is like: submodule.libs/log-handler-lib.path libs/log-handler-lib for line in result.stdout.strip().splitlines(): # Extract the section name (e.g., submodule.libs/log-handler-lib) and the path key, path = line.split(maxsplit=1) submodule_name = key.replace("submodule.", "").replace(".path", "") # Now get the URL for this submodule url_cmd = ["git", "config", "--file", ".gitmodules", f"submodule.{submodule_name}.url"] url_result = self.log_and_execute(url_cmd, working_directory, check=True) url = url_result.stdout.strip() submodules[path] = url log_handler.log_info(f"Found {len(submodules)} registered submodules: {list(submodules.keys())}", func_name=func_name) return submodules except GitCommandError as e: # This can happen if .gitmodules is malformed. log_handler.log_error(f"Failed to read registered submodules: {e}", func_name=func_name) return {} # Return empty on error def discover_submodules_in_any_state(self, working_directory: str) -> List[str]: """ Discovers submodule paths by looking in ALL possible Git locations: .gitmodules (on disk and in index), .git/config, and the index itself for gitlink entries. This is the most robust way to find submodules in any consistent or inconsistent state. Args: working_directory (str): Path to the repository. Returns: A list of unique submodule paths found. """ func_name = "discover_submodules_in_any_state" log_handler.log_debug( f"Discovering submodules in any state in '{working_directory}'", func_name=func_name ) found_paths = set() # Method 1: Look in the on-disk .gitmodules file try: cmd_gitmodules = ["git", "config", "--file", ".gitmodules", "--get-regexp", r"submodule\..*\.path"] result_gitmodules = self.log_and_execute(cmd_gitmodules, working_directory, check=False) if result_gitmodules.returncode == 0 and result_gitmodules.stdout: for line in result_gitmodules.stdout.strip().splitlines(): path = line.split(maxsplit=1)[1] found_paths.add(path) except GitCommandError as e: log_handler.log_warning(f"Could not read on-disk .gitmodules: {e}", func_name=func_name) # Method 2: Look in the version of .gitmodules in the Git index try: gitmodules_content_from_index = self.get_file_content_from_ref( working_directory, file_path=".gitmodules", ref=":" ) if gitmodules_content_from_index: log_handler.log_info("Found .gitmodules in Git index. Parsing its content.", func_name=func_name) for match in re.finditer(r"^\s*path\s*=\s*(.*)$", gitmodules_content_from_index, re.MULTILINE): path = match.group(1).strip() if path: found_paths.add(path) except Exception as e_index: log_handler.log_error(f"Error parsing .gitmodules content from index: {e_index}", func_name=func_name) # Method 3: Look in .git/config for submodule sections try: cmd_config = ["git", "config", "-f", ".git/config", "--get-regexp", r"submodule\..*\.path"] result_config = self.log_and_execute(cmd_config, working_directory, check=False) if result_config.returncode == 0 and result_config.stdout: for line in result_config.stdout.strip().splitlines(): path = line.split(maxsplit=1)[1] found_paths.add(path) except GitCommandError as e: log_handler.log_warning(f"Could not read submodule paths from .git/config: {e}", func_name=func_name) # Method 4: Look in the index for gitlink entries (mode 160000) # This is the definitive way to find correctly registered submodules. try: cmd_index = ["git", "ls-files", "--stage"] result_index = self.log_and_execute(cmd_index, working_directory, check=True) for line in result_index.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 4 and parts[0] == "160000": path = parts[3] found_paths.add(path) except GitCommandError as e: log_handler.log_error(f"Failed to read submodules from git index: {e}", func_name=func_name) sorted_paths = sorted(list(found_paths)) log_handler.log_info(f"Discovered {len(sorted_paths)} unique submodule paths: {sorted_paths}", func_name=func_name) return sorted_paths def git_clean(self, working_directory: str) -> None: """ Cleans the working tree by recursively removing untracked files from the current directory. Args: working_directory (str): The directory to clean. Raises: GitCommandError: If the 'git clean' command fails. """ func_name = "git_clean" log_handler.log_warning( f"Performing 'git clean -fd' in '{working_directory}'. This will delete untracked files.", func_name=func_name ) # -f (force) is required. -d removes untracked directories. cmd = ["git", "clean", "-fd"] try: self.log_and_execute(cmd, working_directory, check=True) log_handler.log_info(f"Successfully cleaned working directory: '{working_directory}'", func_name=func_name) except GitCommandError as e: log_handler.log_error(f"Failed to clean directory '{working_directory}': {e}", func_name=func_name) raise def get_diff_files(self, working_directory: str, ref1: str, ref2: Optional[str]) -> List[str]: """ Gets a list of changed files between two references using 'git diff'. If ref2 is None, compares ref1 to the working directory. """ func_name = "get_diff_files" log_handler.log_debug( f"Getting diff files between '{ref1}' and '{ref2 or 'Working Dir'}'", func_name=func_name ) cmd = ["git", "diff", "--name-status", ref1] if ref2: cmd.append(ref2) result = self.log_and_execute(cmd, working_directory, check=True, log_output_level=logging.DEBUG) return [line.strip() for line in result.stdout.strip().splitlines() if line.strip()] def fetch_in_directory(self, working_directory: str) -> None: """ Runs 'git fetch' in a specific directory. Used for submodules. """ func_name = "fetch_in_directory" log_handler.log_info(f"Fetching remote updates in '{working_directory}'", func_name=func_name) cmd = ["git", "fetch"] self.log_and_execute(cmd, working_directory, check=True) def get_commit_count_between(self, working_directory: str, ref1: str, ref2: str) -> int: """ Counts the number of commits between two references. Typically used to find how many commits ahead/behind a branch is. Example: get_commit_count_between(path, "HEAD", "origin/master") -> commits behind. """ func_name = "get_commit_count_between" cmd = ["git", "rev-list", "--count", f"{ref1}..{ref2}"] try: result = self.log_and_execute(cmd, working_directory, check=True) return int(result.stdout.strip()) except (GitCommandError, ValueError) as e: log_handler.log_error(f"Could not count commits between '{ref1}' and '{ref2}': {e}", func_name=func_name) return -1 # Ritorna -1 per indicare un errore