174 lines
8.1 KiB
Python
174 lines
8.1 KiB
Python
# backup_handler.py
|
|
import os
|
|
import datetime
|
|
import zipfile
|
|
import logging
|
|
|
|
# Note: ConfigManager is assumed to be available (via dependency injection or
# other means) should this module ever need settings beyond the exclusions.
# For now, the exclusions are passed directly to create_zip_backup().
|
|
|
|
class BackupHandler:
    """Handles the creation of ZIP backups with exclusions."""

    def __init__(self, logger):
        """
        Initializes the BackupHandler.

        Args:
            logger (logging.Logger): Logger instance used for all progress,
                debug and error messages.
        """
        self.logger = logger

    def create_zip_backup(self, source_repo_path, backup_base_dir,
                          profile_name, excluded_extensions, excluded_dirs_base):
        """
        Creates a timestamped ZIP backup of the source repository directory,
        respecting provided exclusions.

        The archive is named ``<YYYYmmdd_HHMMSS>_backup_<profile>.zip`` and is
        written into ``backup_base_dir`` (created if missing). Files are stored
        under their path relative to ``source_repo_path``.

        Args:
            source_repo_path (str): Path to the repository to back up.
            backup_base_dir (str): Path to the base backup directory.
            profile_name (str): Name of the current profile (for filename).
                Characters other than alphanumerics, ``_`` and ``-`` are
                dropped; an empty result (or ``None``) becomes ``"profile"``.
            excluded_extensions (set): Set of lowercase extensions to exclude
                (e.g., {'.log'}). ``None`` means no extension is excluded.
            excluded_dirs_base (set): Set of lowercase base names to exclude
                (e.g., {'.git', '.svn'}). Matching is case-insensitive and is
                applied to directory names and to file names alike.
                ``None`` means nothing is excluded by name.

        Returns:
            str: The full path of the target ZIP file.
                NOTE: if no file at all was added, the empty archive is removed
                again before returning, so the returned path may not exist.

        Raises:
            ValueError: If input paths are invalid.
            IOError: If directory creation or ZIP creation fails.
            Exception: For other unexpected errors during ZIP creation.

        Notes:
            * A failure to add an individual file is logged and the backup
              continues with the remaining files (best effort).
            * The archive being written is never added to itself, even when
              ``backup_base_dir`` lies inside ``source_repo_path``.
            * If the walk is aborted by an error after files were added, the
              partial archive is left on disk.
        """
        self.logger.info(
            f"Creating ZIP backup for profile '{profile_name}' "
            f"from '{source_repo_path}'"
        )

        # --- 1. Validate Inputs and Prepare Destination ---
        if not source_repo_path or not os.path.isdir(source_repo_path):
            raise ValueError(f"Invalid source repository path: {source_repo_path}")
        if not backup_base_dir:
            raise ValueError("Backup base directory cannot be empty.")

        # Treat missing exclusion sets as "exclude nothing" instead of
        # failing later with a TypeError on the membership tests.
        if excluded_extensions is None:
            excluded_extensions = ()
        if excluded_dirs_base is None:
            excluded_dirs_base = ()

        self._ensure_backup_dir(backup_base_dir)

        # --- 2. Construct Backup Filename ---
        now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_profile = self._sanitize_profile_name(profile_name)
        backup_filename = f"{now_str}_backup_{safe_profile}.zip"
        backup_full_path = os.path.join(backup_base_dir, backup_filename)
        self.logger.info(f"Target backup ZIP file: {backup_full_path}")

        # Normalised location of the archive, used to keep it out of itself
        # when the backup directory lives inside the source tree.
        target_norm = os.path.normcase(os.path.abspath(backup_full_path))

        # --- 3. Create ZIP Archive ---
        files_added = 0
        files_excluded = 0
        dirs_excluded = 0
        try:
            # The context manager guarantees the archive is closed (and its
            # central directory written) on success and on failure.
            with zipfile.ZipFile(backup_full_path, 'w',
                                 compression=zipfile.ZIP_DEFLATED,
                                 allowZip64=True) as zip_f:
                for root, dirs, files in os.walk(source_repo_path, topdown=True):
                    # --- Directory Exclusion ---
                    # With topdown=True, pruning `dirs` in place stops
                    # os.walk from descending into the removed entries.
                    kept_dirs = []
                    for dir_name in dirs:
                        if dir_name.lower() in excluded_dirs_base:
                            dirs_excluded += 1
                            path_excluded = os.path.join(root, dir_name)
                            self.logger.debug(
                                f"Excluding directory: {path_excluded}")
                        else:
                            kept_dirs.append(dir_name)
                    dirs[:] = kept_dirs

                    # --- File Exclusion and Addition ---
                    for filename in files:
                        file_full_path = os.path.join(root, filename)
                        file_ext_lower = os.path.splitext(filename)[1].lower()

                        # Exclusion by (case-insensitive) name or extension
                        if (filename.lower() in excluded_dirs_base
                                or file_ext_lower in excluded_extensions):
                            self.logger.debug(
                                f"Excluding file: {file_full_path}")
                            files_excluded += 1
                            continue

                        # Never add the archive that is currently being
                        # written; the cheap name check avoids normalising
                        # every path in the tree.
                        if (filename == backup_filename
                                and os.path.normcase(
                                    os.path.abspath(file_full_path)) == target_norm):
                            self.logger.debug(
                                f"Skipping backup archive itself: {file_full_path}")
                            files_excluded += 1
                            continue

                        # Store with relative path inside ZIP archive
                        archive_name = os.path.relpath(file_full_path,
                                                       source_repo_path)
                        try:
                            zip_f.write(file_full_path, arcname=archive_name)
                        except Exception as write_e:
                            # Deliberate best effort: one unreadable file
                            # must not abort the whole backup.
                            self.logger.error(
                                f"Error writing file '{file_full_path}' to ZIP: {write_e}",
                                exc_info=True
                            )
                            continue

                        files_added += 1
                        # Log progress occasionally for large backups
                        if files_added % 500 == 0:
                            self.logger.debug(f"Added {files_added} files...")

            self.logger.debug(f"ZIP file '{backup_full_path}' closed.")

            # Final summary after a successful walk
            self.logger.info(
                f"Backup ZIP creation process finished: {backup_full_path}")
            self.logger.info(
                f"Summary - Added: {files_added}, Excl Files: {files_excluded}, "
                f"Excl Dirs: {dirs_excluded}"
            )
            return backup_full_path

        except (OSError, zipfile.BadZipFile) as e:
            # OS-level and ZIP-format errors are reported as IOError
            self.logger.error(f"Error creating backup ZIP: {e}", exc_info=True)
            raise IOError(f"Failed to create backup ZIP: {e}") from e
        except Exception as e:
            # Anything else is logged with traceback and re-raised unchanged
            self.logger.exception(f"Unexpected error during ZIP backup: {e}")
            raise
        finally:
            # Runs on success and on failure alike
            self._cleanup_archive(backup_full_path, files_added)

    def _ensure_backup_dir(self, backup_base_dir):
        """Creates the backup base directory if missing; raises IOError on failure."""
        if os.path.isdir(backup_base_dir):
            return
        self.logger.info(f"Creating backup base directory: {backup_base_dir}")
        try:
            os.makedirs(backup_base_dir, exist_ok=True)
        except OSError as e:
            self.logger.error(f"Cannot create backup directory: {e}",
                              exc_info=True)
            # Re-raise as IOError for the caller
            raise IOError(f"Could not create backup directory: {e}") from e

    @staticmethod
    def _sanitize_profile_name(profile_name):
        """Reduces a profile name to alphanumerics, '_' and '-' ('profile' if empty)."""
        raw_name = "" if profile_name is None else str(profile_name)
        cleaned = "".join(c for c in raw_name if c.isalnum() or c in '_-')
        return cleaned or "profile"

    def _cleanup_archive(self, backup_full_path, files_added):
        """Removes an archive that received no files and reports a missing one."""
        zip_exists = os.path.exists(backup_full_path)
        if zip_exists and files_added == 0:
            self.logger.warning(f"Backup ZIP is empty: {backup_full_path}")
            try:
                os.remove(backup_full_path)
                self.logger.info("Removed empty backup ZIP file.")
            except OSError as rm_e:
                # Removal failure is logged only; it must not mask the
                # outcome of the backup itself.
                self.logger.error(f"Failed remove empty backup ZIP: {rm_e}")
        elif not zip_exists and files_added > 0:
            # Files were reported as added but the archive is gone
            self.logger.error("Backup process finished but ZIP file missing.")