# SXXXXXXX_GitUtility/gitutility/logic/repository_handler.py
# 2025-07-29 09:06:29 +02:00
#
# 410 lines
# 19 KiB
# Python

# --- FILE: gitutility/logic/repository_handler.py ---
import os
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Tuple
from gitutility.async_tasks import async_workers
from gitutility.gui.dialogs import CreateTagDialog, CreateBranchDialog
from gitutility.logging_setup import log_handler
from gitutility.commands.git_commands import GitCommandError
# Forward reference for type hinting to avoid circular import
if TYPE_CHECKING:
from ..app import GitSvnSyncApp
class RepositoryHandler:
    """
    Handles logic for local repository operations such as preparing the repo,
    managing bundles, committing changes, and managing local tags and branches.

    Every public method gathers and checks the inputs it needs from the GUI
    and then hands the actual work to a worker function through
    ``app._start_async_operation``.
    """

    def __init__(self, app: "GitSvnSyncApp"):
        """
        Initializes the RepositoryHandler.

        Args:
            app: The main application instance. Its ``main_frame``,
                ``action_handler``, ``git_commands`` and ``backup_handler``
                attributes are cached on this handler.
        """
        self.app = app
        self.main_frame = app.main_frame
        self.action_handler = app.action_handler
        self.git_commands = app.git_commands
        self.backup_handler = app.backup_handler

    def _get_ready_repo_path(
        self,
        operation_name: str,
        error_message: Optional[str] = "Repository is not ready.",
    ) -> Optional[str]:
        """
        Returns the validated repository path if the repository is ready.

        Args:
            operation_name: Label passed to ``app._get_and_validate_svn_path``.
            error_message: Text shown in an "Action Failed" error dialog when
                the path is invalid or the repository is not ready. Pass
                ``None`` to fail silently and let the caller report it.

        Returns:
            The repository path, or ``None`` when the operation must not run.
        """
        svn_path = self.app._get_and_validate_svn_path(operation_name)
        if svn_path and self.app._is_repo_ready(svn_path):
            return svn_path
        if error_message is not None:
            self.main_frame.show_error("Action Failed", error_message)
        return None

    def prepare_svn_for_git(self):
        """Starts async operation to prepare the repository (init, gitignore)."""
        func_name = "prepare_svn_for_git"
        svn_path = self.app._get_and_validate_svn_path("Prepare Repository")
        if not svn_path:
            return
        if self.app._is_repo_ready(svn_path):
            # Nothing to do: tell the user and refresh the status indicator.
            log_handler.log_info(
                "Prepare skipped: Repository already prepared.", func_name=func_name
            )
            self.main_frame.show_info("Info", "Repository is already prepared.")
            self.app.update_svn_status_indicator(svn_path)
            return
        args = (self.action_handler, svn_path)
        self.app._start_async_operation(
            worker_func=async_workers.run_prepare_async,
            args_tuple=args,
            context_dict={"context": "prepare_repo", "status_msg": "Preparing repository"},
        )

    def create_git_bundle(self):
        """
        Starts async operation to create a Git bundle file.

        The bundle is written to the validated USB path; ".bundle" is appended
        to the entered name when it is missing. Backup and auto-commit options
        are read from the GUI and forwarded to the worker.
        """
        func_name = "create_git_bundle"
        profile = self.main_frame.profile_var.get()
        svn_path = self.app._get_and_validate_svn_path("Create Bundle")
        usb_path = self.app._get_and_validate_usb_path("Create Bundle")
        bundle_name = self.main_frame.repo_tab.bundle_name_entry.get().strip()
        if not all([profile, svn_path, usb_path, bundle_name]):
            log_handler.log_warning(
                "Create Bundle cancelled: Missing inputs.", func_name=func_name
            )
            return
        if not self.app._is_repo_ready(svn_path):
            self.main_frame.show_error("Action Failed", "Repository is not prepared.")
            return
        if not bundle_name.lower().endswith(".bundle"):
            bundle_name += ".bundle"
        bundle_full_path = os.path.join(usb_path, bundle_name)
        # A failed settings save is not fatal; the user decides whether to go on.
        if not self.app.profile_handler.save_profile_settings():
            if not self.main_frame.ask_yes_no(
                "Warning",
                "Could not save profile settings.\nContinue creating bundle anyway?",
            ):
                return
        excluded_extensions, excluded_dirs = self.app._parse_exclusions()
        backup_enabled = self.main_frame.backup_tab.autobackup_var.get()
        backup_dir = self.main_frame.backup_tab.backup_dir_var.get()
        commit_enabled = self.main_frame.commit_tab.autocommit_var.get()
        commit_msg = self.main_frame.commit_tab.get_commit_message()
        args = (
            self.action_handler, svn_path, bundle_full_path, profile,
            backup_enabled, backup_dir, commit_enabled, commit_msg,
            excluded_extensions, excluded_dirs,
        )
        self.app._start_async_operation(
            worker_func=async_workers.run_create_bundle_async,
            args_tuple=args,
            context_dict={
                "context": "create_bundle",
                "status_msg": f"Creating bundle '{bundle_name}'",
                "committed_flag_possible": True,
            },
        )

    def fetch_from_git_bundle(self):
        """
        Starts async operation to fetch/clone from a Git bundle file.

        The repository path is taken from the entry field as typed; it is not
        passed through ``_get_and_validate_svn_path`` here (presumably because
        the target may not exist yet when cloning -- TODO confirm).
        """
        func_name = "fetch_from_git_bundle"
        profile = self.main_frame.profile_var.get()
        svn_path_str = self.main_frame.repo_tab.svn_path_entry.get().strip()
        usb_path = self.app._get_and_validate_usb_path("Fetch Bundle")
        bundle_name = self.main_frame.repo_tab.bundle_updated_name_entry.get().strip()
        if not all([profile, svn_path_str, usb_path, bundle_name]):
            log_handler.log_warning(
                "Fetch Bundle cancelled: Missing inputs.", func_name=func_name
            )
            return
        bundle_full_path = os.path.join(usb_path, bundle_name)
        if not os.path.isfile(bundle_full_path):
            self.main_frame.show_error(
                "File Not Found", f"Bundle file not found:\n{bundle_full_path}"
            )
            return
        if not self.app.profile_handler.save_profile_settings():
            if not self.main_frame.ask_yes_no(
                "Warning",
                "Could not save profile settings.\nContinue fetching anyway?",
            ):
                return
        excluded_extensions, excluded_dirs = self.app._parse_exclusions()
        # Backup settings live on the backup tab itself, exactly as they are
        # read in create_git_bundle and handle_manual_backup.
        backup_enabled = self.main_frame.backup_tab.autobackup_var.get()
        backup_dir = self.main_frame.backup_tab.backup_dir_var.get()
        args = (
            self.action_handler, svn_path_str, bundle_full_path, profile,
            backup_enabled, backup_dir, excluded_extensions, excluded_dirs,
        )
        self.app._start_async_operation(
            worker_func=async_workers.run_fetch_bundle_async,
            args_tuple=args,
            context_dict={
                "context": "fetch_bundle",
                "status_msg": f"Fetching from '{bundle_name}'",
                "repo_path": svn_path_str,
            },
        )

    def commit_changes(self):
        """Starts async operation to commit staged changes."""
        svn_path = self._get_ready_repo_path("Commit")
        if not svn_path:
            return
        commit_msg = self.main_frame.commit_tab.get_commit_message()
        if not commit_msg:
            self.main_frame.show_error("Input Error", "Commit message cannot be empty.")
            return
        args = (self.action_handler, svn_path, commit_msg)
        self.app._start_async_operation(
            worker_func=async_workers.run_commit_async,
            args_tuple=args,
            context_dict={
                "context": "commit",
                "status_msg": "Committing changes",
                "committed_flag_possible": True,
            },
        )

    def create_tag(self):
        """Handles tag creation: shows dialog, suggests name, starts async operation."""
        svn_path = self._get_ready_repo_path("Create Tag")
        if not svn_path:
            return
        self.main_frame.update_status_bar("Processing: Generating tag suggestion...")
        suggested_name = self.app._generate_next_tag_suggestion(svn_path)
        # The result is read right after construction, so the dialog is
        # presumably modal -- TODO confirm against CreateTagDialog.
        dialog = CreateTagDialog(self.main_frame.master, suggested_tag_name=suggested_name)
        tag_info = dialog.result
        if not tag_info:
            self.main_frame.update_status_bar("Cancelled.")
            return
        tag_name, tag_message = tag_info
        args = (self.action_handler, svn_path, tag_name, tag_message)
        self.app._start_async_operation(
            worker_func=async_workers.run_create_tag_async,
            args_tuple=args,
            context_dict={
                "context": "create_tag",
                "status_msg": f"Creating tag '{tag_name}'",
                "committed_flag_possible": True,
            },
        )

    def checkout_tag(self):
        """Handles tag checkout after user confirmation."""
        svn_path = self._get_ready_repo_path("Checkout Tag")
        if not svn_path:
            return
        tag_name = self.main_frame.tags_tab.get_selected_tag()
        if not tag_name:
            self.main_frame.show_error("Selection Error", "No tag selected from the list.")
            return
        confirmation_message = (
            f"Checkout tag '{tag_name}'?\n\n"
            f"Warning: This will put your repository in a 'detached HEAD' state."
        )
        if not self.main_frame.ask_yes_no("Confirm Checkout Tag", confirmation_message):
            self.main_frame.update_status_bar("Cancelled.")
            return
        args = (self.action_handler, svn_path, tag_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_checkout_tag_async,
            args_tuple=args,
            context_dict={
                "context": "checkout_tag",
                "status_msg": f"Checking out tag '{tag_name}'",
            },
        )

    def revert_to_tag(self):
        """Handles the destructive 'Revert to Tag' action after explicit confirmation."""
        svn_path = self._get_ready_repo_path("Revert to Tag")
        if not svn_path:
            return
        tag_name = self.main_frame.tags_tab.get_selected_tag()
        if not tag_name:
            self.main_frame.show_error("Selection Error", "No tag selected from the list.")
            return
        warning_message = (
            f"WARNING: Destructive Operation\n\n"
            f"This will reset the repository to the state of tag '{tag_name}', "
            f"permanently deleting all uncommitted changes and later commits on this branch.\n\n"
            "Are you absolutely sure?"
        )
        if not self.main_frame.ask_yes_no("Confirm Destructive Revert", warning_message):
            self.main_frame.update_status_bar("Revert cancelled.")
            return
        args = (self.action_handler, svn_path, tag_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_revert_to_tag_async,
            args_tuple=args,
            context_dict={
                "context": "revert_to_tag",
                "status_msg": f"Reverting to tag '{tag_name}'",
                "tag_name": tag_name,
            },
        )

    def create_branch(self):
        """Handles branch creation after getting name from a dialog."""
        svn_path = self._get_ready_repo_path("Create Branch")
        if not svn_path:
            return
        dialog = CreateBranchDialog(self.main_frame.master)
        branch_name = dialog.result
        if not branch_name:
            self.main_frame.update_status_bar("Cancelled.")
            return
        args = (self.action_handler, svn_path, branch_name)
        self.app._start_async_operation(
            worker_func=async_workers.run_create_branch_async,
            args_tuple=args,
            context_dict={
                "context": "create_branch",
                "status_msg": f"Creating branch '{branch_name}'",
                "new_branch_name": branch_name,
            },
        )

    def checkout_branch(self, branch_to_checkout: Optional[str] = None):
        """
        Handles checkout of an existing local branch.

        Args:
            branch_to_checkout: Branch to switch to. When omitted, the branch
                selected in the GUI is used and the user is asked to confirm.
        """
        svn_path = self._get_ready_repo_path("Checkout Branch")
        if not svn_path:
            return
        target_branch = branch_to_checkout or self.main_frame.get_selected_branch()
        if not target_branch:
            self.main_frame.show_error("Selection Error", "No branch selected from the list.")
            return
        if not branch_to_checkout:  # Only ask for confirmation if triggered by button
            if not self.main_frame.ask_yes_no(
                "Confirm Checkout", f"Switch to branch '{target_branch}'?"
            ):
                self.main_frame.update_status_bar("Cancelled.")
                return
        args = (self.action_handler, svn_path, target_branch)
        self.app._start_async_operation(
            worker_func=async_workers.run_checkout_branch_async,
            args_tuple=args,
            context_dict={
                "context": "checkout_branch",
                "status_msg": f"Checking out branch '{target_branch}'",
            },
        )

    def get_gitignore_path(self, repo_path: str) -> str:
        """Helper to get the full path of the .gitignore file inside ``repo_path``."""
        return os.path.join(repo_path, ".gitignore")

    def handle_gitignore_save(self):
        """Callback executed after .gitignore is saved. Starts an async untrack check."""
        func_name = "handle_gitignore_save"
        # No dialog here: a failure is only logged.
        svn_path = self._get_ready_repo_path("Untrack Check", error_message=None)
        if not svn_path:
            log_handler.log_error(
                "Cannot start untrack check: Invalid/Not ready path.", func_name=func_name
            )
            return
        args = (self.action_handler, svn_path)
        self.app._start_async_operation(
            worker_func=async_workers.run_untrack_async,
            args_tuple=args,
            context_dict={
                "context": "_handle_gitignore_save",
                "status_msg": "Checking files to untrack",
                "committed_flag_possible": True,
            },
        )

    def handle_manual_backup(self):
        """Starts a manual backup operation into the configured backup directory."""
        profile = self.main_frame.profile_var.get()
        svn_path = self.app._get_and_validate_svn_path(f"Manual Backup ({profile})")
        backup_dir_str = self.main_frame.backup_tab.backup_dir_var.get().strip()
        if not profile or not svn_path:
            return
        if not backup_dir_str:
            self.main_frame.show_error(
                "Input Error", "Backup directory cannot be empty for manual backup."
            )
            return
        if not self.app.profile_handler.save_profile_settings():
            if not self.main_frame.ask_yes_no(
                "Warning",
                "Could not save profile settings.\nContinue backup anyway?",
            ):
                return
        excluded_extensions, excluded_dirs = self.app._parse_exclusions()
        args = (
            self.backup_handler, svn_path, os.path.abspath(backup_dir_str),
            profile, excluded_extensions, excluded_dirs,
        )
        self.app._start_async_operation(
            worker_func=async_workers.run_manual_backup_async,
            args_tuple=args,
            context_dict={"context": "manual_backup", "status_msg": "Creating manual backup"},
        )

    def handle_add_selected_file(self, file_status_line: str):
        """
        Starts an async operation to add a selected untracked file.

        Args:
            file_status_line: One status line for the selected file. Only
                lines whose stripped text starts with "??" are accepted.
        """
        svn_path = self._get_ready_repo_path("Add File")
        if not svn_path:
            return
        relative_path = self.app._extract_path_from_status_line(file_status_line)
        if not relative_path:
            self.main_frame.show_error(
                "Parsing Error", f"Cannot parse file path from:\n{file_status_line}"
            )
            return
        if not file_status_line.strip().startswith("??"):
            # The first two raw characters are shown as the status code.
            self.main_frame.show_error(
                "Invalid Action",
                f"Cannot 'Add' file with status '{file_status_line[:2]}'.",
            )
            return
        args = (self.git_commands, svn_path, relative_path)
        self.app._start_async_operation(
            worker_func=async_workers.run_add_file_async,
            args_tuple=args,
            context_dict={
                "context": "add_file",
                "status_msg": f"Adding '{os.path.basename(relative_path)}'",
            },
        )

    def handle_delete_local_branch(self, branch_name: str, force: bool):
        """
        Handles the request to delete a local branch.

        Args:
            branch_name: Name of the local branch to delete.
            force: When True the user is warned that unmerged changes are
                discarded, and the flag is forwarded to the worker.
        """
        action_desc = "Force delete" if force else "Delete"
        # Returns silently when the repository is not usable.
        svn_path = self._get_ready_repo_path(f"{action_desc} Branch", error_message=None)
        if not svn_path:
            return
        confirm_message = f"Are you sure you want to delete the local branch '{branch_name}'?"
        if force:
            confirm_message += "\n\nWARNING: Force delete will discard any unmerged changes!"
        if not self.main_frame.ask_yes_no(f"Confirm {action_desc}", confirm_message):
            return
        args = (self.action_handler, svn_path, branch_name, force)
        self.app._start_async_operation(
            worker_func=async_workers.run_delete_local_branch_async,
            args_tuple=args,
            context_dict={
                "context": "delete_local_branch",
                "status_msg": f"{action_desc} branch '{branch_name}'",
            },
        )

    def handle_merge_local_branch(self, branch_to_merge: str):
        """
        Handles the request to merge a local branch into the current branch.

        The merge is refused when no current branch name is available
        (reported as detached HEAD) or when the branch would be merged into
        itself.
        """
        # Returns silently when the repository is not usable.
        svn_path = self._get_ready_repo_path("Merge Branch", error_message=None)
        if not svn_path:
            return
        try:
            current_branch = self.git_commands.get_current_branch_name(svn_path)
            if not current_branch:
                raise ValueError("Cannot merge: Currently in detached HEAD state.")
            if current_branch == branch_to_merge:
                raise ValueError("Cannot merge a branch into itself.")
        except (GitCommandError, ValueError) as e:
            self.main_frame.show_error("Merge Error", f"Cannot start merge:\n{e}")
            return
        if not self.main_frame.ask_yes_no(
            "Confirm Merge", f"Merge branch '{branch_to_merge}' into '{current_branch}'?"
        ):
            return
        args = (self.action_handler, self.git_commands, svn_path, branch_to_merge, False)  # no_ff = False
        self.app._start_async_operation(
            worker_func=async_workers.run_merge_local_branch_async,
            args_tuple=args,
            context_dict={
                "context": "merge_local_branch",
                "status_msg": f"Merging '{branch_to_merge}' into '{current_branch}'",
                "repo_path": svn_path,
            },
        )

    def handle_view_commit_details(self, commit_hash_short: str):
        """Starts an async worker to fetch and display details for a commit."""
        svn_path = self._get_ready_repo_path(
            "View Commit Details", error_message="Repository path is not valid."
        )
        if not svn_path:
            return
        args = (self.git_commands, svn_path, commit_hash_short)
        self.app._start_async_operation(
            worker_func=async_workers.run_get_commit_details_async,
            args_tuple=args,
            context_dict={
                "context": "get_commit_details",
                "status_msg": f"Loading details for commit {commit_hash_short}",
            },
        )