SXXXXXXX_GitUtility/action_handler.py
2025-04-23 14:20:18 +02:00

977 lines
42 KiB
Python

# --- START OF FILE action_handler.py ---
# action_handler.py
# Standard logging import removed
import os
import datetime
# Import the new log-queue handler
import log_handler
# Import the required dependencies
from git_commands import GitCommands, GitCommandError
from backup_handler import BackupHandler
class ActionHandler:
"""
Handles the execution logic for core application actions.
Orchestrates calls to GitCommands and BackupHandler.
Methods are executed synchronously but are intended to be called
from worker threads managed by the main application controller.
Uses log_handler to send log messages to a central queue.
"""
# Logger parameter removed from the constructor
def __init__(self, git_commands: GitCommands, backup_handler: BackupHandler):
    """
    Store the collaborators that every action method relies on.

    Args:
        git_commands (GitCommands): Object used to run Git commands.
        backup_handler (BackupHandler): Object used to create backups.

    Raises:
        TypeError: If either argument is not an instance of the expected class.
    """
    # Check the collaborators in declaration order so the first wrong one
    # is the one reported.
    type_requirements = (
        (git_commands, GitCommands, "ActionHandler requires a GitCommands instance."),
        (
            backup_handler,
            BackupHandler,
            "ActionHandler requires a BackupHandler instance.",
        ),
    )
    for candidate, expected_type, complaint in type_requirements:
        if not isinstance(candidate, expected_type):
            raise TypeError(complaint)

    self.git_commands = git_commands
    self.backup_handler = backup_handler
    # Logging goes through the log_handler module rather than a logger
    # owned by this instance.
    log_handler.log_debug("ActionHandler initialized.", func_name="__init__")
def _perform_backup_if_enabled(
    self,
    repo_path: str,
    profile_name: str,
    autobackup_enabled: bool,
    backup_base_dir: str,
    excluded_extensions: set,
    excluded_dirs: set,
):
    """
    Create a ZIP backup through the BackupHandler when autobackup is on.

    Args:
        repo_path (str): Directory to back up; must be an existing directory.
        profile_name (str): Profile name, forwarded to the backup handler
            and used in log messages.
        autobackup_enabled (bool): When False the method logs and returns
            None without validating anything else.
        backup_base_dir (str): Destination directory handed to the handler.
        excluded_extensions (set): File extensions to leave out
            (presumably lowercase; confirm against BackupHandler).
        excluded_dirs (set): Directory base names to leave out
            (presumably lowercase; confirm against BackupHandler).

    Returns:
        The value returned by ``create_zip_backup`` (a falsy value is logged
        as "no file generated"), or None when autobackup is disabled.

    Raises:
        ValueError: If repo_path is not an existing directory or
            backup_base_dir is empty.
        TypeError: If excluded_extensions or excluded_dirs is not a set.
        IOError: If the backup handler raises anything; the original
            exception is chained as the cause.
    """
    caller = "_perform_backup_if_enabled"

    if not autobackup_enabled:
        log_handler.log_debug(
            f"Autobackup disabled for '{profile_name}', skipping.",
            func_name=caller,
        )
        return None

    log_handler.log_info(
        f"Autobackup enabled for '{profile_name}'. Starting backup...",
        func_name=caller,
    )
    log_handler.log_debug(f" Source: {repo_path}", func_name=caller)
    log_handler.log_debug(f" Destination: {backup_base_dir}", func_name=caller)
    log_handler.log_debug(f" Exclude Exts: {excluded_extensions}", func_name=caller)
    log_handler.log_debug(f" Exclude Dirs: {excluded_dirs}", func_name=caller)

    # Argument checks run outside the try block below, so they reach the
    # caller as ValueError/TypeError and are not converted to IOError.
    if not (repo_path and os.path.isdir(repo_path)):
        raise ValueError(f"Invalid repo_path for backup: '{repo_path}'")
    if not backup_base_dir:
        raise ValueError("backup_base_dir cannot be empty.")
    if not isinstance(excluded_extensions, set):
        raise TypeError("excluded_extensions must be a set.")
    if not isinstance(excluded_dirs, set):
        raise TypeError("excluded_dirs must be a set.")

    try:
        archive_path = self.backup_handler.create_zip_backup(
            source_repo_path=repo_path,
            backup_base_dir=backup_base_dir,
            profile_name=profile_name,
            excluded_extensions=excluded_extensions,
            excluded_dirs_base=excluded_dirs,
        )
        if not archive_path:
            log_handler.log_warning(
                "Backup finished but no file generated.", func_name=caller
            )
        else:
            log_handler.log_info(
                f"Backup completed successfully: {archive_path}", func_name=caller
            )
        return archive_path
    except (IOError, ValueError, PermissionError) as backup_e:
        # Anticipated failures: log a plain error line and surface as IOError.
        log_handler.log_error(
            f"Backup failed for '{profile_name}': {backup_e}", func_name=caller
        )
        raise IOError(
            f"Autobackup failed for '{profile_name}': {backup_e}"
        ) from backup_e
    except Exception as unexpected_e:
        # Anything else is logged through log_exception before being
        # surfaced as IOError as well.
        log_handler.log_exception(
            f"Unexpected error during backup for '{profile_name}': {unexpected_e}",
            func_name=caller,
        )
        raise IOError(
            f"Unexpected autobackup failure: {unexpected_e}"
        ) from unexpected_e
def execute_prepare_repo(self, repo_path: str):
    """
    Run the 'prepare repository' action on a directory without a .git entry.

    Args:
        repo_path (str): Directory to prepare.

    Returns:
        bool: True when ``prepare_svn_for_git`` completes without raising.

    Raises:
        ValueError: If repo_path is not an existing directory, or it
            already contains a ``.git`` entry.
        GitCommandError, IOError: Re-raised unchanged from the Git layer.
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_prepare_repo"
    log_handler.log_info(
        f"Executing preparation for repository: {repo_path}", func_name=caller
    )

    if not (repo_path and os.path.isdir(repo_path)):
        raise ValueError(f"Invalid directory for preparation: '{repo_path}'")

    # A .git entry means the directory was prepared before; refuse to redo it.
    if os.path.exists(os.path.join(repo_path, ".git")):
        log_handler.log_warning(
            f"Repo at '{repo_path}' already prepared.", func_name=caller
        )
        raise ValueError("Repository is already prepared.")

    try:
        self.git_commands.prepare_svn_for_git(repo_path)
        log_handler.log_info(
            f"Repository prepared successfully: {repo_path}", func_name=caller
        )
        return True
    except (GitCommandError, ValueError, IOError) as known_err:
        log_handler.log_error(
            f"Failed to prepare repo '{repo_path}': {known_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error preparing repo '{repo_path}': {unexpected}",
            func_name=caller,
        )
        raise Exception(
            f"Unexpected preparation error in '{repo_path}'"
        ) from unexpected
def execute_create_bundle(
    self,
    repo_path: str,
    bundle_full_path: str,
    profile_name: str,
    autobackup_enabled: bool,
    backup_base_dir: str,
    autocommit_enabled: bool,
    commit_message: str,
    excluded_extensions: set,
    excluded_dirs: set,
):
    """
    Run the 'create bundle' action: optional backup, optional autocommit,
    then creation of the Git bundle file.

    Args:
        repo_path (str): Repository to bundle.
        bundle_full_path (str): Path of the bundle file to create.
        profile_name (str): Profile name used for backup and log messages.
        autobackup_enabled (bool): Whether to run the backup step first.
        backup_base_dir (str): Destination directory for the backup.
        autocommit_enabled (bool): Whether to commit pending changes first.
        commit_message (str): Message for the autocommit. When it is None,
            empty or only whitespace, a timestamped default is used.
        excluded_extensions (set): Extensions excluded from the backup.
        excluded_dirs (set): Directory names excluded from the backup.

    Returns:
        str | None: ``bundle_full_path`` when the bundle exists and is not
        empty; None when no file was produced or it was empty (an empty
        file is removed on a best-effort basis).

    Raises:
        ValueError, TypeError, IOError: From the backup step.
        GitCommandError, ValueError: Re-raised unchanged from the commit or
            bundle steps.
        Exception: Wraps any other failure of the commit or bundle steps
            (original chained as the cause).
    """
    func_name = "execute_create_bundle"
    log_handler.log_info(
        f"Executing 'Create Bundle' for profile '{profile_name}'...",
        func_name=func_name,
    )
    log_handler.log_debug(f" Repo: {repo_path}", func_name=func_name)
    log_handler.log_debug(f" Bundle: {bundle_full_path}", func_name=func_name)
    log_handler.log_debug(
        f" Autobackup: {autobackup_enabled}, Backup Dir: {backup_base_dir}",
        func_name=func_name,
    )
    log_handler.log_debug(
        f" Autocommit: {autocommit_enabled}", func_name=func_name
    )
    log_handler.log_debug(
        f" Backup Exclude Exts: {excluded_extensions}", func_name=func_name
    )
    log_handler.log_debug(
        f" Backup Exclude Dirs: {excluded_dirs}", func_name=func_name
    )

    # --- 1. Backup step ---
    # Failures propagate: a failed backup aborts the whole action.
    self._perform_backup_if_enabled(
        repo_path,
        profile_name,
        autobackup_enabled,
        backup_base_dir,
        excluded_extensions,
        excluded_dirs,
    )

    # --- 2. Autocommit step ---
    if autocommit_enabled:
        log_handler.log_info(
            "Autocommit enabled. Checking for changes...", func_name=func_name
        )
        try:
            if self.git_commands.git_status_has_changes(repo_path):
                log_handler.log_info(
                    "Changes detected, performing autocommit...",
                    func_name=func_name,
                )
                # Treat a missing (None) message like an empty one so the
                # default below is used instead of failing on None.strip().
                commit_msg_to_use = (commit_message or "").strip()
                if not commit_msg_to_use:
                    now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
                    commit_msg_to_use = (
                        f"Autocommit '{profile_name}' before bundle - {now_str}"
                    )
                log_handler.log_debug(
                    f"Using autocommit message: '{commit_msg_to_use}'",
                    func_name=func_name,
                )
                commit_made = self.git_commands.git_commit(
                    repo_path, commit_msg_to_use
                )
                if commit_made:
                    log_handler.log_info(
                        "Autocommit successful.", func_name=func_name
                    )
                else:
                    log_handler.log_warning(
                        "Autocommit attempted, but no effective changes committed.",
                        func_name=func_name,
                    )
            else:
                log_handler.log_info(
                    "No changes detected, skipping autocommit.", func_name=func_name
                )
        except (GitCommandError, ValueError) as commit_e:
            log_handler.log_error(
                f"Autocommit failed for '{profile_name}': {commit_e}",
                func_name=func_name,
            )
            raise
        except Exception as commit_e:
            log_handler.log_exception(
                f"Unexpected error during autocommit for '{profile_name}': {commit_e}",
                func_name=func_name,
            )
            raise Exception("Unexpected autocommit error") from commit_e
    else:
        log_handler.log_debug(
            "Autocommit disabled, skipping commit step.", func_name=func_name
        )

    # --- 3. Create bundle step ---
    log_handler.log_info(
        f"Creating bundle file: {bundle_full_path}", func_name=func_name
    )
    try:
        self.git_commands.create_git_bundle(repo_path, bundle_full_path)
        bundle_exists = os.path.exists(bundle_full_path)
        if bundle_exists and os.path.getsize(bundle_full_path) > 0:
            log_handler.log_info(
                "Git bundle created successfully.", func_name=func_name
            )
            return bundle_full_path

        log_handler.log_warning(
            f"Bundle file '{bundle_full_path}' not created or empty.",
            func_name=func_name,
        )
        if bundle_exists:
            # The file exists but has zero size: remove it so no unusable
            # bundle is left behind. Failure to remove is only a warning.
            try:
                os.remove(bundle_full_path)
                log_handler.log_info(
                    f"Removed empty bundle: {bundle_full_path}",
                    func_name=func_name,
                )
            except OSError as rm_err:
                log_handler.log_warning(
                    f"Could not remove empty bundle: {rm_err}",
                    func_name=func_name,
                )
        return None
    except (GitCommandError, ValueError) as bundle_e:
        log_handler.log_error(
            f"Bundle creation failed for '{profile_name}': {bundle_e}",
            func_name=func_name,
        )
        raise
    except Exception as bundle_e:
        log_handler.log_exception(
            f"Unexpected error during bundle creation for '{profile_name}': {bundle_e}",
            func_name=func_name,
        )
        raise Exception("Unexpected bundle creation error") from bundle_e
def execute_fetch_bundle(
    self,
    target_repo_path_str: str,
    bundle_full_path: str,
    profile_name: str,
    autobackup_enabled: bool,
    backup_base_dir: str,
    excluded_extensions: set,
    excluded_dirs: set,
):
    """
    Run the 'fetch from bundle' action.

    When the target directory already holds a ``.git`` entry, the optional
    backup runs and the bundle is fetched into it; otherwise the bundle is
    cloned into the target path.

    Args:
        target_repo_path_str (str): Destination repository directory.
        bundle_full_path (str): Bundle file to read; must be an existing file.
        profile_name (str): Profile name used for backup and log messages.
        autobackup_enabled (bool): Whether to back up before fetching.
        backup_base_dir (str): Destination directory for the backup.
        excluded_extensions (set): Extensions excluded from the backup.
        excluded_dirs (set): Directory names excluded from the backup.

    Returns:
        bool: True when the fetch or the clone succeeds.

    Raises:
        FileNotFoundError: If the bundle file does not exist.
        GitCommandError, ValueError, IOError: Re-raised from the Git or
            backup layers.
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_fetch_bundle"
    log_handler.log_info(
        f"Executing 'Fetch from Bundle' for profile '{profile_name}'...",
        func_name=caller,
    )
    log_handler.log_debug(f" Target Dir: {target_repo_path_str}", func_name=caller)
    log_handler.log_debug(f" Bundle File: {bundle_full_path}", func_name=caller)
    log_handler.log_debug(f" Autobackup: {autobackup_enabled}", func_name=caller)

    if not os.path.isfile(bundle_full_path):
        error_msg = f"Bundle file not found: '{bundle_full_path}'"
        log_handler.log_error(error_msg, func_name=caller)
        raise FileNotFoundError(error_msg)

    dot_git = os.path.join(target_repo_path_str, ".git")
    is_existing_repo = os.path.isdir(target_repo_path_str) and os.path.exists(dot_git)
    log_handler.log_debug(
        f"Target is existing repo: {is_existing_repo}", func_name=caller
    )

    if not is_existing_repo:
        # --- Clone path: the target is not a repository yet ---
        log_handler.log_info(
            f"Target '{target_repo_path_str}' not repo. Cloning...",
            func_name=caller,
        )
        try:
            cloned = self.git_commands.clone_from_bundle(
                bundle_full_path, target_repo_path_str
            )
            if not cloned:
                log_handler.log_error(
                    "Cloning reported failure unexpectedly.", func_name=caller
                )
                # Raised inside the try on purpose: the generic handler
                # below logs it and wraps it.
                raise Exception(
                    f"Cloning into '{target_repo_path_str}' failed unexpectedly."
                )
            log_handler.log_info(
                f"Cloned successfully into '{target_repo_path_str}'.",
                func_name=caller,
            )
            return True
        except (GitCommandError, ValueError, FileNotFoundError, IOError) as clone_e:
            log_handler.log_error(f"Cloning failed: {clone_e}", func_name=caller)
            raise
        except Exception as clone_e:
            log_handler.log_exception(
                f"Unexpected clone error: {clone_e}", func_name=caller
            )
            raise Exception("Unexpected clone error") from clone_e

    # --- Fetch path: the target already is a repository ---
    log_handler.log_info(
        "Target is existing repo. Fetching/Merging...", func_name=caller
    )
    # 1. Backup step (exceptions propagate and abort the fetch).
    self._perform_backup_if_enabled(
        target_repo_path_str,
        profile_name,
        autobackup_enabled,
        backup_base_dir,
        excluded_extensions,
        excluded_dirs,
    )
    # 2. Fetch/merge step.
    log_handler.log_info(
        f"Fetching into '{target_repo_path_str}'...", func_name=caller
    )
    try:
        self.git_commands.fetch_from_git_bundle(
            target_repo_path_str, bundle_full_path
        )
        log_handler.log_info(
            "Fetch/merge process completed successfully.", func_name=caller
        )
        return True
    except GitCommandError as fetch_e:
        log_handler.log_error(f"Fetch/merge failed: {fetch_e}", func_name=caller)
        raise
    except Exception as fetch_e:
        log_handler.log_exception(
            f"Unexpected fetch/merge error: {fetch_e}", func_name=caller
        )
        raise Exception("Unexpected fetch/merge error") from fetch_e
# --- Other methods (commit, tag, branch, untrack) ---
# (Replace every self.logger.* call with log_handler.log_*.)
def execute_manual_commit(self, repo_path: str, commit_message: str):
    """
    Commit the repository's changes with a user-supplied message.

    Args:
        repo_path (str): Repository to commit in.
        commit_message (str): Commit message; must contain non-whitespace text.

    Returns:
        The value returned by ``git_commit`` (truthy when a commit was made).

    Raises:
        ValueError: If the message is empty or whitespace only; also
            re-raised from the Git layer.
        GitCommandError: Re-raised unchanged from the Git layer.
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_manual_commit"
    log_handler.log_info(
        f"Executing manual commit for: {repo_path}", func_name=caller
    )

    message_is_blank = not commit_message or commit_message.isspace()
    if message_is_blank:
        raise ValueError("Commit message cannot be empty.")

    try:
        commit_made = self.git_commands.git_commit(repo_path, commit_message)
        outcome = (
            "Manual commit successful."
            if commit_made
            else "Manual commit: No changes available."
        )
        log_handler.log_info(outcome, func_name=caller)
        return commit_made
    except (GitCommandError, ValueError) as known_err:
        log_handler.log_error(
            f"Manual commit failed: {known_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected manual commit error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected commit error") from unexpected
def execute_create_tag(
    self, repo_path: str, ignored: None, tag_name: str, tag_message: str
):
    """
    Commit pending changes using the tag message, then create the tag.

    Args:
        repo_path (str): Repository to tag.
        ignored: Not used by this method.
        tag_name (str): Name of the tag; must contain non-whitespace text.
        tag_message (str): Tag message, also used as the message of the
            pre-tag commit; must contain non-whitespace text.

    Returns:
        bool: True when the tag was created.

    Raises:
        ValueError: If the name or message is empty/blank; also re-raised
            from the Git layer.
        GitCommandError: Re-raised unchanged from the Git layer.
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_create_tag"

    # Both fields are mandatory; the name is checked before the message.
    required_fields = (
        (tag_name, "Tag name cannot be empty."),
        (tag_message, "Tag message cannot be empty."),
    )
    for value, complaint in required_fields:
        if not value or value.isspace():
            raise ValueError(complaint)

    log_handler.log_info(
        f"Executing create tag '{tag_name}' for: {repo_path}", func_name=caller
    )
    log_handler.log_info(
        "Attempting pre-tag commit using tag message...", func_name=caller
    )
    try:
        if self.git_commands.git_commit(repo_path, tag_message):
            commit_note = f"Pre-tag commit successful: '{tag_message}'"
        else:
            commit_note = "No changes for pre-tag commit."
        log_handler.log_info(commit_note, func_name=caller)

        log_handler.log_info(
            f"Creating annotated tag '{tag_name}'...", func_name=caller
        )
        self.git_commands.create_tag(repo_path, tag_name, tag_message)
        log_handler.log_info(
            f"Tag '{tag_name}' created successfully.", func_name=caller
        )
        return True
    except (GitCommandError, ValueError) as known_err:
        log_handler.log_error(
            f"Failed commit or create tag '{tag_name}': {known_err}",
            func_name=caller,
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected error creating tag '{tag_name}': {unexpected}",
            func_name=caller,
        )
        raise Exception(
            f"Unexpected error creating tag '{tag_name}'"
        ) from unexpected
def execute_checkout_tag(self, repo_path: str, tag_name: str):
    """
    Check out a tag after verifying the working tree has no pending changes.

    Args:
        repo_path (str): Repository to operate on.
        tag_name (str): Tag to check out; must be non-empty.

    Returns:
        bool: True when the checkout succeeds.

    Raises:
        ValueError: If tag_name is empty or uncommitted changes exist.
        GitCommandError: If the status check or the checkout fails
            (including a falsy result from ``checkout_tag``).
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_checkout_tag"
    if not tag_name:
        raise ValueError("Tag name required.")
    log_handler.log_info(
        f"Executing checkout tag '{tag_name}' in: {repo_path}", func_name=caller
    )

    # Step 1: refuse to proceed when the working tree has uncommitted changes.
    # The ValueError is raised inside the try so it is logged like Git errors.
    try:
        dirty = self.git_commands.git_status_has_changes(repo_path)
        if dirty:
            raise ValueError("Uncommitted changes exist. Commit or stash first.")
        log_handler.log_debug("No uncommitted changes found.", func_name=caller)
    except (GitCommandError, ValueError) as status_err:
        log_handler.log_error(
            f"Status check error before checkout: {status_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected status check error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected status check error") from unexpected

    # Step 2: perform the checkout itself.
    try:
        if not self.git_commands.checkout_tag(repo_path, tag_name):
            raise GitCommandError("Checkout failed for unknown reason.")
        log_handler.log_info(
            f"Tag '{tag_name}' checked out (Detached HEAD).",
            func_name=caller,
        )
        return True
    except (GitCommandError, ValueError) as checkout_err:
        log_handler.log_error(
            f"Failed checkout tag '{tag_name}': {checkout_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected checkout error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected tag checkout error") from unexpected
def execute_create_branch(self, repo_path: str, branch_name: str):
    """
    Create a new local branch.

    Args:
        repo_path (str): Repository to operate on.
        branch_name (str): Name of the branch to create; must be non-empty.

    Returns:
        bool: True when the branch was created.

    Raises:
        ValueError: If branch_name is empty; also re-raised from the Git layer.
        GitCommandError: If creation fails (including a falsy result from
            ``create_branch``).
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_create_branch"
    log_handler.log_info(
        f"Executing create branch '{branch_name}' in: {repo_path}",
        func_name=caller,
    )
    if not branch_name:
        raise ValueError("Branch name cannot be empty.")

    try:
        created = self.git_commands.create_branch(repo_path, branch_name)
        if not created:
            # Raised inside the try so it is logged by the handler below.
            raise GitCommandError(f"Branch creation '{branch_name}' failed.")
        log_handler.log_info(
            f"Branch '{branch_name}' created successfully.", func_name=caller
        )
        return True
    except (GitCommandError, ValueError) as known_err:
        log_handler.log_error(
            f"Failed create branch: {known_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected create branch error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected branch creation error") from unexpected
def execute_switch_branch(self, repo_path: str, branch_name: str):
    """
    Switch to an existing local branch after verifying there are no
    uncommitted changes.

    Args:
        repo_path (str): Repository to operate on.
        branch_name (str): Branch to switch to; must be non-empty.

    Returns:
        bool: True when the switch succeeds.

    Raises:
        ValueError: If branch_name is empty or uncommitted changes exist.
        GitCommandError: If the status check or the switch fails
            (including a falsy result from ``checkout_branch``).
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_switch_branch"
    if not branch_name:
        raise ValueError("Branch name required.")
    log_handler.log_info(
        f"Executing switch to branch '{branch_name}'.", func_name=caller
    )

    # Step 1: refuse to proceed when the working tree has uncommitted changes.
    # The ValueError is raised inside the try so it is logged like Git errors.
    try:
        dirty = self.git_commands.git_status_has_changes(repo_path)
        if dirty:
            raise ValueError("Uncommitted changes exist. Commit or stash first.")
        log_handler.log_debug("No uncommitted changes found.", func_name=caller)
    except (GitCommandError, ValueError) as status_err:
        log_handler.log_error(
            f"Status check error before switch: {status_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected status check error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected status check error") from unexpected

    # Step 2: perform the switch itself.
    try:
        if not self.git_commands.checkout_branch(repo_path, branch_name):
            raise GitCommandError(f"Switch to branch '{branch_name}' failed.")
        log_handler.log_info(
            f"Switched to branch '{branch_name}'.", func_name=caller
        )
        return True
    except (GitCommandError, ValueError) as switch_err:
        log_handler.log_error(
            f"Failed switch branch '{branch_name}': {switch_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected switch branch error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected branch switch error") from unexpected
def execute_delete_branch(self, repo_path: str, branch_name: str, force=False):
    """
    Placeholder for branch deletion; always raises once arguments are valid.

    The deletion logic is not implemented here. ``execute_delete_local_branch``
    in this class provides a working local-branch deletion.

    Args:
        repo_path (str): Repository to operate on (currently unused).
        branch_name (str): Branch to delete; must be non-empty.
        force (bool): Requested force flag (only logged at present).

    Raises:
        ValueError: If branch_name is empty.
        NotImplementedError: Always, for a non-empty branch name. It is
            raised directly so callers see the real cause instead of a
            generic "unexpected error" wrapper; it is still an Exception
            subclass, so broad handlers keep working.
    """
    func_name = "execute_delete_branch"
    if not branch_name:
        raise ValueError("Branch name required.")
    log_handler.log_info(
        f"Executing delete branch '{branch_name}' (force={force}).",
        func_name=func_name,
    )
    # TODO: call self.git_commands.delete_branch(repo_path, branch_name, force)
    # and return its result once that command exists.
    log_handler.log_warning(
        "execute_delete_branch not fully implemented.", func_name=func_name
    )
    raise NotImplementedError("Branch deletion logic needs implementation.")
def execute_delete_tag(self, repo_path: str, tag_name: str):
    """
    Placeholder for tag deletion; always raises once arguments are valid.

    Args:
        repo_path (str): Repository to operate on (only logged at present).
        tag_name (str): Tag to delete; must be non-empty.

    Raises:
        ValueError: If tag_name is empty.
        NotImplementedError: Always, for a non-empty tag name. It is raised
            directly so callers see the real cause instead of a generic
            "unexpected error" wrapper; it is still an Exception subclass,
            so broad handlers keep working.
    """
    func_name = "execute_delete_tag"
    if not tag_name:
        raise ValueError("Tag name required.")
    log_handler.log_info(
        f"Executing delete tag '{tag_name}' in: {repo_path}", func_name=func_name
    )
    # TODO: call self.git_commands.delete_tag(repo_path, tag_name) and
    # return its result once that command exists.
    log_handler.log_warning(
        "execute_delete_tag not fully implemented.", func_name=func_name
    )
    raise NotImplementedError("Tag deletion logic needs implementation.")
def execute_untrack_files_from_gitignore(self, repo_path: str):
    """
    Stop tracking files that match a .gitignore rule and commit the result.

    Every tracked file except ``.gitignore`` itself is checked against the
    ignore rules. Matching files are removed from tracking in one call, and
    an automatic commit is created whose message summarises, per rule, how
    many files were untracked.

    Args:
        repo_path (str): Repository to operate on.

    Returns:
        bool: True when files were untracked and the commit was made; False
        when nothing matched or the commit reported no changes.

    Raises:
        GitCommandError, ValueError: Re-raised unchanged from the Git layer.
        Exception: Wraps any other failure (original chained as the cause).
    """
    caller = "execute_untrack_files_from_gitignore"
    log_handler.log_info(
        f"Checking for tracked files to untrack in '{repo_path}'...",
        func_name=caller,
    )
    files_by_rule = {}
    untrack_list = []
    try:
        tracked = self.git_commands.get_tracked_files(repo_path)
        log_handler.log_debug(
            f"Checking {len(tracked)} tracked files...", func_name=caller
        )
        for tracked_path in tracked:
            # Never untrack the ignore file itself.
            if os.path.normpath(tracked_path) == ".gitignore":
                continue
            rule = self.git_commands.get_matching_gitignore_rule(
                repo_path, tracked_path
            )
            if rule is None:
                continue
            log_handler.log_info(
                f"Tracked '{tracked_path}' matches rule: '{rule}'",
                func_name=caller,
            )
            files_by_rule.setdefault(rule, []).append(tracked_path)
            untrack_list.append(tracked_path)

        if not untrack_list:
            log_handler.log_info(
                "No tracked files match .gitignore rules. No action.",
                func_name=caller,
            )
            return False

        total = len(untrack_list)
        log_handler.log_info(
            f"Found {total} tracked files to untrack.",
            func_name=caller,
        )
        self.git_commands.remove_from_tracking(repo_path, untrack_list)
        log_handler.log_info(
            f"Removed {total} items from tracking.", func_name=caller
        )

        # Build the commit message: fixed header plus one line per rule,
        # with the rules in sorted order.
        summary_lines = []
        for rule in sorted(files_by_rule):
            count = len(files_by_rule[rule])
            plural = "" if count <= 1 else "s"
            summary_lines.append(
                f'- Rule "{rule}" untracked {count} file{plural}.\n'
            )
        commit_message = (
            "Chore: Stop tracking files based on .gitignore update.\n\nSummary:\n"
            + "".join(summary_lines)
        )

        log_handler.log_info(
            "Creating automatic commit for untracking changes.",
            func_name=caller,
        )
        log_handler.log_debug(
            f"Commit message:\n{commit_message}", func_name=caller
        )
        if self.git_commands.git_commit(repo_path, commit_message):
            log_handler.log_info(
                "Automatic untrack commit successful.", func_name=caller
            )
            return True
        log_handler.log_warning(
            "Untracking done, but commit reported no changes.",
            func_name=caller,
        )
        return False
    except (GitCommandError, ValueError) as known_err:
        log_handler.log_error(
            f"Error during untracking: {known_err}", func_name=caller
        )
        raise
    except Exception as unexpected:
        log_handler.log_exception(
            f"Unexpected untracking error: {unexpected}", func_name=caller
        )
        raise Exception("Unexpected untracking error") from unexpected
def execute_checkout_tracking_branch(
    self,
    repo_path: str,
    new_local_branch_name: str,
    remote_tracking_branch_full_name: str,  # e.g. 'origin/main'
) -> dict:
    """
    Check out a remote-tracking branch as a new local tracking branch.

    The caller is expected to have checked that the local branch name is
    free; if it is not, the failure is reported in the result dictionary.

    Args:
        repo_path (str): Path to the local repository.
        new_local_branch_name (str): Name for the new local branch.
        remote_tracking_branch_full_name (str): Full name of the
            remote-tracking branch, in 'remote/branch' form.

    Returns:
        dict: ``{"status": "success" | "error", "message": str}`` plus an
        ``"exception"`` entry when the status is ``"error"``. Git failures
        and uncommitted changes are reported this way, not raised.

    Raises:
        ValueError: If an argument is invalid or repo_path is not a Git
            repository (no ``.git`` entry).
    """
    func_name = "execute_checkout_tracking_branch"
    log_handler.log_info(
        f"Executing checkout tracking branch: Local='{new_local_branch_name}', "
        f"Remote='{remote_tracking_branch_full_name}' in '{repo_path}'",
        func_name=func_name,
    )

    # --- Input validation ---
    if not repo_path or not os.path.isdir(repo_path):
        raise ValueError(f"Invalid repository path: '{repo_path}'")
    if not new_local_branch_name or new_local_branch_name.isspace():
        raise ValueError("New local branch name cannot be empty.")
    if (
        not remote_tracking_branch_full_name
        or "/" not in remote_tracking_branch_full_name
    ):
        raise ValueError(
            f"Invalid remote tracking branch name format: '{remote_tracking_branch_full_name}' (expecting 'remote/branch')"
        )
    if not os.path.exists(os.path.join(repo_path, ".git")):
        raise ValueError(f"Directory '{repo_path}' is not a Git repository.")

    # --- Pre-check: uncommitted changes ---
    # Refuse to check out while there are unsaved modifications.
    try:
        if self.git_commands.git_status_has_changes(repo_path):
            msg = f"Checkout aborted: Uncommitted changes detected. Please commit or stash first before checking out '{new_local_branch_name}'."
            log_handler.log_warning(msg, func_name=func_name)
            return {"status": "error", "message": msg, "exception": ValueError(msg)}
    except GitCommandError as status_err:
        msg = f"Checkout aborted: Failed to check repository status: {status_err}"
        log_handler.log_error(msg, func_name=func_name)
        return {"status": "error", "message": msg, "exception": status_err}

    # --- Run the Git command ---
    result_info = {"status": "unknown", "message": "Checkout not completed."}
    try:
        # The result is inspected through returncode/stderr; presumably the
        # command runs with check=False (confirm in GitCommands).
        checkout_result = self.git_commands.checkout_new_branch_from_remote(
            working_directory=repo_path,
            new_local_branch_name=new_local_branch_name,
            remote_tracking_branch_full_name=remote_tracking_branch_full_name,
        )
        if checkout_result.returncode == 0:
            result_info["status"] = "success"
            result_info["message"] = (
                f"Successfully checked out '{remote_tracking_branch_full_name}' as new local branch '{new_local_branch_name}'."
            )
            log_handler.log_info(result_info["message"], func_name=func_name)
        else:
            result_info["status"] = "error"
            stderr_full = checkout_result.stderr if checkout_result.stderr else ""
            stderr_lower = stderr_full.lower()
            # Log stderr in its original case so branch names and paths in
            # the log stay exact.
            log_handler.log_error(
                f"Checkout tracking branch command failed (RC={checkout_result.returncode}). Stderr: {stderr_full}",
                func_name=func_name,
            )
            # stderr is lowercased for matching, so the branch name in the
            # pattern is lowercased too; otherwise a name containing an
            # uppercase letter could never match.
            already_exists_marker = (
                f"a branch named '{new_local_branch_name.lower()}' already exists"
            )
            if already_exists_marker in stderr_lower:
                result_info["message"] = (
                    f"Checkout failed: A local branch named '{new_local_branch_name}' already exists."
                )
            elif (
                "invalid object name" in stderr_lower
                or "not a valid object name" in stderr_lower
            ):
                result_info["message"] = (
                    f"Checkout failed: Remote branch '{remote_tracking_branch_full_name}' not found or invalid."
                )
            else:
                # Other patterns (e.g. pathspec did not match) can be added here.
                result_info["message"] = (
                    f"Checkout tracking branch failed (RC={checkout_result.returncode}). Check logs."
                )
            result_info["exception"] = GitCommandError(
                result_info["message"], stderr=checkout_result.stderr
            )
    except (GitCommandError, ValueError) as e:
        # Raised by the Git layer itself rather than reported via returncode.
        log_handler.log_error(
            f"Error during checkout tracking branch setup: {e}", func_name=func_name
        )
        result_info = {
            "status": "error",
            "message": f"Checkout failed: {e}",
            "exception": e,
        }
    except Exception as e:
        # Unexpected failure: log via log_exception and report it in the result.
        log_handler.log_exception(
            f"Unexpected error during checkout tracking branch: {e}",
            func_name=func_name,
        )
        result_info = {
            "status": "error",
            "message": f"Unexpected checkout error: {type(e).__name__}",
            "exception": e,
        }
    return result_info
def execute_delete_local_branch(self, repo_path: str, branch_name: str, force: bool = False) -> dict:
    """
    Delete a local branch, refusing to delete the checked-out branch.

    Args:
        repo_path (str): Path to the local repository.
        branch_name (str): Name of the local branch to delete.
        force (bool): If True, force the deletion even if unmerged.

    Returns:
        dict: ``{'status': 'success' | 'error', 'message': str}`` plus an
        ``'exception'`` entry when the status is ``'error'``. Git failures
        are reported this way, not raised.

    Raises:
        ValueError: If an argument is invalid, repo_path is not a Git
            repository (no ``.git`` entry), or branch_name is the
            currently checked-out branch.
    """
    func_name = "execute_delete_local_branch"
    action_type = "Force deleting" if force else "Deleting"
    log_handler.log_info(
        f"Executing: {action_type} local branch '{branch_name}' in '{repo_path}'",
        func_name=func_name,
    )

    # --- Input validation ---
    if not repo_path or not os.path.isdir(repo_path):
        raise ValueError(f"Invalid repository path: '{repo_path}'")
    if not branch_name or branch_name.isspace():
        raise ValueError("Branch name cannot be empty.")
    if not os.path.exists(os.path.join(repo_path, ".git")):
        raise ValueError(f"Directory '{repo_path}' is not a Git repository.")

    # --- Safety check: never delete the current branch ---
    try:
        current_branch = self.git_commands.get_current_branch_name(repo_path)
    except GitCommandError as e:
        # Without knowing the current branch the deletion cannot be made safe.
        msg = f"Delete aborted: Could not determine current branch to prevent self-deletion: {e}"
        log_handler.log_error(msg, func_name=func_name)
        return {'status': 'error', 'message': msg, 'exception': e}
    if current_branch == branch_name:
        # Raised, not returned: this is a logical input error, not a Git failure.
        msg = f"Cannot delete the currently checked-out branch ('{branch_name}'). Switch to another branch first."
        log_handler.log_error(msg, func_name=func_name)
        raise ValueError(msg)

    # Possible extra safety check, not enabled: require force=True to
    # delete 'master' or 'main'.

    # --- Run the Git command ---
    result_info = {'status': 'unknown', 'message': 'Delete branch not completed.'}
    try:
        # The result is inspected through returncode/stderr; presumably the
        # command runs with check=False (confirm in GitCommands).
        delete_result = self.git_commands.delete_local_branch(
            working_directory=repo_path,
            branch_name=branch_name,
            force=force,
        )
        if delete_result.returncode == 0:
            result_info['status'] = 'success'
            result_info['message'] = f"Local branch '{branch_name}' deleted successfully."
            log_handler.log_info(result_info['message'], func_name=func_name)
            # Any stderr text that came with a successful run is logged as info.
            if delete_result.stderr:
                log_handler.log_info(
                    f"Delete output: {delete_result.stderr.strip()}",
                    func_name=func_name,
                )
        else:
            result_info['status'] = 'error'
            stderr_full = delete_result.stderr if delete_result.stderr else ""
            stderr_lower = stderr_full.lower()
            # Log stderr in its original case so branch names in the log
            # stay exact.
            log_handler.log_error(
                f"Delete branch command failed (RC={delete_result.returncode}). Stderr: {stderr_full}",
                func_name=func_name,
            )
            # stderr is lowercased for matching, so the branch name in the
            # pattern is lowercased too; otherwise a name containing an
            # uppercase letter could never match.
            if f"branch '{branch_name.lower()}' not found" in stderr_lower:
                result_info['message'] = f"Delete failed: Branch '{branch_name}' not found."
            elif "not fully merged" in stderr_lower:
                # Expected only when force is False.
                result_info['message'] = f"Delete failed: Branch '{branch_name}' has unmerged changes. Use 'Force Delete' to discard them."
            else:
                result_info['message'] = f"Delete branch '{branch_name}' failed (RC={delete_result.returncode}). Check logs."
            result_info['exception'] = GitCommandError(
                result_info['message'], stderr=delete_result.stderr
            )
    except (GitCommandError, ValueError) as e:
        log_handler.log_error(
            f"Error during delete branch setup for '{branch_name}': {e}",
            func_name=func_name,
        )
        # ValueError is re-raised for the worker/controller to handle; a
        # GitCommandError is reported through the result dictionary.
        if isinstance(e, ValueError):
            raise
        result_info = {'status': 'error', 'message': f"Delete failed: {e}", 'exception': e}
    except Exception as e:
        # Unexpected failure: log via log_exception and report it in the result.
        log_handler.log_exception(
            f"Unexpected error deleting local branch '{branch_name}': {e}",
            func_name=func_name,
        )
        result_info = {'status': 'error', 'message': f"Unexpected delete error: {type(e).__name__}", 'exception': e}
    return result_info
# --- END OF FILE action_handler.py ---