832 lines
36 KiB
Python
832 lines
36 KiB
Python
# --- START OF FILE action_handler.py ---
|
|
|
|
# action_handler.py
|
|
# Rimosso import logging standard
|
|
import os
|
|
import datetime
|
|
|
|
# Importa il nuovo gestore della coda log
|
|
import log_handler
|
|
|
|
# Importa le dipendenze necessarie
|
|
from git_commands import GitCommands, GitCommandError
|
|
from backup_handler import BackupHandler
|
|
|
|
|
|
class ActionHandler:
|
|
"""
|
|
Handles the execution logic for core application actions.
|
|
Orchestrates calls to GitCommands and BackupHandler.
|
|
Methods are executed synchronously but are intended to be called
|
|
from worker threads managed by the main application controller.
|
|
Uses log_handler to send log messages to a central queue.
|
|
"""
|
|
|
|
# Rimosso logger dal costruttore
|
|
def __init__(self, git_commands: GitCommands, backup_handler: BackupHandler):
|
|
"""
|
|
Initializes the ActionHandler.
|
|
|
|
Args:
|
|
git_commands (GitCommands): Instance for executing Git commands.
|
|
backup_handler (BackupHandler): Instance for handling backups.
|
|
|
|
Raises:
|
|
TypeError: If provided arguments are not of the expected types.
|
|
"""
|
|
# Validazione tipi input
|
|
if not isinstance(git_commands, GitCommands):
|
|
raise TypeError("ActionHandler requires a GitCommands instance.")
|
|
if not isinstance(backup_handler, BackupHandler):
|
|
raise TypeError("ActionHandler requires a BackupHandler instance.")
|
|
|
|
self.git_commands = git_commands
|
|
self.backup_handler = backup_handler
|
|
# Log tramite il nuovo handler
|
|
log_handler.log_debug("ActionHandler initialized.", func_name="__init__")
|
|
|
|
def _perform_backup_if_enabled(
|
|
self,
|
|
repo_path: str,
|
|
profile_name: str,
|
|
autobackup_enabled: bool,
|
|
backup_base_dir: str,
|
|
excluded_extensions: set,
|
|
excluded_dirs: set,
|
|
):
|
|
"""
|
|
Performs backup using BackupHandler if enabled. Uses log_handler.
|
|
|
|
Args:
|
|
repo_path (str): Path to the repository.
|
|
profile_name (str): Name of the profile (for backup naming).
|
|
autobackup_enabled (bool): Flag from settings.
|
|
backup_base_dir (str): Destination directory for backups.
|
|
excluded_extensions (set): File extensions to exclude (lowercase).
|
|
excluded_dirs (set): Directory base names to exclude (lowercase).
|
|
|
|
Returns:
|
|
str | None: Path to created backup file, or None if disabled/failed/empty.
|
|
|
|
Raises:
|
|
IOError: If the backup process fails critically.
|
|
ValueError: If input arguments are invalid.
|
|
"""
|
|
func_name = "_perform_backup_if_enabled" # Nome funzione per log
|
|
if not autobackup_enabled:
|
|
log_handler.log_debug(
|
|
f"Autobackup disabled for '{profile_name}', skipping.",
|
|
func_name=func_name,
|
|
)
|
|
return None
|
|
|
|
log_handler.log_info(
|
|
f"Autobackup enabled for '{profile_name}'. Starting backup...",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(f" Source: {repo_path}", func_name=func_name)
|
|
log_handler.log_debug(f" Destination: {backup_base_dir}", func_name=func_name)
|
|
log_handler.log_debug(
|
|
f" Exclude Exts: {excluded_extensions}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(f" Exclude Dirs: {excluded_dirs}", func_name=func_name)
|
|
|
|
# Validation
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(f"Invalid repo_path for backup: '{repo_path}'")
|
|
if not backup_base_dir:
|
|
raise ValueError("backup_base_dir cannot be empty.")
|
|
if not isinstance(excluded_extensions, set):
|
|
raise TypeError("excluded_extensions must be a set.")
|
|
if not isinstance(excluded_dirs, set):
|
|
raise TypeError("excluded_dirs must be a set.")
|
|
|
|
# Delegate to BackupHandler
|
|
try:
|
|
backup_path = self.backup_handler.create_zip_backup(
|
|
source_repo_path=repo_path,
|
|
backup_base_dir=backup_base_dir,
|
|
profile_name=profile_name,
|
|
excluded_extensions=excluded_extensions,
|
|
excluded_dirs_base=excluded_dirs,
|
|
)
|
|
if backup_path:
|
|
log_handler.log_info(
|
|
f"Backup completed successfully: {backup_path}", func_name=func_name
|
|
)
|
|
else:
|
|
log_handler.log_warning(
|
|
"Backup finished but no file generated.", func_name=func_name
|
|
)
|
|
return backup_path
|
|
|
|
except (IOError, ValueError, PermissionError) as backup_e:
|
|
# Usa log_error invece di logger.error
|
|
log_handler.log_error(
|
|
f"Backup failed for '{profile_name}': {backup_e}", func_name=func_name
|
|
)
|
|
raise IOError(
|
|
f"Autobackup failed for '{profile_name}': {backup_e}"
|
|
) from backup_e
|
|
except Exception as unexpected_e:
|
|
# Usa log_exception per errori imprevisti (include traceback su console)
|
|
log_handler.log_exception(
|
|
f"Unexpected error during backup for '{profile_name}': {unexpected_e}",
|
|
func_name=func_name,
|
|
)
|
|
raise IOError(
|
|
f"Unexpected autobackup failure: {unexpected_e}"
|
|
) from unexpected_e
|
|
|
|
def execute_prepare_repo(self, repo_path: str):
|
|
"""Executes 'prepare repository' action. Uses log_handler."""
|
|
func_name = "execute_prepare_repo"
|
|
log_handler.log_info(
|
|
f"Executing preparation for repository: {repo_path}", func_name=func_name
|
|
)
|
|
if not repo_path or not os.path.isdir(repo_path):
|
|
raise ValueError(f"Invalid directory for preparation: '{repo_path}'")
|
|
|
|
git_dir = os.path.join(repo_path, ".git")
|
|
if os.path.exists(git_dir):
|
|
log_handler.log_warning(
|
|
f"Repo at '{repo_path}' already prepared.", func_name=func_name
|
|
)
|
|
raise ValueError("Repository is already prepared.")
|
|
|
|
try:
|
|
self.git_commands.prepare_svn_for_git(repo_path)
|
|
log_handler.log_info(
|
|
f"Repository prepared successfully: {repo_path}", func_name=func_name
|
|
)
|
|
return True
|
|
except (GitCommandError, ValueError, IOError) as e:
|
|
log_handler.log_error(
|
|
f"Failed to prepare repo '{repo_path}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error preparing repo '{repo_path}': {e}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception(f"Unexpected preparation error in '{repo_path}'") from e
|
|
|
|
def execute_create_bundle(
|
|
self,
|
|
repo_path: str,
|
|
bundle_full_path: str,
|
|
profile_name: str,
|
|
autobackup_enabled: bool,
|
|
backup_base_dir: str,
|
|
autocommit_enabled: bool,
|
|
commit_message: str,
|
|
excluded_extensions: set,
|
|
excluded_dirs: set,
|
|
):
|
|
"""Executes 'create bundle'. Uses log_handler."""
|
|
func_name = "execute_create_bundle"
|
|
log_handler.log_info(
|
|
f"Executing 'Create Bundle' for profile '{profile_name}'...",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(f" Repo: {repo_path}", func_name=func_name)
|
|
log_handler.log_debug(f" Bundle: {bundle_full_path}", func_name=func_name)
|
|
log_handler.log_debug(
|
|
f" Autobackup: {autobackup_enabled}, Backup Dir: {backup_base_dir}",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(
|
|
f" Autocommit: {autocommit_enabled}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Backup Exclude Exts: {excluded_extensions}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(
|
|
f" Backup Exclude Dirs: {excluded_dirs}", func_name=func_name
|
|
)
|
|
|
|
# --- 1. Backup Step ---
|
|
# (Chiama _perform_backup_if_enabled che ora usa log_handler)
|
|
self._perform_backup_if_enabled(
|
|
repo_path,
|
|
profile_name,
|
|
autobackup_enabled,
|
|
backup_base_dir,
|
|
excluded_extensions,
|
|
excluded_dirs,
|
|
)
|
|
|
|
# --- 2. Autocommit Step ---
|
|
if autocommit_enabled:
|
|
log_handler.log_info(
|
|
"Autocommit enabled. Checking for changes...", func_name=func_name
|
|
)
|
|
try:
|
|
has_changes = self.git_commands.git_status_has_changes(repo_path)
|
|
if has_changes:
|
|
log_handler.log_info(
|
|
"Changes detected, performing autocommit...",
|
|
func_name=func_name,
|
|
)
|
|
commit_msg_to_use = commit_message.strip()
|
|
if not commit_msg_to_use:
|
|
now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
|
|
commit_msg_to_use = (
|
|
f"Autocommit '{profile_name}' before bundle - {now_str}"
|
|
)
|
|
log_handler.log_debug(
|
|
f"Using autocommit message: '{commit_msg_to_use}'",
|
|
func_name=func_name,
|
|
)
|
|
commit_made = self.git_commands.git_commit(
|
|
repo_path, commit_msg_to_use
|
|
)
|
|
if commit_made:
|
|
log_handler.log_info(
|
|
"Autocommit successful.", func_name=func_name
|
|
)
|
|
else:
|
|
log_handler.log_warning(
|
|
"Autocommit attempted, but no effective changes committed.",
|
|
func_name=func_name,
|
|
)
|
|
else:
|
|
log_handler.log_info(
|
|
"No changes detected, skipping autocommit.", func_name=func_name
|
|
)
|
|
except (GitCommandError, ValueError) as commit_e:
|
|
log_handler.log_error(
|
|
f"Autocommit failed for '{profile_name}': {commit_e}",
|
|
func_name=func_name,
|
|
)
|
|
raise commit_e
|
|
except Exception as commit_e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during autocommit for '{profile_name}': {commit_e}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception("Unexpected autocommit error") from commit_e
|
|
else:
|
|
log_handler.log_debug(
|
|
"Autocommit disabled, skipping commit step.", func_name=func_name
|
|
)
|
|
|
|
# --- 3. Create Bundle Step ---
|
|
log_handler.log_info(
|
|
f"Creating bundle file: {bundle_full_path}", func_name=func_name
|
|
)
|
|
try:
|
|
self.git_commands.create_git_bundle(repo_path, bundle_full_path)
|
|
bundle_exists = os.path.exists(bundle_full_path)
|
|
bundle_not_empty = bundle_exists and os.path.getsize(bundle_full_path) > 0
|
|
if bundle_exists and bundle_not_empty:
|
|
log_handler.log_info(
|
|
"Git bundle created successfully.", func_name=func_name
|
|
)
|
|
return bundle_full_path
|
|
else:
|
|
log_handler.log_warning(
|
|
f"Bundle file '{bundle_full_path}' not created or empty.",
|
|
func_name=func_name,
|
|
)
|
|
if bundle_exists and not bundle_not_empty:
|
|
try:
|
|
os.remove(bundle_full_path)
|
|
log_handler.log_info(
|
|
f"Removed empty bundle: {bundle_full_path}",
|
|
func_name=func_name,
|
|
)
|
|
except OSError as rm_err:
|
|
log_handler.log_warning(
|
|
f"Could not remove empty bundle: {rm_err}",
|
|
func_name=func_name,
|
|
)
|
|
return None
|
|
except (GitCommandError, ValueError) as bundle_e:
|
|
log_handler.log_error(
|
|
f"Bundle creation failed for '{profile_name}': {bundle_e}",
|
|
func_name=func_name,
|
|
)
|
|
raise bundle_e
|
|
except Exception as bundle_e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error during bundle creation for '{profile_name}': {bundle_e}",
|
|
func_name=func_name,
|
|
)
|
|
raise Exception("Unexpected bundle creation error") from bundle_e
|
|
|
|
def execute_fetch_bundle(
|
|
self,
|
|
target_repo_path_str: str,
|
|
bundle_full_path: str,
|
|
profile_name: str,
|
|
autobackup_enabled: bool,
|
|
backup_base_dir: str,
|
|
excluded_extensions: set,
|
|
excluded_dirs: set,
|
|
):
|
|
"""Executes 'fetch bundle' (clone or fetch/merge). Uses log_handler."""
|
|
func_name = "execute_fetch_bundle"
|
|
log_handler.log_info(
|
|
f"Executing 'Fetch from Bundle' for profile '{profile_name}'...",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(
|
|
f" Target Dir: {target_repo_path_str}", func_name=func_name
|
|
)
|
|
log_handler.log_debug(f" Bundle File: {bundle_full_path}", func_name=func_name)
|
|
log_handler.log_debug(
|
|
f" Autobackup: {autobackup_enabled}", func_name=func_name
|
|
)
|
|
# ... (altri log debug) ...
|
|
|
|
if not os.path.isfile(bundle_full_path):
|
|
error_msg = f"Bundle file not found: '{bundle_full_path}'"
|
|
log_handler.log_error(error_msg, func_name=func_name)
|
|
raise FileNotFoundError(error_msg)
|
|
|
|
git_dir_path = os.path.join(target_repo_path_str, ".git")
|
|
is_existing_repo = os.path.isdir(target_repo_path_str) and os.path.exists(
|
|
git_dir_path
|
|
)
|
|
log_handler.log_debug(
|
|
f"Target is existing repo: {is_existing_repo}", func_name=func_name
|
|
)
|
|
|
|
if is_existing_repo:
|
|
log_handler.log_info(
|
|
"Target is existing repo. Fetching/Merging...", func_name=func_name
|
|
)
|
|
# --- 1. Backup Step ---
|
|
self._perform_backup_if_enabled(
|
|
target_repo_path_str,
|
|
profile_name,
|
|
autobackup_enabled,
|
|
backup_base_dir,
|
|
excluded_extensions,
|
|
excluded_dirs,
|
|
)
|
|
# --- 2. Fetch/Merge Step ---
|
|
log_handler.log_info(
|
|
f"Fetching into '{target_repo_path_str}'...", func_name=func_name
|
|
)
|
|
try:
|
|
self.git_commands.fetch_from_git_bundle(
|
|
target_repo_path_str, bundle_full_path
|
|
)
|
|
log_handler.log_info(
|
|
"Fetch/merge process completed successfully.", func_name=func_name
|
|
)
|
|
return True
|
|
except GitCommandError as fetch_e:
|
|
log_handler.log_error(
|
|
f"Fetch/merge failed: {fetch_e}", func_name=func_name
|
|
)
|
|
raise fetch_e # Re-raise original error
|
|
except Exception as fetch_e:
|
|
log_handler.log_exception(
|
|
f"Unexpected fetch/merge error: {fetch_e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected fetch/merge error") from fetch_e
|
|
else:
|
|
log_handler.log_info(
|
|
f"Target '{target_repo_path_str}' not repo. Cloning...",
|
|
func_name=func_name,
|
|
)
|
|
try:
|
|
success = self.git_commands.clone_from_bundle(
|
|
bundle_full_path, target_repo_path_str
|
|
)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Cloned successfully into '{target_repo_path_str}'.",
|
|
func_name=func_name,
|
|
)
|
|
return True
|
|
else:
|
|
log_handler.log_error(
|
|
"Cloning reported failure unexpectedly.", func_name=func_name
|
|
)
|
|
raise Exception(
|
|
f"Cloning into '{target_repo_path_str}' failed unexpectedly."
|
|
)
|
|
except (GitCommandError, ValueError, FileNotFoundError, IOError) as clone_e:
|
|
log_handler.log_error(f"Cloning failed: {clone_e}", func_name=func_name)
|
|
raise clone_e # Re-raise original error
|
|
except Exception as clone_e:
|
|
log_handler.log_exception(
|
|
f"Unexpected clone error: {clone_e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected clone error") from clone_e
|
|
|
|
# --- Altri Metodi (commit, tag, branch, untrack) ---
|
|
# (Modificare tutte le chiamate a self.logger.* in log_handler.log_* )
|
|
|
|
def execute_manual_commit(self, repo_path: str, commit_message: str):
|
|
func_name = "execute_manual_commit"
|
|
log_handler.log_info(
|
|
f"Executing manual commit for: {repo_path}", func_name=func_name
|
|
)
|
|
if not commit_message or commit_message.isspace():
|
|
raise ValueError("Commit message cannot be empty.")
|
|
try:
|
|
commit_made = self.git_commands.git_commit(repo_path, commit_message)
|
|
if commit_made:
|
|
log_handler.log_info("Manual commit successful.", func_name=func_name)
|
|
else:
|
|
log_handler.log_info(
|
|
"Manual commit: No changes available.", func_name=func_name
|
|
)
|
|
return commit_made
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(f"Manual commit failed: {e}", func_name=func_name)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected manual commit error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected commit error") from e
|
|
|
|
def execute_create_tag(
|
|
self, repo_path: str, ignored: None, tag_name: str, tag_message: str
|
|
):
|
|
func_name = "execute_create_tag"
|
|
if not tag_name or tag_name.isspace():
|
|
raise ValueError("Tag name cannot be empty.")
|
|
if not tag_message or tag_message.isspace():
|
|
raise ValueError("Tag message cannot be empty.")
|
|
log_handler.log_info(
|
|
f"Executing create tag '{tag_name}' for: {repo_path}", func_name=func_name
|
|
)
|
|
log_handler.log_info(
|
|
"Attempting pre-tag commit using tag message...", func_name=func_name
|
|
)
|
|
try:
|
|
commit_made = self.git_commands.git_commit(repo_path, tag_message)
|
|
if commit_made:
|
|
log_handler.log_info(
|
|
f"Pre-tag commit successful: '{tag_message}'", func_name=func_name
|
|
)
|
|
else:
|
|
log_handler.log_info(
|
|
"No changes for pre-tag commit.", func_name=func_name
|
|
)
|
|
log_handler.log_info(
|
|
f"Creating annotated tag '{tag_name}'...", func_name=func_name
|
|
)
|
|
self.git_commands.create_tag(repo_path, tag_name, tag_message)
|
|
log_handler.log_info(
|
|
f"Tag '{tag_name}' created successfully.", func_name=func_name
|
|
)
|
|
return True
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed commit or create tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected error creating tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise Exception(f"Unexpected error creating tag '{tag_name}'") from e
|
|
|
|
def execute_checkout_tag(self, repo_path: str, tag_name: str):
|
|
func_name = "execute_checkout_tag"
|
|
if not tag_name:
|
|
raise ValueError("Tag name required.")
|
|
log_handler.log_info(
|
|
f"Executing checkout tag '{tag_name}' in: {repo_path}", func_name=func_name
|
|
)
|
|
try: # Check changes
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
raise ValueError("Uncommitted changes exist. Commit or stash first.")
|
|
log_handler.log_debug("No uncommitted changes found.", func_name=func_name)
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Status check error before checkout: {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected status check error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected status check error") from e
|
|
try: # Checkout
|
|
success = self.git_commands.checkout_tag(repo_path, tag_name)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Tag '{tag_name}' checked out (Detached HEAD).",
|
|
func_name=func_name,
|
|
)
|
|
return True
|
|
else:
|
|
raise GitCommandError("Checkout failed for unknown reason.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed checkout tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected checkout error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected tag checkout error") from e
|
|
|
|
def execute_create_branch(self, repo_path: str, branch_name: str):
|
|
func_name = "execute_create_branch"
|
|
log_handler.log_info(
|
|
f"Executing create branch '{branch_name}' in: {repo_path}",
|
|
func_name=func_name,
|
|
)
|
|
if not branch_name:
|
|
raise ValueError("Branch name cannot be empty.")
|
|
try:
|
|
success = self.git_commands.create_branch(repo_path, branch_name)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Branch '{branch_name}' created successfully.", func_name=func_name
|
|
)
|
|
return True
|
|
else:
|
|
raise GitCommandError(f"Branch creation '{branch_name}' failed.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(f"Failed create branch: {e}", func_name=func_name)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected create branch error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected branch creation error") from e
|
|
|
|
def execute_switch_branch(self, repo_path: str, branch_name: str):
|
|
func_name = "execute_switch_branch"
|
|
if not branch_name:
|
|
raise ValueError("Branch name required.")
|
|
log_handler.log_info(
|
|
f"Executing switch to branch '{branch_name}'.", func_name=func_name
|
|
)
|
|
try: # Check changes
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
raise ValueError("Uncommitted changes exist. Commit or stash first.")
|
|
log_handler.log_debug("No uncommitted changes found.", func_name=func_name)
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Status check error before switch: {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected status check error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected status check error") from e
|
|
try: # Switch
|
|
success = self.git_commands.checkout_branch(repo_path, branch_name)
|
|
if success:
|
|
log_handler.log_info(
|
|
f"Switched to branch '{branch_name}'.", func_name=func_name
|
|
)
|
|
return True
|
|
else:
|
|
raise GitCommandError(f"Switch to branch '{branch_name}' failed.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed switch branch '{branch_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected switch branch error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected branch switch error") from e
|
|
|
|
def execute_delete_branch(self, repo_path: str, branch_name: str, force=False):
|
|
func_name = "execute_delete_branch"
|
|
if not branch_name:
|
|
raise ValueError("Branch name required.")
|
|
log_handler.log_info(
|
|
f"Executing delete branch '{branch_name}' (force={force}).",
|
|
func_name=func_name,
|
|
)
|
|
try:
|
|
# success = self.git_commands.delete_branch(repo_path, branch_name, force)
|
|
# return success
|
|
log_handler.log_warning(
|
|
"execute_delete_branch not fully implemented.", func_name=func_name
|
|
)
|
|
raise NotImplementedError("Branch deletion logic needs implementation.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed delete branch '{branch_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Failed delete branch '{branch_name}': {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected branch deletion error") from e
|
|
|
|
def execute_delete_tag(self, repo_path: str, tag_name: str):
|
|
func_name = "execute_delete_tag"
|
|
if not tag_name:
|
|
raise ValueError("Tag name required.")
|
|
log_handler.log_info(
|
|
f"Executing delete tag '{tag_name}' in: {repo_path}", func_name=func_name
|
|
)
|
|
try:
|
|
# success = self.git_commands.delete_tag(repo_path, tag_name)
|
|
# return success
|
|
log_handler.log_warning(
|
|
"execute_delete_tag not fully implemented.", func_name=func_name
|
|
)
|
|
raise NotImplementedError("Tag deletion logic needs implementation.")
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(
|
|
f"Failed delete tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise e
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Failed delete tag '{tag_name}': {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected tag deletion error") from e
|
|
|
|
def execute_untrack_files_from_gitignore(self, repo_path: str):
|
|
func_name = "execute_untrack_files_from_gitignore"
|
|
log_handler.log_info(
|
|
f"Checking for tracked files to untrack in '{repo_path}'...",
|
|
func_name=func_name,
|
|
)
|
|
rules_to_files_map = {}
|
|
all_files_to_untrack = []
|
|
try:
|
|
tracked_files = self.git_commands.get_tracked_files(repo_path)
|
|
log_handler.log_debug(
|
|
f"Checking {len(tracked_files)} tracked files...", func_name=func_name
|
|
)
|
|
for file_path in tracked_files:
|
|
norm_file_path = os.path.normpath(file_path)
|
|
if norm_file_path == ".gitignore":
|
|
continue
|
|
matching_rule = self.git_commands.get_matching_gitignore_rule(
|
|
repo_path, file_path
|
|
)
|
|
if matching_rule is not None:
|
|
log_handler.log_info(
|
|
f"Tracked '{file_path}' matches rule: '{matching_rule}'",
|
|
func_name=func_name,
|
|
)
|
|
if matching_rule not in rules_to_files_map:
|
|
rules_to_files_map[matching_rule] = []
|
|
rules_to_files_map[matching_rule].append(file_path)
|
|
all_files_to_untrack.append(file_path)
|
|
|
|
if all_files_to_untrack:
|
|
num_untrack = len(all_files_to_untrack)
|
|
log_handler.log_info(
|
|
f"Found {num_untrack} tracked files to untrack.",
|
|
func_name=func_name,
|
|
)
|
|
self.git_commands.remove_from_tracking(repo_path, all_files_to_untrack)
|
|
log_handler.log_info(
|
|
f"Removed {num_untrack} items from tracking.", func_name=func_name
|
|
)
|
|
commit_message = "Chore: Stop tracking files based on .gitignore update.\n\nSummary:\n"
|
|
sorted_rules = sorted(rules_to_files_map.keys())
|
|
for rule in sorted_rules:
|
|
count = len(rules_to_files_map[rule])
|
|
plural = "s" if count > 1 else ""
|
|
commit_message += (
|
|
f'- Rule "{rule}" untracked {count} file{plural}.\n'
|
|
)
|
|
log_handler.log_info(
|
|
"Creating automatic commit for untracking changes.",
|
|
func_name=func_name,
|
|
)
|
|
log_handler.log_debug(
|
|
f"Commit message:\n{commit_message}", func_name=func_name
|
|
)
|
|
commit_made = self.git_commands.git_commit(repo_path, commit_message)
|
|
if commit_made:
|
|
log_handler.log_info(
|
|
"Automatic untrack commit successful.", func_name=func_name
|
|
)
|
|
return True
|
|
else:
|
|
log_handler.log_warning(
|
|
"Untracking done, but commit reported no changes.",
|
|
func_name=func_name,
|
|
)
|
|
return False
|
|
else:
|
|
log_handler.log_info(
|
|
"No tracked files match .gitignore rules. No action.",
|
|
func_name=func_name,
|
|
)
|
|
return False
|
|
except (GitCommandError, ValueError) as e:
|
|
log_handler.log_error(f"Error during untracking: {e}", func_name=func_name)
|
|
raise
|
|
except Exception as e:
|
|
log_handler.log_exception(
|
|
f"Unexpected untracking error: {e}", func_name=func_name
|
|
)
|
|
raise Exception("Unexpected untracking error") from e
|
|
|
|
def execute_checkout_tracking_branch(
|
|
self,
|
|
repo_path: str,
|
|
new_local_branch_name: str,
|
|
remote_tracking_branch_full_name: str # Es. 'origin/main'
|
|
) -> dict:
|
|
"""
|
|
Checks out a remote branch as a new local tracking branch.
|
|
Handles the case where the local branch name might already exist (should be checked by caller).
|
|
|
|
Args:
|
|
repo_path (str): Path to the local repository.
|
|
new_local_branch_name (str): The name for the new local branch.
|
|
remote_tracking_branch_full_name (str): The full name of the remote-tracking branch.
|
|
|
|
Returns:
|
|
dict: A dictionary containing the status ('success', 'error'), message,
|
|
and optionally an exception.
|
|
|
|
Raises:
|
|
ValueError: If input arguments are invalid.
|
|
# GitCommandError is handled and returned in the dictionary
|
|
"""
|
|
func_name = "execute_checkout_tracking_branch"
|
|
log_handler.log_info(
|
|
f"Executing checkout tracking branch: Local='{new_local_branch_name}', "
|
|
f"Remote='{remote_tracking_branch_full_name}' in '{repo_path}'",
|
|
func_name=func_name
|
|
)
|
|
|
|
# --- Input Validation ---
|
|
if not repo_path or not os.path.isdir(repo_path): raise ValueError(f"Invalid repository path: '{repo_path}'")
|
|
if not new_local_branch_name or new_local_branch_name.isspace(): raise ValueError("New local branch name cannot be empty.")
|
|
if not remote_tracking_branch_full_name or '/' not in remote_tracking_branch_full_name: raise ValueError(f"Invalid remote tracking branch name format: '{remote_tracking_branch_full_name}' (expecting 'remote/branch')")
|
|
if not os.path.exists(os.path.join(repo_path, ".git")): raise ValueError(f"Directory '{repo_path}' is not a Git repository.")
|
|
|
|
# --- Pre-check: Uncommitted changes? ---
|
|
# È buona pratica impedire checkout se ci sono modifiche non salvate
|
|
try:
|
|
if self.git_commands.git_status_has_changes(repo_path):
|
|
msg = f"Checkout aborted: Uncommitted changes detected. Please commit or stash first before checking out '{new_local_branch_name}'."
|
|
log_handler.log_warning(msg, func_name=func_name)
|
|
return {'status': 'error', 'message': msg, 'exception': ValueError(msg)}
|
|
except GitCommandError as status_err:
|
|
msg = f"Checkout aborted: Failed to check repository status: {status_err}"
|
|
log_handler.log_error(msg, func_name=func_name)
|
|
return {'status': 'error', 'message': msg, 'exception': status_err}
|
|
|
|
# --- Esecuzione Comando Git ---
|
|
result_info = {'status': 'unknown', 'message': 'Checkout not completed.'}
|
|
try:
|
|
# Chiama il comando specifico in GitCommands (che ha check=False)
|
|
checkout_result = self.git_commands.checkout_new_branch_from_remote(
|
|
working_directory=repo_path,
|
|
new_local_branch_name=new_local_branch_name,
|
|
remote_tracking_branch_full_name=remote_tracking_branch_full_name
|
|
)
|
|
|
|
# Analizza il risultato
|
|
if checkout_result.returncode == 0:
|
|
result_info['status'] = 'success'
|
|
result_info['message'] = f"Successfully checked out '{remote_tracking_branch_full_name}' as new local branch '{new_local_branch_name}'."
|
|
log_handler.log_info(result_info['message'], func_name=func_name)
|
|
# L'output di checkout -b di solito è su stderr, loggato da log_and_execute
|
|
else:
|
|
# Errore
|
|
result_info['status'] = 'error'
|
|
stderr_full = checkout_result.stderr if checkout_result.stderr else ""
|
|
stderr_lower = stderr_full.lower()
|
|
log_handler.log_error(f"Checkout tracking branch command failed (RC={checkout_result.returncode}). Stderr: {stderr_lower}", func_name=func_name)
|
|
|
|
# Controlla errori specifici
|
|
if f"a branch named '{new_local_branch_name}' already exists" in stderr_lower:
|
|
result_info['message'] = f"Checkout failed: A local branch named '{new_local_branch_name}' already exists."
|
|
elif "invalid object name" in stderr_lower or "not a valid object name" in stderr_lower:
|
|
result_info['message'] = f"Checkout failed: Remote branch '{remote_tracking_branch_full_name}' not found or invalid."
|
|
# Aggiungere altri check se necessario (es. pathspec did not match)
|
|
else:
|
|
result_info['message'] = f"Checkout tracking branch failed (RC={checkout_result.returncode}). Check logs."
|
|
|
|
result_info['exception'] = GitCommandError(result_info['message'], stderr=checkout_result.stderr)
|
|
|
|
except (GitCommandError, ValueError) as e:
|
|
# Errore dalla validazione o dal controllo stato
|
|
log_handler.log_error(f"Error during checkout tracking branch setup: {e}", func_name=func_name)
|
|
result_info = {'status': 'error', 'message': f"Checkout failed: {e}", 'exception': e}
|
|
|
|
except Exception as e:
|
|
# Errore imprevisto
|
|
log_handler.log_exception(f"Unexpected error during checkout tracking branch: {e}", func_name=func_name)
|
|
result_info = {'status': 'error', 'message': f"Unexpected checkout error: {type(e).__name__}", 'exception': e}
|
|
|
|
return result_info
|
|
|
|
|
|
# --- END OF FILE action_handler.py ---
|