# action_handler.py
import logging
import os

# Import dependencies
from git_commands import GitCommands, GitCommandError
from backup_handler import BackupHandler


class ActionHandler:
    """Handles the execution logic for core application actions.

    Every ``execute_*`` method follows the same contract: log what is about
    to happen, delegate the real work to the injected ``git_commands`` /
    ``backup_handler`` collaborators, log any failure once with a traceback,
    and re-raise the original exception so the calling (UI) layer can decide
    how to present it.
    """

    def __init__(
        self,
        logger: logging.Logger,
        git_commands: "GitCommands",
        backup_handler: "BackupHandler",
    ):
        """
        Initializes the ActionHandler.

        Args:
            logger (logging.Logger): Logger instance.
            git_commands (GitCommands): Instance for executing Git commands.
            backup_handler (BackupHandler): Instance for handling backups.
        """
        # Project types are annotated as strings so that defining the class
        # never requires evaluating them.
        self.logger = logger
        self.git_commands = git_commands
        self.backup_handler = backup_handler

    # --- Internal helpers ---

    def _perform_backup_if_enabled(
        self,
        svn_path,
        profile_name,
        autobackup_enabled,
        backup_base_dir,
        excluded_extensions,
        excluded_dirs,
    ):
        """
        Performs backup if enabled. Raises IOError on backup failure.

        Args:
            svn_path (str): Path to the repository.
            profile_name (str): Name of the profile (for backup naming).
            autobackup_enabled (bool): Flag from settings.
            backup_base_dir (str): Destination directory for backups.
            excluded_extensions (set): File extensions to exclude.
            excluded_dirs (set): Directory names to exclude.

        Returns:
            None: Nothing is returned, whether the backup ran or was skipped.

        Raises:
            IOError: If the backup process fails for any reason. The original
                exception is chained as ``__cause__``.
        """
        if not autobackup_enabled:
            self.logger.debug("Autobackup disabled, skipping backup.")
            return  # Backup not needed, proceed successfully

        self.logger.info("Autobackup enabled. Starting backup...")
        try:
            # Note the argument order expected by create_zip_backup:
            # destination directory comes before the profile name.
            backup_path = self.backup_handler.create_zip_backup(
                svn_path,
                backup_base_dir,
                profile_name,
                excluded_extensions,
                excluded_dirs,
            )
        except Exception as backup_e:
            # Any failure is normalised to IOError so callers only need to
            # handle one exception type for "backup did not happen".
            self.logger.error(f"Backup failed: {backup_e}", exc_info=True)
            raise IOError(f"Autobackup failed: {backup_e}") from backup_e

        self.logger.info(f"Backup completed successfully: {backup_path}")

    def _ensure_no_uncommitted_changes(self, svn_path, action):
        """
        Refuses to continue when the working tree has uncommitted changes.

        Args:
            svn_path (str): Path to the repository.
            action (str): Lower-case action name used in log messages
                (e.g. "checkout", "switch").

        Raises:
            ValueError: If uncommitted changes exist.
            Exception: Whatever git_status_has_changes raises, re-raised
                unchanged after being logged.
        """
        try:
            has_changes = self.git_commands.git_status_has_changes(svn_path)
        except Exception as e:
            self.logger.error(
                f"Status check error before {action}: {e}", exc_info=True
            )
            raise

        # Raised outside the try block on purpose: this is an expected user
        # error, not a status-check failure, and must be logged only once.
        if has_changes:
            msg = "Uncommitted changes exist. Commit or stash first."
            self.logger.error(f"{action.capitalize()} blocked: {msg}")
            raise ValueError(msg)

        self.logger.debug("No uncommitted changes found.")

    def _autocommit_before_bundle(self, svn_path, profile_name, commit_message):
        """
        Commits pending changes (if any) ahead of bundle creation.

        Args:
            svn_path (str): Path to the repository.
            profile_name (str): Profile name, used in the default message.
            commit_message (str): Message from the caller; when empty a
                default message mentioning the profile is used instead.

        Raises:
            Exception: Any error from the status check or the commit,
                re-raised unchanged after being logged.
        """
        self.logger.info("Autocommit before bundle is enabled.")
        try:
            if not self.git_commands.git_status_has_changes(svn_path):
                self.logger.info("No changes detected for autocommit.")
                return

            self.logger.info("Changes detected, performing autocommit...")
            commit_msg_to_use = (
                commit_message or f"Autocommit '{profile_name}' before bundle"
            )
            self.logger.debug(f"Using autocommit message: '{commit_msg_to_use}'")
            # The return value is intentionally ignored here; failures are
            # expected to surface as exceptions.
            self.git_commands.git_commit(svn_path, commit_msg_to_use)
            self.logger.info("Autocommit attempt finished.")
        except Exception as commit_e:
            # Stop the whole bundle operation if the commit step fails.
            self.logger.error(f"Autocommit failed: {commit_e}", exc_info=True)
            raise

    # --- Repository / bundle actions ---

    def execute_prepare_repo(self, svn_path):
        """
        Executes the 'prepare repository' action.

        Args:
            svn_path (str): Validated path to the repository.

        Returns:
            bool: True on success. (Always True if no exception raised)

        Raises:
            ValueError: If repository is already prepared (a ``.git`` entry
                already exists under ``svn_path``).
            GitCommandError/IOError/Exception: If preparation fails.
        """
        self.logger.info(f"Executing preparation for: {svn_path}")

        # Check first so the UI layer gets a specific "already done" signal.
        git_dir = os.path.join(svn_path, ".git")
        if os.path.exists(git_dir):
            self.logger.warning("Repository is already prepared.")
            raise ValueError("Repository is already prepared.")

        try:
            self.git_commands.prepare_svn_for_git(svn_path)
        except Exception as e:
            self.logger.error(f"Failed to prepare repository: {e}", exc_info=True)
            raise

        self.logger.info("Repository prepared successfully.")
        return True

    def execute_create_bundle(
        self,
        svn_path,
        bundle_full_path,
        profile_name,
        autobackup_enabled,
        backup_base_dir,
        autocommit_enabled,
        commit_message,
        excluded_extensions,
        excluded_dirs,
    ):
        """
        Executes 'create bundle', including backup and commit logic.

        Steps run in order: optional backup, optional autocommit, bundle
        creation, then a check that the bundle file exists and is non-empty.

        Args:
            svn_path (str): Validated path to repository.
            bundle_full_path (str): Validated path for bundle file.
            profile_name (str): Current profile name.
            autobackup_enabled (bool): Whether to perform backup.
            backup_base_dir (str): Validated base directory for backups.
            autocommit_enabled (bool): Whether to perform autocommit.
            commit_message (str): Commit message from GUI (for autocommit).
            excluded_extensions (set): File extensions to exclude from backup.
            excluded_dirs (set): Directory names to exclude from backup.

        Returns:
            str or None: Path to created bundle on success, None if the
                bundle file was not created or is empty (an empty file is
                removed on a best-effort basis).

        Raises:
            IOError: If backup fails.
            GitCommandError/ValueError/Exception: If commit or bundle
                creation fails.
        """
        # --- Backup Step --- (raises IOError on failure)
        self._perform_backup_if_enabled(
            svn_path,
            profile_name,
            autobackup_enabled,
            backup_base_dir,
            excluded_extensions,
            excluded_dirs,
        )

        # --- Autocommit Step ---
        if autocommit_enabled:
            self._autocommit_before_bundle(svn_path, profile_name, commit_message)

        # --- Create Bundle Step ---
        self.logger.info(f"Creating bundle file: {bundle_full_path}")
        try:
            self.git_commands.create_git_bundle(svn_path, bundle_full_path)

            # Verify the result on disk: file exists and is not empty.
            bundle_exists = os.path.exists(bundle_full_path)
            bundle_has_data = bundle_exists and os.path.getsize(bundle_full_path) > 0
            if bundle_has_data:
                self.logger.info("Git bundle created successfully.")
                return bundle_full_path

            self.logger.warning("Bundle file not created or is empty.")
            if bundle_exists:
                # Clean up the empty file; failing to do so is not fatal.
                try:
                    os.remove(bundle_full_path)
                except OSError:
                    self.logger.warning("Could not remove empty bundle file.")
            return None  # Indicate non-fatal issue (empty bundle)
        except Exception as bundle_e:
            self.logger.error(f"Bundle creation failed: {bundle_e}", exc_info=True)
            raise

    def execute_fetch_bundle(
        self,
        svn_path,
        bundle_full_path,
        profile_name,
        autobackup_enabled,
        backup_base_dir,
        excluded_extensions,
        excluded_dirs,
    ):
        """
        Executes the 'fetch bundle' action, including backup logic.

        Args:
            svn_path (str): Validated path to repository.
            bundle_full_path (str): Path of the bundle file to fetch from.
            profile_name (str): Current profile name.
            autobackup_enabled (bool): Whether to perform backup.
            backup_base_dir (str): Validated base directory for backups.
            excluded_extensions (set): File extensions to exclude from backup.
            excluded_dirs (set): Directory names to exclude from backup.

        Returns:
            None: Success is indicated by the absence of an exception.

        Raises:
            IOError: If backup fails.
            GitCommandError/Exception: If fetch or merge fails (incl. conflicts).
        """
        # --- Backup Step --- (raises IOError on failure)
        self._perform_backup_if_enabled(
            svn_path,
            profile_name,
            autobackup_enabled,
            backup_base_dir,
            excluded_extensions,
            excluded_dirs,
        )

        # --- Fetch and Merge Step ---
        self.logger.info(f"Fetching into '{svn_path}' from: {bundle_full_path}")
        try:
            self.git_commands.fetch_from_git_bundle(svn_path, bundle_full_path)
        except Exception as fetch_e:
            self.logger.error(f"Fetch/merge failed: {fetch_e}", exc_info=True)
            raise

        self.logger.info("Fetch/merge process completed successfully.")

    # --- Commit / tag actions ---

    def execute_manual_commit(self, svn_path, commit_message):
        """
        Executes a manual commit with the provided message.

        Args:
            svn_path (str): Validated path to the repository.
            commit_message (str): The commit message (must not be empty).

        Returns:
            bool: The value returned by git_commands.git_commit (expected to
                be True if a commit was made, False if nothing was committed).

        Raises:
            ValueError: If commit_message is empty.
            GitCommandError/Exception: If commit command fails.
        """
        if not commit_message:
            # This validation should ideally happen before calling this method
            self.logger.error("Manual commit attempt with empty message.")
            raise ValueError("Commit message cannot be empty.")

        self.logger.info(f"Executing manual commit for: {svn_path}")
        try:
            return self.git_commands.git_commit(svn_path, commit_message)
        except Exception as e:
            self.logger.error(f"Manual commit failed: {e}", exc_info=True)
            raise

    def execute_create_tag(self, svn_path, commit_message, tag_name, tag_message):
        """
        Executes tag creation, including pre-commit using commit_message if needed.

        Args:
            svn_path (str): Validated path to repository.
            commit_message (str): Message for pre-tag commit (if needed). Can be empty.
            tag_name (str): Name for the new tag.
            tag_message (str): Annotation message for the new tag.

        Returns:
            bool: True on successful tag creation.

        Raises:
            ValueError: If uncommitted changes exist and no commit message
                was provided (or if git_commands rejects the tag).
            GitCommandError/Exception: If status check, commit or tag command fails.
        """
        self.logger.info(f"Executing create tag '{tag_name}' for: {svn_path}")

        # --- Pre-commit Step (only if changes exist AND message provided) ---
        try:
            has_changes = self.git_commands.git_status_has_changes(svn_path)
        except Exception as e:
            self.logger.error(f"Error during pre-tag commit step: {e}", exc_info=True)
            raise

        if has_changes:
            self.logger.info("Uncommitted changes detected before tagging.")
            if not commit_message:
                # Expected user error: logged once here, outside any
                # try block, so it is not re-logged as a commit failure.
                msg = "Changes exist. Commit message required before tagging."
                self.logger.error(f"Tag creation blocked: {msg}")
                raise ValueError(msg)

            self.logger.debug(f"Performing pre-tag commit: '{commit_message}'")
            try:
                commit_made = self.git_commands.git_commit(svn_path, commit_message)
            except Exception as e:
                self.logger.error(
                    f"Error during pre-tag commit step: {e}", exc_info=True
                )
                raise
            self.logger.info(f"Pre-tag commit attempt finished (made={commit_made}).")
        else:
            self.logger.info("No uncommitted changes detected before tagging.")

        # --- Create Tag Step ---
        self.logger.info(f"Proceeding to create tag '{tag_name}'...")
        try:
            # Tag-name validation is left to git_commands.create_tag.
            self.git_commands.create_tag(svn_path, tag_name, tag_message)
        except Exception as e:
            self.logger.error(f"Failed to create tag '{tag_name}': {e}", exc_info=True)
            raise

        self.logger.info(f"Tag '{tag_name}' created successfully.")
        return True

    def execute_checkout_tag(self, svn_path, tag_name):
        """
        Executes checkout for the specified tag after checking changes.

        Args:
            svn_path (str): Validated path to repository.
            tag_name (str): The tag name to check out (validated by caller).

        Returns:
            bool: True on successful checkout.

        Raises:
            ValueError: If tag_name is empty or uncommitted changes exist.
            GitCommandError/Exception: If status check or checkout fails, or
                if the checkout call returns a falsy value.
        """
        if not tag_name:
            raise ValueError("Tag name required for checkout.")

        self.logger.info(f"Executing checkout tag '{tag_name}' in: {svn_path}")

        # --- Check for Uncommitted Changes ---
        self._ensure_no_uncommitted_changes(svn_path, "checkout")

        # --- Execute Checkout ---
        try:
            checkout_success = self.git_commands.checkout_tag(svn_path, tag_name)
        except Exception as e:
            # e.g. tag not found
            self.logger.error(
                f"Failed to checkout tag '{tag_name}': {e}", exc_info=True
            )
            raise

        if not checkout_success:
            # Defensive: checkout_tag is expected to raise rather than
            # return a falsy value.
            self.logger.error("Checkout command reported failure unexpectedly.")
            raise GitCommandError("Checkout failed for unknown reason.")

        self.logger.info(f"Tag '{tag_name}' checked out.")
        return True

    def execute_delete_tag(self, svn_path, tag_name):
        """
        Executes deletion for the specified tag.

        Args:
            svn_path (str): Validated path to repository.
            tag_name (str): The tag name to delete (already validated).

        Returns:
            bool: The value returned by git_commands.delete_tag (expected to
                be True when no exception is raised).

        Raises:
            ValueError: If tag_name is empty.
            GitCommandError/Exception: If delete fails.
        """
        if not tag_name:
            # Should be validated by caller
            raise ValueError("Tag name required for deletion.")

        self.logger.info(f"Executing delete tag '{tag_name}' in: {svn_path}")
        try:
            return self.git_commands.delete_tag(svn_path, tag_name)
        except Exception as e:
            self.logger.error(f"Failed to delete tag '{tag_name}': {e}", exc_info=True)
            raise  # Re-raise for the UI layer to handle

    # --- Branch Actions ---

    def execute_create_branch(self, svn_path, branch_name, start_point=None):
        """
        Executes branch creation.

        Args:
            svn_path (str): Path to the repository.
            branch_name (str): Name of the branch to create (must not be empty).
            start_point (str, optional): Passed through to
                git_commands.create_branch; logged as 'HEAD' when not given.

        Returns:
            bool: True on success.

        Raises:
            ValueError: If branch_name is empty.
            GitCommandError/ValueError/Exception: On failure in git_commands.
        """
        # Same guard as the switch/delete branch actions.
        if not branch_name:
            raise ValueError("Branch name required for create.")

        self.logger.info(
            f"Executing create branch '{branch_name}' "
            f"from '{start_point or 'HEAD'}'."
        )
        try:
            self.git_commands.create_branch(svn_path, branch_name, start_point)
        except Exception as e:
            self.logger.error(f"Failed to create branch: {e}", exc_info=True)
            raise
        return True

    def execute_switch_branch(self, svn_path, branch_name):
        """
        Executes branch switch after checking for changes.

        Args:
            svn_path (str): Path to the repository.
            branch_name (str): Name of the branch to switch to.

        Returns:
            bool: The value returned by git_commands.checkout_branch
                (expected to be True on success).

        Raises:
            ValueError: If branch_name is empty or uncommitted changes exist.
            GitCommandError/Exception: On status check or switch failure.
        """
        if not branch_name:
            raise ValueError("Branch name required for switch.")
        self.logger.info(f"Executing switch to branch '{branch_name}'.")

        # --- Check for Uncommitted Changes ---
        self._ensure_no_uncommitted_changes(svn_path, "switch")

        # --- Execute Switch ---
        try:
            return self.git_commands.checkout_branch(svn_path, branch_name)
        except Exception as e:
            # e.g. branch not found
            self.logger.error(
                f"Failed switch to branch '{branch_name}': {e}", exc_info=True
            )
            raise

    def execute_delete_branch(self, svn_path, branch_name, force=False):
        """
        Executes branch deletion.

        Protection of main/master branches is not handled here; the original
        code notes that the UI layer is responsible for it.

        Args:
            svn_path (str): Path to the repository.
            branch_name (str): Name of the branch to delete.
            force (bool): Passed through to git_commands.delete_branch.

        Returns:
            bool: The value returned by git_commands.delete_branch
                (expected to be True on success).

        Raises:
            ValueError: If branch_name is empty.
            GitCommandError/Exception: On failure (e.g. not merged, needs force).
        """
        if not branch_name:
            raise ValueError("Branch name required for delete.")

        self.logger.info(f"Executing delete branch '{branch_name}' (force={force}).")
        try:
            return self.git_commands.delete_branch(svn_path, branch_name, force)
        except Exception as e:
            self.logger.error(
                f"Failed delete branch '{branch_name}': {e}", exc_info=True
            )
            raise