SXXXXXXX_GitUtility/backup_handler.py

346 lines
16 KiB
Python

# --- START OF FILE backup_handler.py ---
# backup_handler.py
import os
import datetime
import zipfile
# Rimosso import logging
import sys # Per fallback print
# Importa il nuovo gestore della coda log
import log_handler
class BackupHandler:
"""Handles the creation of ZIP backups with file and directory exclusions."""
# Rimosso logger da __init__
def __init__(self, logger_ignored=None): # Accetta argomento ma lo ignora
"""Initializes the BackupHandler."""
# Non c'è più self.logger
log_handler.log_debug("BackupHandler initialized.", func_name="__init__")
# self.logger = logger # Rimosso
def _log_walk_error(self, os_error: OSError):
"""
Error handler callback for os.walk to log issues during traversal.
Uses log_handler.
"""
# Log PermissionError and other OSError subclasses encountered during walk
if isinstance(os_error, PermissionError):
# Messaggio specifico per PermissionError
log_handler.log_warning(
f"OS error during directory walk: Cannot access '{os_error.filename}'. "
f"Reason: Permission denied. Skipping item/directory.",
func_name="_log_walk_error",
)
elif isinstance(os_error, OSError):
# Messaggio generico per altre OSError
log_handler.log_warning(
f"OS error during directory walk: Cannot access '{os_error.filename}'. "
f"Reason: {os_error.strerror}. Skipping item/directory.",
func_name="_log_walk_error",
)
else:
# Logga altri errori imprevisti durante la camminata
# Usa log_exception per includere traceback (su console)
log_handler.log_exception(
f"Unexpected error during directory walk: {os_error}",
func_name="_log_walk_error",
)
# Returning None (implicitly) tells os.walk to continue if possible
def create_zip_backup(
self,
source_repo_path: str, # Directory to back up
backup_base_dir: str, # Destination base directory for the zip
profile_name: str, # Profile name for backup filename
excluded_extensions: set, # Set of lowercase extensions like {'.log', '.tmp'}
excluded_dirs_base: set, # Set of lowercase dir base names like {'__pycache__', '.git'}
):
"""
Creates a timestamped ZIP backup of the source directory, respecting exclusions.
Uses log_handler for logging.
Args:
source_repo_path (str): Absolute path to the directory to back up.
backup_base_dir (str): Absolute path to the base backup directory.
profile_name (str): Name of the current profile (for filename).
excluded_extensions (set): Set of lowercase file extensions to exclude.
excluded_dirs_base (set): Set of lowercase directory base names to exclude.
Returns:
str | None: Full path of the created ZIP file on success, or None otherwise.
Raises:
ValueError: If input paths invalid or required args missing/wrong type.
IOError: If directory creation or file writing fails critically.
Exception: For other unexpected errors during ZIP creation.
"""
func_name = "create_zip_backup" # Nome funzione per log
log_handler.log_info(
f"Starting ZIP backup for profile '{profile_name}'...", func_name=func_name
)
log_handler.log_debug(
f" Source path: '{source_repo_path}'", func_name=func_name
)
log_handler.log_debug(
f" Backup base directory: '{backup_base_dir}'", func_name=func_name
)
log_handler.log_debug(
f" Excluded extensions: {excluded_extensions}", func_name=func_name
)
log_handler.log_debug(
f" Excluded directory names (base): {excluded_dirs_base}",
func_name=func_name,
)
# --- 1. Validate Inputs ---
if not source_repo_path or not os.path.isdir(source_repo_path):
raise ValueError(f"Invalid source directory path: {source_repo_path}")
if not backup_base_dir:
raise ValueError("Backup base directory cannot be empty.")
if not isinstance(excluded_extensions, set):
raise TypeError("excluded_extensions must be a set.")
if not isinstance(excluded_dirs_base, set):
raise TypeError("excluded_dirs_base must be a set.")
# --- 2. Prepare Destination Directory ---
try:
# Check if base directory exists, create if not
if not os.path.isdir(backup_base_dir):
log_handler.log_info(
f"Creating backup base directory: {backup_base_dir}",
func_name=func_name,
)
# exist_ok=True prevents error if dir was created between check and makedirs
os.makedirs(backup_base_dir, exist_ok=True)
# Check write permissions explicitly (important!)
if not os.access(backup_base_dir, os.W_OK | os.X_OK):
# Raise PermissionError which inherits from OSError
raise PermissionError(
f"Write/traverse permission denied for backup directory: {backup_base_dir}"
)
except OSError as e:
# Log and re-raise directory creation/access errors as IOError
log_handler.log_error(
f"Cannot create/access backup dir '{backup_base_dir}': {e}",
func_name=func_name,
)
raise IOError(f"Could not prepare backup directory: {e}") from e
# --- 3. Construct Backup Filename ---
try:
# Timestamp format
now_str = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# Sanitize profile name for use in filename
safe_profile = "".join(
c for c in profile_name if c.isalnum() or c in ("_", "-")
).rstrip()
# Use default if sanitized name is empty
safe_profile = safe_profile or "profile"
# Construct filename
backup_filename = f"{now_str}_backup_{safe_profile}.zip"
backup_full_path = os.path.join(backup_base_dir, backup_filename)
log_handler.log_info(
f"Target backup ZIP file: {backup_full_path}", func_name=func_name
)
except Exception as e:
# Wrap filename construction errors in ValueError
raise ValueError(f"Could not construct valid backup filename: {e}") from e
# --- 4. Create ZIP Archive ---
files_added_count = 0
files_excluded_count = 0
dirs_excluded_count = 0
zip_file_object = None # Use a more descriptive name
try:
# Open ZIP file with compression and large file support
# Use 'with' statement for automatic closing even on error
with zipfile.ZipFile(
backup_full_path,
"w", # Write mode
compression=zipfile.ZIP_DEFLATED, # Use DEFLATE compression
allowZip64=True, # Support archives larger than 4GB
) as zip_file_object:
# Walk through the source directory tree
# topdown=True allows modification of 'dirs' list to prune traversal
# onerror=_log_walk_error handles issues like permission errors during walk
for root, dirs, files in os.walk(
source_repo_path,
topdown=True,
onerror=self._log_walk_error, # Use the error handler defined above
):
# --- Directory Exclusion Logic ---
# Keep original list for logging/counting excluded dirs
original_dirs = list(dirs)
# Modify 'dirs' in-place: keep only directories whose lowercase
# base name is NOT in the excluded_dirs_base set.
# This prevents os.walk from descending into excluded directories.
dirs[:] = [d for d in dirs if d.lower() not in excluded_dirs_base]
# Log and count the directories that were excluded at this level
excluded_at_this_level = set(original_dirs) - set(dirs)
if excluded_at_this_level:
count = len(excluded_at_this_level)
dirs_excluded_count += count
for excluded_dir_name in excluded_at_this_level:
# Log the full path of the excluded directory
excluded_dir_path = os.path.join(root, excluded_dir_name)
log_handler.log_debug(
f"Excluding directory (and contents): {excluded_dir_path}",
func_name=func_name,
)
# --- File Exclusion and Addition ---
for filename in files:
# Get file extension (lowercase)
_, file_ext = os.path.splitext(filename)
file_ext_lower = file_ext.lower()
file_full_path = os.path.join(root, filename)
# Check exclusion rules (extension)
if file_ext_lower in excluded_extensions:
log_handler.log_debug(
f"Excluding file by extension: {file_full_path}",
func_name=func_name,
)
files_excluded_count += 1
continue # Skip this file
# --- Add file to ZIP ---
# Calculate relative path for storage inside ZIP archive
try:
archive_name = os.path.relpath(
file_full_path, source_repo_path
)
except ValueError:
# Handle cases like different drives on Windows
# Use fallback: basename of dir + filename
archive_name = os.path.join(
os.path.basename(root), filename
)
log_handler.log_warning(
f"Could not get relpath for {file_full_path}. Using fallback arcname: {archive_name}",
func_name=func_name,
)
# Write the file to the ZIP archive
try:
zip_file_object.write(file_full_path, arcname=archive_name)
files_added_count += 1
# Log progress periodically for large backups
if files_added_count % 500 == 0 and files_added_count > 0:
log_handler.log_debug(
f"Added {files_added_count} files...",
func_name=func_name,
)
except FileNotFoundError:
log_handler.log_warning(
f"File vanished before writing: {file_full_path}",
func_name=func_name,
)
except PermissionError:
log_handler.log_warning(
f"Permission denied reading file: {file_full_path}",
func_name=func_name,
)
except Exception as write_e:
# Log error writing specific file but allow backup to continue if possible
log_handler.log_error(
f"Error writing file '{file_full_path}' to ZIP: {write_e}",
func_name=func_name,
)
# Do not increment files_excluded_count here, it's a write error
# --- End of os.walk loop ---
log_handler.log_info(
"Finished walking directory tree and adding files.",
func_name=func_name,
)
# --- End of 'with zipfile.ZipFile' block (file automatically closed) ---
log_handler.log_debug(
f"ZIP file '{backup_full_path}' closed.", func_name=func_name
)
# Log final summary after successful walk and write attempts
log_handler.log_info(
f"Backup Summary - Files Added: {files_added_count}, "
f"Excluded Files (ext): {files_excluded_count}, "
f"Excluded Dirs (name): {dirs_excluded_count}",
func_name=func_name,
)
except (OSError, zipfile.BadZipFile, zipfile.LargeZipFile) as e:
# Handle critical OS errors (permissions, disk space) and ZIP format errors
log_handler.log_error(
f"Error creating/writing ZIP '{backup_full_path}': {e}",
func_name=func_name,
)
# Re-raise as IOError for the caller (ActionHandler)
raise IOError(f"Failed to create/write backup ZIP: {e}") from e
except Exception as e:
# Catch any other unexpected error during the process
log_handler.log_exception(
f"Unexpected error during ZIP backup: {e}", func_name=func_name
)
raise # Re-raise the original exception
# --- Final Check and Cleanup ---
final_path_to_return = None
try:
# Check if the file exists after the process
if os.path.exists(backup_full_path):
# Check if any files were actually added
if files_added_count > 0:
# Success: file exists and has content
final_path_to_return = backup_full_path
else:
# File exists but is empty, likely source was empty or all excluded
log_handler.log_warning(
f"Backup ZIP is empty (0 files added): {backup_full_path}. Removing.",
func_name=func_name,
)
try:
os.remove(backup_full_path)
log_handler.log_info(
"Removed empty backup ZIP file.", func_name=func_name
)
except OSError as rm_e:
log_handler.log_error(
f"Failed to remove empty backup ZIP: {rm_e}",
func_name=func_name,
)
final_path_to_return = None # Indicate no valid backup created
else:
# File doesn't exist at the end
if files_added_count > 0:
# This indicates an unexpected issue
log_handler.log_error(
f"Backup process reported adding files, but ZIP missing: {backup_full_path}",
func_name=func_name,
)
else:
# Normal if source was empty or all excluded
log_handler.log_info(
"Backup process completed without creating a file (source empty or all excluded).",
func_name=func_name,
)
final_path_to_return = None
except Exception as final_check_e:
# Error during the final check/cleanup phase
log_handler.log_error(
f"Error during final check/cleanup of backup file: {final_check_e}",
func_name=func_name,
)
final_path_to_return = None # Assume failure if check fails
return final_path_to_return # Return path or None
# --- END OF FILE backup_handler.py ---