732 lines
28 KiB
Python
732 lines
28 KiB
Python
# --- FILE: gitsync_tool/logic/repository_handler.py ---
|
|
|
|
import os
|
|
from typing import TYPE_CHECKING, Optional, List, Dict, Any, Tuple
|
|
|
|
from gitutility.async_tasks import async_workers
|
|
from gitutility.gui.dialogs import CreateTagDialog, CreateBranchDialog
|
|
from gitutility.logging_setup import log_handler
|
|
from gitutility.commands.git_commands import GitCommandError
|
|
|
|
# Forward reference for type hinting to avoid circular import
|
|
if TYPE_CHECKING:
|
|
from ..app import GitSvnSyncApp
|
|
|
|
|
|
class RepositoryHandler:
|
|
"""
|
|
Handles logic for local repository operations such as preparing the repo,
|
|
managing bundles, committing changes, and managing local tags and branches.
|
|
"""
|
|
|
|
def __init__(self, app: "GitSvnSyncApp"):
|
|
"""
|
|
Initializes the RepositoryHandler.
|
|
|
|
Args:
|
|
app: The main application instance.
|
|
"""
|
|
self.app = app
|
|
self.main_frame = app.main_frame
|
|
self.action_handler = app.action_handler
|
|
self.git_commands = app.git_commands
|
|
self.backup_handler = app.backup_handler
|
|
|
|
def prepare_svn_for_git(self):
|
|
"""Starts async operation to prepare the repository (init, gitignore)."""
|
|
func_name = "prepare_svn_for_git"
|
|
svn_path = self.app._get_and_validate_svn_path("Prepare Repository")
|
|
if not svn_path:
|
|
return
|
|
|
|
if self.app._is_repo_ready(svn_path):
|
|
log_handler.log_info(
|
|
"Prepare skipped: Repository already prepared.", func_name=func_name
|
|
)
|
|
self.main_frame.show_info("Info", "Repository is already prepared.")
|
|
self.app.update_svn_status_indicator(svn_path)
|
|
return
|
|
|
|
args = (self.action_handler, svn_path)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_prepare_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "prepare_repo",
|
|
"status_msg": "Preparing repository",
|
|
},
|
|
)
|
|
|
|
def create_git_bundle(self):
|
|
"""Starts async operation to create a Git bundle file."""
|
|
func_name = "create_git_bundle"
|
|
profile = self.main_frame.profile_var.get()
|
|
svn_path = self.app._get_and_validate_svn_path("Create Bundle")
|
|
usb_path = self.app._get_and_validate_usb_path("Create Bundle")
|
|
bundle_name = self.main_frame.repo_tab.bundle_name_entry.get().strip()
|
|
|
|
if not all([profile, svn_path, usb_path, bundle_name]):
|
|
log_handler.log_warning(
|
|
"Create Bundle cancelled: Missing inputs.", func_name=func_name
|
|
)
|
|
return
|
|
|
|
if not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not prepared.")
|
|
return
|
|
|
|
if not bundle_name.lower().endswith(".bundle"):
|
|
bundle_name += ".bundle"
|
|
bundle_full_path = os.path.join(usb_path, bundle_name)
|
|
|
|
if not self.app.profile_handler.save_profile_settings():
|
|
if not self.main_frame.ask_yes_no(
|
|
"Warning",
|
|
"Could not save profile settings.\nContinue creating bundle anyway?",
|
|
):
|
|
return
|
|
|
|
excluded_extensions, excluded_dirs = self.app._parse_exclusions()
|
|
backup_enabled = self.main_frame.backup_tab.autobackup_var.get()
|
|
backup_dir = self.main_frame.backup_tab.backup_dir_var.get()
|
|
commit_enabled = self.main_frame.commit_tab.autocommit_var.get()
|
|
commit_msg = self.main_frame.commit_tab.get_commit_message()
|
|
|
|
args = (
|
|
self.action_handler,
|
|
svn_path,
|
|
bundle_full_path,
|
|
profile,
|
|
backup_enabled,
|
|
backup_dir,
|
|
commit_enabled,
|
|
commit_msg,
|
|
excluded_extensions,
|
|
excluded_dirs,
|
|
)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_create_bundle_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "create_bundle",
|
|
"status_msg": f"Creating bundle '{bundle_name}'",
|
|
"committed_flag_possible": True,
|
|
},
|
|
)
|
|
|
|
def fetch_from_git_bundle(self):
|
|
"""Starts async operation to fetch/clone from a Git bundle file."""
|
|
func_name = "fetch_from_git_bundle"
|
|
profile = self.main_frame.profile_var.get()
|
|
svn_path_str = self.main_frame.repo_tab.svn_path_entry.get().strip()
|
|
usb_path = self.app._get_and_validate_usb_path("Fetch Bundle")
|
|
bundle_name = self.main_frame.repo_tab.bundle_updated_name_entry.get().strip()
|
|
|
|
if not all([profile, svn_path_str, usb_path, bundle_name]):
|
|
log_handler.log_warning(
|
|
"Fetch Bundle cancelled: Missing inputs.", func_name=func_name
|
|
)
|
|
return
|
|
|
|
bundle_full_path = os.path.join(usb_path, bundle_name)
|
|
if not os.path.isfile(bundle_full_path):
|
|
self.main_frame.show_error(
|
|
"File Not Found", f"Bundle file not found:\n{bundle_full_path}"
|
|
)
|
|
return
|
|
|
|
if not self.app.profile_handler.save_profile_settings():
|
|
if not self.main_frame.ask_yes_no(
|
|
"Warning", "Could not save profile settings.\nContinue fetching anyway?"
|
|
):
|
|
return
|
|
|
|
excluded_extensions, excluded_dirs = self.app._parse_exclusions()
|
|
backup_enabled = self.main_frame.backup_tab.main_frame.autobackup_var.get()
|
|
backup_dir = self.main_frame.backup_tab.main_frame.backup_dir_var.get()
|
|
|
|
args = (
|
|
self.action_handler,
|
|
svn_path_str,
|
|
bundle_full_path,
|
|
profile,
|
|
backup_enabled,
|
|
backup_dir,
|
|
excluded_extensions,
|
|
excluded_dirs,
|
|
)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_fetch_bundle_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "fetch_bundle",
|
|
"status_msg": f"Fetching from '{bundle_name}'",
|
|
"repo_path": svn_path_str,
|
|
},
|
|
)
|
|
|
|
def commit_changes(self):
|
|
"""Starts async operation to commit staged changes."""
|
|
func_name = "commit_changes"
|
|
svn_path = self.app._get_and_validate_svn_path("Commit")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
commit_msg = self.main_frame.commit_tab.get_commit_message()
|
|
if not commit_msg:
|
|
self.main_frame.show_error("Input Error", "Commit message cannot be empty.")
|
|
return
|
|
|
|
args = (self.action_handler, svn_path, commit_msg)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_commit_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "commit",
|
|
"status_msg": "Committing changes",
|
|
"committed_flag_possible": True,
|
|
},
|
|
)
|
|
|
|
def create_tag(self):
|
|
"""Handles tag creation: shows dialog, suggests name, starts async operation."""
|
|
func_name = "create_tag"
|
|
svn_path = self.app._get_and_validate_svn_path("Create Tag")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
self.main_frame.update_status_bar("Processing: Generating tag suggestion...")
|
|
suggested_name = self.app._generate_next_tag_suggestion(svn_path)
|
|
|
|
dialog = CreateTagDialog(
|
|
self.main_frame.master, suggested_tag_name=suggested_name
|
|
)
|
|
tag_info = dialog.result
|
|
|
|
if tag_info:
|
|
tag_name, tag_message = tag_info
|
|
args = (self.action_handler, svn_path, tag_name, tag_message)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_create_tag_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "create_tag",
|
|
"status_msg": f"Creating tag '{tag_name}'",
|
|
"committed_flag_possible": True,
|
|
},
|
|
)
|
|
else:
|
|
self.main_frame.update_status_bar("Cancelled.")
|
|
|
|
def checkout_tag(self):
|
|
"""Handles tag checkout after user confirmation."""
|
|
func_name = "checkout_tag"
|
|
svn_path = self.app._get_and_validate_svn_path("Checkout Tag")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
tag_name = self.main_frame.tags_tab.get_selected_tag()
|
|
if not tag_name:
|
|
self.main_frame.show_error(
|
|
"Selection Error", "No tag selected from the list."
|
|
)
|
|
return
|
|
|
|
confirmation_message = (
|
|
f"Checkout tag '{tag_name}'?\n\n"
|
|
f"Warning: This will put your repository in a 'detached HEAD' state."
|
|
)
|
|
if not self.main_frame.ask_yes_no("Confirm Checkout Tag", confirmation_message):
|
|
self.main_frame.update_status_bar("Cancelled.")
|
|
return
|
|
|
|
args = (self.action_handler, svn_path, tag_name)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_checkout_tag_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "checkout_tag",
|
|
"status_msg": f"Checking out tag '{tag_name}'",
|
|
},
|
|
)
|
|
|
|
def revert_to_tag(self):
|
|
"""Handles the destructive 'Revert to Tag' action."""
|
|
func_name = "revert_to_tag"
|
|
svn_path = self.app._get_and_validate_svn_path("Revert to Tag")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
tag_name = self.main_frame.tags_tab.get_selected_tag()
|
|
if not tag_name:
|
|
self.main_frame.show_error(
|
|
"Selection Error", "No tag selected from the list."
|
|
)
|
|
return
|
|
|
|
warning_message = (
|
|
f"WARNING: Destructive Operation\n\n"
|
|
f"This will reset the repository to the state of tag '{tag_name}', "
|
|
f"permanently deleting all uncommitted changes and later commits on this branch.\n\n"
|
|
"Are you absolutely sure?"
|
|
)
|
|
if not self.main_frame.ask_yes_no(
|
|
"Confirm Destructive Revert", warning_message
|
|
):
|
|
self.main_frame.update_status_bar("Revert cancelled.")
|
|
return
|
|
|
|
args = (self.action_handler, svn_path, tag_name)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_revert_to_tag_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "revert_to_tag",
|
|
"status_msg": f"Reverting to tag '{tag_name}'",
|
|
"tag_name": tag_name,
|
|
},
|
|
)
|
|
|
|
def create_branch(self):
|
|
"""Handles branch creation after getting name from a dialog."""
|
|
func_name = "create_branch"
|
|
svn_path = self.app._get_and_validate_svn_path("Create Branch")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
dialog = CreateBranchDialog(self.main_frame.master)
|
|
branch_name = dialog.result
|
|
|
|
if branch_name:
|
|
args = (self.action_handler, svn_path, branch_name)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_create_branch_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "create_branch",
|
|
"status_msg": f"Creating branch '{branch_name}'",
|
|
"new_branch_name": branch_name,
|
|
},
|
|
)
|
|
else:
|
|
self.main_frame.update_status_bar("Cancelled.")
|
|
|
|
def checkout_branch(self, branch_to_checkout: Optional[str] = None):
|
|
"""Handles checkout of an existing local branch."""
|
|
func_name = "checkout_branch"
|
|
svn_path = self.app._get_and_validate_svn_path("Checkout Branch")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
target_branch = branch_to_checkout or self.main_frame.get_selected_branch()
|
|
if not target_branch:
|
|
self.main_frame.show_error(
|
|
"Selection Error", "No branch selected from the list."
|
|
)
|
|
return
|
|
|
|
if not branch_to_checkout: # Only ask for confirmation if triggered by button
|
|
if not self.main_frame.ask_yes_no(
|
|
"Confirm Checkout", f"Switch to branch '{target_branch}'?"
|
|
):
|
|
self.main_frame.update_status_bar("Cancelled.")
|
|
return
|
|
|
|
args = (self.action_handler, svn_path, target_branch)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_checkout_branch_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "checkout_branch",
|
|
"status_msg": f"Checking out branch '{target_branch}'",
|
|
},
|
|
)
|
|
|
|
def get_gitignore_path(self, repo_path: str) -> str:
|
|
"""Helper to get the full path of the .gitignore file."""
|
|
return os.path.join(repo_path, ".gitignore")
|
|
|
|
def handle_gitignore_save(self):
|
|
"""Callback executed after .gitignore is saved. Starts an async untrack check."""
|
|
func_name = "handle_gitignore_save"
|
|
svn_path = self.app._get_and_validate_svn_path("Untrack Check")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
log_handler.log_error(
|
|
"Cannot start untrack check: Invalid/Not ready path.",
|
|
func_name=func_name,
|
|
)
|
|
return
|
|
|
|
args = (self.action_handler, svn_path)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_untrack_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "_handle_gitignore_save",
|
|
"status_msg": "Checking files to untrack",
|
|
"committed_flag_possible": True,
|
|
},
|
|
)
|
|
|
|
def handle_manual_backup(self):
|
|
"""Starts a manual backup operation."""
|
|
func_name = "handle_manual_backup"
|
|
profile = self.main_frame.profile_var.get()
|
|
svn_path = self.app._get_and_validate_svn_path(f"Manual Backup ({profile})")
|
|
backup_dir_str = self.main_frame.backup_tab.backup_dir_var.get().strip()
|
|
|
|
if not profile or not svn_path:
|
|
return
|
|
if not backup_dir_str:
|
|
self.main_frame.show_error(
|
|
"Input Error", "Backup directory cannot be empty for manual backup."
|
|
)
|
|
return
|
|
|
|
if not self.app.profile_handler.save_profile_settings():
|
|
if not self.main_frame.ask_yes_no(
|
|
"Warning", "Could not save profile settings.\nContinue backup anyway?"
|
|
):
|
|
return
|
|
|
|
excluded_extensions, excluded_dirs = self.app._parse_exclusions()
|
|
args = (
|
|
self.backup_handler,
|
|
svn_path,
|
|
os.path.abspath(backup_dir_str),
|
|
profile,
|
|
excluded_extensions,
|
|
excluded_dirs,
|
|
)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_manual_backup_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "manual_backup",
|
|
"status_msg": "Creating manual backup",
|
|
},
|
|
)
|
|
|
|
def handle_add_selected_file(self, file_status_line: str):
|
|
"""Starts an async operation to add a selected untracked file."""
|
|
func_name = "handle_add_selected_file"
|
|
svn_path = self.app._get_and_validate_svn_path("Add File")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
relative_path = self.app._extract_path_from_status_line(file_status_line)
|
|
if not relative_path:
|
|
self.main_frame.show_error(
|
|
"Parsing Error", f"Cannot parse file path from:\n{file_status_line}"
|
|
)
|
|
return
|
|
|
|
if not file_status_line.strip().startswith("??"):
|
|
self.main_frame.show_error(
|
|
"Invalid Action",
|
|
f"Cannot 'Add' file with status '{file_status_line[:2]}'.",
|
|
)
|
|
return
|
|
|
|
args = (self.git_commands, svn_path, relative_path)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_add_file_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "add_file",
|
|
"status_msg": f"Adding '{os.path.basename(relative_path)}'",
|
|
},
|
|
)
|
|
|
|
def handle_delete_local_branch(self, branch_name: str, force: bool):
|
|
"""Handles the request to delete a local branch."""
|
|
func_name = "handle_delete_local_branch"
|
|
action_desc = "Force delete" if force else "Delete"
|
|
svn_path = self.app._get_and_validate_svn_path(f"{action_desc} Branch")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
return
|
|
|
|
confirm_message = (
|
|
f"Are you sure you want to delete the local branch '{branch_name}'?"
|
|
)
|
|
if force:
|
|
confirm_message += (
|
|
"\n\nWARNING: Force delete will discard any unmerged changes!"
|
|
)
|
|
|
|
if not self.main_frame.ask_yes_no(f"Confirm {action_desc}", confirm_message):
|
|
return
|
|
|
|
args = (self.action_handler, svn_path, branch_name, force)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_delete_local_branch_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "delete_local_branch",
|
|
"status_msg": f"{action_desc} branch '{branch_name}'",
|
|
},
|
|
)
|
|
|
|
def handle_merge_local_branch(self, branch_to_merge: str):
|
|
"""Handles the request to merge a local branch."""
|
|
func_name = "handle_merge_local_branch"
|
|
svn_path = self.app._get_and_validate_svn_path("Merge Branch")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
return
|
|
|
|
try:
|
|
current_branch = self.git_commands.get_current_branch_name(svn_path)
|
|
if not current_branch:
|
|
raise ValueError("Cannot merge: Currently in detached HEAD state.")
|
|
if current_branch == branch_to_merge:
|
|
raise ValueError("Cannot merge a branch into itself.")
|
|
except (GitCommandError, ValueError) as e:
|
|
self.main_frame.show_error("Merge Error", f"Cannot start merge:\n{e}")
|
|
return
|
|
|
|
if not self.main_frame.ask_yes_no(
|
|
"Confirm Merge",
|
|
f"Merge branch '{branch_to_merge}' into '{current_branch}'?",
|
|
):
|
|
return
|
|
|
|
args = (
|
|
self.action_handler,
|
|
self.git_commands,
|
|
svn_path,
|
|
branch_to_merge,
|
|
False,
|
|
) # no_ff = False
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_merge_local_branch_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "merge_local_branch",
|
|
"status_msg": f"Merging '{branch_to_merge}' into '{current_branch}'",
|
|
"repo_path": svn_path,
|
|
},
|
|
)
|
|
|
|
def handle_promote_branch_to_main(self, branch_to_promote: Optional[str] = None):
|
|
"""Promote a selected branch to become the new main.
|
|
|
|
This creates an optional local backup of `main`, checks out `main`, resets
|
|
it to the target branch's tip, and (optionally) pushes the updated main to remote
|
|
with a force-with-lease or force push.
|
|
"""
|
|
func_name = "handle_promote_branch_to_main"
|
|
svn_path = self.app._get_and_validate_svn_path("Promote Branch to main")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
return
|
|
|
|
# Determine the actual target. The branch list may contain special entries
|
|
# like "(HEAD detached from ...)" which `get_selected_branch()` returns as None.
|
|
target_branch = None
|
|
if branch_to_promote:
|
|
target_branch = branch_to_promote
|
|
else:
|
|
# Try the helper first (normal branch names)
|
|
try:
|
|
target_branch = self.main_frame.get_selected_branch()
|
|
except Exception:
|
|
target_branch = None
|
|
|
|
# If still None, inspect the raw listbox selection to handle detached HEAD entries
|
|
if not target_branch:
|
|
try:
|
|
lb = getattr(self.main_frame, "branch_tab", None)
|
|
if lb and hasattr(lb, "branch_listbox"):
|
|
sel = lb.branch_listbox.curselection()
|
|
if sel:
|
|
raw = lb.branch_listbox.get(sel[0]).strip()
|
|
# If the entry is a parenthesized label like '(HEAD detached ...)',
|
|
# we'll treat the promotion target as the current HEAD commit.
|
|
if raw.startswith("("):
|
|
target_branch = "HEAD"
|
|
else:
|
|
# Fallback to normal parsing
|
|
target_branch = raw.lstrip("* ").strip()
|
|
except Exception:
|
|
target_branch = None
|
|
|
|
if not target_branch:
|
|
self.main_frame.show_error(
|
|
"Selection Error", "No branch selected to promote."
|
|
)
|
|
return
|
|
|
|
# Ask which branch should become the target (default to 'master')
|
|
target_main = self.main_frame.ask_branch_name(
|
|
"Target Branch",
|
|
"Enter the name of the branch to be replaced (target branch):",
|
|
default="master",
|
|
)
|
|
if not target_main:
|
|
self.main_frame.show_error(
|
|
"Input Error", "No target branch specified. Promotion cancelled."
|
|
)
|
|
return
|
|
|
|
# Prevent promoting into the same branch
|
|
if target_branch == target_main:
|
|
self.main_frame.show_error(
|
|
"Invalid Action", "Selected branch is the same as the target branch."
|
|
)
|
|
return
|
|
|
|
# Confirmation + options
|
|
confirm = self.main_frame.ask_yes_no(
|
|
"Confirm Promote",
|
|
f"This will make '{target_main}' point to the tip of '{target_branch}'.\n"
|
|
f"This is potentially destructive for history on '{target_main}'.\n\n"
|
|
"Proceed?",
|
|
)
|
|
if not confirm:
|
|
self.main_frame.update_status_bar("Cancelled.")
|
|
return
|
|
|
|
create_backup = self.main_frame.ask_yes_no(
|
|
"Create Backup",
|
|
f"Create a local backup branch for the current '{target_main}' before changing it?",
|
|
)
|
|
|
|
push_to_remote = self.main_frame.ask_yes_no(
|
|
"Push to Remote",
|
|
f"After changing '{target_main}' locally, push the new '{target_main}' to remote? (This may require force push)",
|
|
)
|
|
|
|
force_with_lease = True
|
|
if push_to_remote:
|
|
# Prefer --force-with-lease but ask if user explicitly wants plain force
|
|
use_fwl = self.main_frame.ask_yes_no(
|
|
"Use Force-With-Lease?",
|
|
"Use '--force-with-lease' when pushing (safer)?\nSelect 'No' to use plain --force.",
|
|
)
|
|
force_with_lease = bool(use_fwl)
|
|
|
|
# Determine remote name (fallback to 'origin')
|
|
remote_name = getattr(self.main_frame, "remote_tab", None)
|
|
remote_val = "origin"
|
|
try:
|
|
if hasattr(self.main_frame, "remote_tab"):
|
|
remote_val = (
|
|
self.main_frame.remote_tab.remote_name_var.get() or "origin"
|
|
)
|
|
except Exception:
|
|
remote_val = "origin"
|
|
|
|
# If the repository doesn't have the target branch, ask the user whether
|
|
# to create it before proceeding. We pass a flag to the worker that
|
|
# instructs it to create the target branch at the target commit if the user agreed.
|
|
create_main_if_missing = False
|
|
try:
|
|
branches, _ = self.git_commands.list_branches(svn_path)
|
|
except Exception:
|
|
branches = []
|
|
if target_main not in branches:
|
|
want_create = self.main_frame.ask_yes_no(
|
|
"Target Not Found",
|
|
(
|
|
f"Branch '{target_main}' was not found in the repository.\n\n"
|
|
f"Do you want to create '{target_main}' pointing to the tip of '{target_branch}' and continue?\n\n"
|
|
"Selecting 'No' will cancel the promote action."
|
|
),
|
|
)
|
|
if not want_create:
|
|
self.main_frame.show_error(
|
|
"Action Aborted",
|
|
f"Promotion cancelled because '{target_main}' branch does not exist.",
|
|
)
|
|
return
|
|
create_main_if_missing = True
|
|
|
|
args = (
|
|
self.git_commands,
|
|
svn_path,
|
|
target_branch,
|
|
target_main,
|
|
remote_val,
|
|
create_main_if_missing,
|
|
create_backup,
|
|
push_to_remote,
|
|
force_with_lease,
|
|
)
|
|
|
|
# Resolve the worker function lazily to avoid AttributeError if the
|
|
# module was partially imported earlier. Try direct attribute access
|
|
# first, then fallback to importlib to re-import the module.
|
|
try:
|
|
worker_func = async_workers.run_promote_branch_to_main_async
|
|
except Exception:
|
|
try:
|
|
import importlib
|
|
|
|
aw_mod = importlib.import_module("gitutility.async_tasks.async_workers")
|
|
worker_func = getattr(aw_mod, "run_promote_branch_to_main_async", None)
|
|
except Exception:
|
|
worker_func = None
|
|
|
|
if not callable(worker_func):
|
|
log_handler.log_error(
|
|
"Promote worker is not available in async_workers module.",
|
|
func_name=func_name,
|
|
)
|
|
self.main_frame.show_error(
|
|
"Internal Error",
|
|
"Promote action is not available. Please restart the application.",
|
|
)
|
|
return
|
|
|
|
self.app._start_async_operation(
|
|
worker_func=worker_func,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "promote_branch",
|
|
"status_msg": f"Promoting '{target_branch}' to '{target_main}'",
|
|
"repo_path": svn_path,
|
|
},
|
|
)
|
|
|
|
def handle_view_commit_details(self, commit_hash_short: str):
|
|
"""Starts an async worker to fetch and display details for a commit."""
|
|
func_name = "handle_view_commit_details"
|
|
svn_path = self.app._get_and_validate_svn_path("View Commit Details")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository path is not valid.")
|
|
return
|
|
|
|
args = (self.git_commands, svn_path, commit_hash_short)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_get_commit_details_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "get_commit_details",
|
|
"status_msg": f"Loading details for commit {commit_hash_short}",
|
|
},
|
|
)
|
|
|
|
def handle_reset_to_commit(self, commit_hash: str):
|
|
"""Handles the destructive 'Reset to Commit' action."""
|
|
func_name = "handle_reset_to_commit"
|
|
svn_path = self.app._get_and_validate_svn_path("Reset to Commit")
|
|
if not svn_path or not self.app._is_repo_ready(svn_path):
|
|
self.main_frame.show_error("Action Failed", "Repository is not ready.")
|
|
return
|
|
|
|
args = (self.action_handler, svn_path, commit_hash)
|
|
self.app._start_async_operation(
|
|
worker_func=async_workers.run_reset_to_commit_async,
|
|
args_tuple=args,
|
|
context_dict={
|
|
"context": "reset_to_commit",
|
|
"status_msg": f"Resetting to commit '{commit_hash[:7]}'",
|
|
},
|
|
)
|