SXXXXXXX_GitUtility/gitutility/async_tasks/async_workers.py
2025-11-12 12:42:14 +01:00

2829 lines
107 KiB
Python

# --- FILE: gitutility/async_tasks/async_workers.py ---
import os
import queue
import logging
import datetime
from typing import List, Dict, Any, Tuple, Optional, Set
import subprocess
import re
import tempfile
from gitutility.logging_setup import log_handler
from ..commands.git_commands import GitCommands, GitCommandError
from ..core.action_handler import ActionHandler
from ..core.backup_handler import BackupHandler
from ..core.remote_actions import RemoteActionHandler
from ..core.wiki_updater import WikiUpdater, WikiUpdateStatus
from ..core.history_cleaner import HistoryCleaner
from ..core.submodule_handler import SubmoduleHandler
# === Workers for GUI refresh ===
def run_refresh_tags_async(
    git_commands: GitCommands,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Collect the repository's tags and report them through a queue.

    Args:
        git_commands: Object whose ``list_tags`` is called with ``repo_path``.
        repo_path: Repository path; also echoed in the log output.
        results_queue: Queue that receives the payload dict (keys ``status``,
            ``result``, ``message``, ``exception``).

    The payload starts as an error report carrying a placeholder tag entry
    and is switched to ``"success"`` only after the tag list was obtained.
    Any exception is caught, logged, and stored under ``exception``.
    """
    worker_name = "run_refresh_tags_async"
    log_handler.log_debug(
        f"[Worker] Started: Refresh Tags for '{repo_path}'", func_name=worker_name
    )
    # Pessimistic default: overwritten field by field once the call succeeds.
    payload: Dict[str, Any] = {
        "status": "error",
        "result": [("(Error)", "")],
        "message": "Tag refresh failed.",
        "exception": None,
    }
    try:
        tags: List[Tuple[str, str]] = git_commands.list_tags(repo_path)
        summary = f"Tags refreshed ({len(tags)} found)."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = tags
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION refreshing tags: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error refreshing tags: {type(exc).__name__}"
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug("[Worker] Finished: Refresh Tags", func_name=worker_name)
def run_refresh_branches_async(
    git_commands: GitCommands,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """List local branches in the background and queue the outcome.

    Args:
        git_commands: Object whose ``list_branches`` is called with ``repo_path``.
        repo_path: Repository path; also echoed in the log output.
        results_queue: Queue that receives the payload dict.

    On success the payload's ``result`` is the ``(branches, current)`` pair
    obtained from ``list_branches``. On failure it stays
    ``(["(Error)"], None)`` and the caught exception is stored under
    ``exception``.
    """
    worker_name = "run_refresh_branches_async"
    log_handler.log_debug(
        f"[Worker] Started: Refresh Local Branches for '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": (["(Error)"], None),
        "message": "Branch refresh failed.",
        "exception": None,
    }
    try:
        branch_names, active_branch = git_commands.list_branches(repo_path)
        total = len(branch_names)
        # A falsy current branch is shown with a placeholder label.
        shown_branch = active_branch or "None (Detached?)"
        summary = (
            f"Local branches refreshed ({total} found). "
            f"Current: {shown_branch}"
        )
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = (branch_names, active_branch)
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION refreshing local branches: {exc}",
            func_name=worker_name,
        )
        payload["exception"] = exc
        payload["message"] = (
            f"Error refreshing local branches: {type(exc).__name__}"
        )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            "[Worker] Finished: Refresh Local Branches", func_name=worker_name
        )
def run_refresh_history_async(
    git_commands: GitCommands,
    repo_path: str,
    branch_filter: Optional[str],
    log_scope: str,
    results_queue: queue.Queue[Dict[str, Any]],
    *,
    max_count: int = 200,
) -> None:
    """Worker to fetch commit history asynchronously.

    Args:
        git_commands: Object whose ``get_commit_log`` is called.
        repo_path: Repository path passed to ``get_commit_log``.
        branch_filter: Forwarded as the ``branch`` argument of
            ``get_commit_log``; may be ``None``.
        log_scope: Label used only in log and status messages.
        results_queue: Queue that receives the payload dict (keys ``status``,
            ``result``, ``message``, ``exception``).
        max_count: Forwarded as the ``max_count`` argument of
            ``get_commit_log``. Keyword-only; defaults to 200, the limit that
            was previously hard-coded, so existing callers are unaffected.

    Any exception is caught, logged, and stored in the payload; the payload is
    queued on both the success and the failure path.
    """
    func_name = "run_refresh_history_async"
    log_handler.log_debug(
        f"[Worker] Started: Refresh History ({log_scope}) for '{repo_path}'",
        func_name=func_name,
    )
    # Error defaults; replaced only after the log was retrieved successfully.
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": ["(Error retrieving history)"],
        "message": "History refresh failed.",
        "exception": None,
    }
    try:
        log_data = git_commands.get_commit_log(
            repo_path, max_count=max_count, branch=branch_filter
        )
        count = len(log_data)
        message = f"History refreshed ({count} entries for {log_scope})."
        log_handler.log_info(f"[Worker] {message}", func_name=func_name)
        result_payload.update(status="success", result=log_data, message=message)
    except (GitCommandError, Exception) as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION refreshing history: {e}", func_name=func_name
        )
        result_payload.update(
            exception=e, message=f"Error refreshing history: {type(e).__name__}"
        )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Refresh History ({log_scope})", func_name=func_name
        )
def run_refresh_changes_async(
    git_commands: GitCommands,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Query the short status of the working tree and queue the outcome.

    Args:
        git_commands: Object whose ``get_status_short`` is called with
            ``repo_path``.
        repo_path: Repository path; also echoed in the log output.
        results_queue: Queue that receives the payload dict.

    On success ``result`` holds whatever ``get_status_short`` returned and
    ``message`` states how many entries it contained. On failure the payload
    keeps its error defaults plus the caught exception.
    """
    worker_name = "run_refresh_changes_async"
    log_handler.log_debug(
        f"[Worker] Started: Refresh Changes for '{repo_path}'", func_name=worker_name
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": ["(Error refreshing changes)"],
        "message": "Changes refresh failed.",
        "exception": None,
    }
    try:
        changed_entries = git_commands.get_status_short(repo_path)
        total = len(changed_entries)
        log_handler.log_info(f"[Worker] Found {total} changes.", func_name=worker_name)
        if total > 0:
            summary = f"Ready ({total} changes detected)."
        else:
            summary = "Ready (No changes detected)."
        payload["status"] = "success"
        payload["result"] = changed_entries
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION refreshing changes: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error refreshing changes: {type(exc).__name__}"
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            "[Worker] Finished: Refresh Changes", func_name=worker_name
        )
# === Workers for main actions ===
def run_prepare_async(
    action_handler: ActionHandler,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Run the repository preparation step in the background.

    Args:
        action_handler: Object whose ``execute_prepare_repo`` is called with
            ``repo_path``.
        repo_path: Repository path; also echoed in the log output.
        results_queue: Queue that receives the payload dict.

    A ``ValueError`` whose text contains "already prepared" (case-insensitive)
    is reported with status ``"warning"`` and ``result=True``. Every other
    exception leaves the payload in its ``"error"`` state.
    """
    worker_name = "run_prepare_async"
    log_handler.log_debug(
        f"[Worker] Started: Prepare Repo for '{repo_path}'", func_name=worker_name
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": "Prepare failed.",
        "exception": None,
    }
    try:
        prepared = action_handler.execute_prepare_repo(repo_path)
        summary = "Repository prepared successfully."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = prepared
        payload["message"] = summary
    except ValueError as exc:
        already_prepared = "already prepared" in str(exc).lower()
        if not already_prepared:
            log_handler.log_exception(
                f"[Worker] VALUE ERROR preparing repo: {exc}", func_name=worker_name
            )
            payload["exception"] = exc
            payload["message"] = f"Error preparing repository: {exc}"
        else:
            # Downgraded to a warning: the payload still reports result=True.
            log_handler.log_warning(f"[Worker] Warning: {exc}", func_name=worker_name)
            payload["status"] = "warning"
            payload["result"] = True
            payload["message"] = str(exc)
            payload["exception"] = exc
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION preparing repo: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error preparing repository: {type(exc).__name__}"
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug("[Worker] Finished: Prepare Repo", func_name=worker_name)
def run_create_bundle_async(
    action_handler: ActionHandler,
    repo_path: str,
    bundle_full_path: str,
    profile_name: str,
    autobackup_enabled: bool,
    backup_base_dir: str,
    autocommit_enabled: bool,
    commit_message: str,
    excluded_extensions: Set[str],
    excluded_dirs: Set[str],
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to create Git bundle asynchronously.

    Args:
        action_handler: Object whose ``execute_create_bundle`` is called with
            all of the following arguments in the same order.
        repo_path: Source repository path.
        bundle_full_path: Target bundle path; its basename is used in logs.
        profile_name: Forwarded to ``execute_create_bundle``.
        autobackup_enabled: Forwarded to ``execute_create_bundle``.
        backup_base_dir: Forwarded to ``execute_create_bundle``.
        autocommit_enabled: Forwarded to ``execute_create_bundle`` and copied
            into the payload's ``committed`` field on success.
        commit_message: Forwarded to ``execute_create_bundle``.
        excluded_extensions: Forwarded to ``execute_create_bundle``.
        excluded_dirs: Forwarded to ``execute_create_bundle``.
        results_queue: Queue that receives the payload dict (keys ``status``,
            ``result``, ``message``, ``exception``, ``committed``).

    A falsy return value from ``execute_create_bundle`` is still reported as
    success, with a message saying that no file was generated.
    """
    func_name = "run_create_bundle_async"
    log_handler.log_debug(
        f"[Worker] Started: Create Bundle '{os.path.basename(bundle_full_path)}' from '{repo_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": None,
        "message": "Bundle creation failed.",
        "exception": None,
        "committed": False,
    }
    try:
        result_path = action_handler.execute_create_bundle(
            repo_path,
            bundle_full_path,
            profile_name,
            autobackup_enabled,
            backup_base_dir,
            autocommit_enabled,
            commit_message,
            excluded_extensions,
            excluded_dirs,
        )
        if result_path:
            message = f"Bundle created successfully: {os.path.basename(result_path)}"
            log_handler.log_info(f"[Worker] {message}", func_name=func_name)
        else:
            message = "Bundle creation finished (no file generated - repo empty or no changes?)."
            log_handler.log_warning(f"[Worker] {message}", func_name=func_name)
        # Mark success last: if anything above raises, the payload must not
        # end up with status "success" next to an attached exception.
        result_payload.update(
            status="success",
            result=result_path,
            committed=autocommit_enabled,
            message=message,
        )
    except (GitCommandError, Exception) as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION creating bundle: {e}", func_name=func_name
        )
        result_payload.update(
            exception=e, message=f"Error creating bundle: {type(e).__name__}"
        )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug("[Worker] Finished: Create Bundle", func_name=func_name)
def run_fetch_bundle_async(
    action_handler: ActionHandler,
    target_repo_path_str: str,
    bundle_full_path: str,
    profile_name: str,
    autobackup_enabled: bool,
    backup_base_dir: str,
    excluded_extensions: Set[str],
    excluded_dirs: Set[str],
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Fetch or clone from a bundle file in the background.

    Args:
        action_handler: Object whose ``execute_fetch_bundle`` is called with
            all of the following arguments except the queue.
        target_repo_path_str: Destination repository path; also stored in the
            payload as ``repo_path``.
        bundle_full_path: Bundle file path; its basename is used in messages.
        profile_name: Forwarded to ``execute_fetch_bundle``.
        autobackup_enabled: Forwarded to ``execute_fetch_bundle``.
        backup_base_dir: Forwarded to ``execute_fetch_bundle``.
        excluded_extensions: Forwarded to ``execute_fetch_bundle``.
        excluded_dirs: Forwarded to ``execute_fetch_bundle``.
        results_queue: Queue that receives the payload dict.

    A ``GitCommandError`` whose text mentions "merge conflict" sets the
    payload's ``conflict`` flag. A ``FileNotFoundError`` produces a dedicated
    "bundle file not found" message.
    """
    worker_name = "run_fetch_bundle_async"
    log_handler.log_debug(
        f"[Worker] Started: Fetch Bundle '{os.path.basename(bundle_full_path)}' into '{target_repo_path_str}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": "Fetch from bundle failed.",
        "exception": None,
        "conflict": False,
        "repo_path": target_repo_path_str,
    }
    try:
        fetched = action_handler.execute_fetch_bundle(
            target_repo_path_str,
            bundle_full_path,
            profile_name,
            autobackup_enabled,
            backup_base_dir,
            excluded_extensions,
            excluded_dirs,
        )
        summary = "Fetch/Clone from bundle completed successfully."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = fetched
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION fetching bundle: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        is_conflict = (
            isinstance(exc, GitCommandError) and "merge conflict" in str(exc).lower()
        )
        if is_conflict:
            payload["conflict"] = True
            payload["message"] = (
                "Merge conflict occurred during fetch/merge from bundle."
            )
            log_handler.log_error(
                "[Worker] Merge conflict detected during fetch from bundle.",
                func_name=worker_name,
            )
        elif isinstance(exc, FileNotFoundError):
            payload["message"] = (
                f"Bundle file not found: {os.path.basename(bundle_full_path)}"
            )
        else:
            payload["message"] = (
                f"Error fetching from bundle: {type(exc).__name__}"
            )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug("[Worker] Finished: Fetch Bundle", func_name=worker_name)
def run_manual_backup_async(
    backup_handler: BackupHandler,
    repo_path: str,
    backup_base_dir: str,
    profile_name: str,
    excluded_extensions: Set[str],
    excluded_dirs: Set[str],
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Create a ZIP backup in the background and queue the outcome.

    Args:
        backup_handler: Object whose ``create_zip_backup`` is called.
        repo_path: Repository path passed to ``create_zip_backup``.
        backup_base_dir: Forwarded to ``create_zip_backup``.
        profile_name: Forwarded to ``create_zip_backup``; also logged.
        excluded_extensions: Forwarded to ``create_zip_backup``.
        excluded_dirs: Forwarded to ``create_zip_backup``.
        results_queue: Queue that receives the payload dict.

    The success message carries the local wall-clock time (``HH:MM:SS``). A
    falsy return value from ``create_zip_backup`` is still reported as
    success, with a "no file generated" message.
    """
    worker_name = "run_manual_backup_async"
    log_handler.log_debug(
        f"[Worker] Started: Manual Backup for '{repo_path}' (Profile: {profile_name})",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": None,
        "message": "Manual backup failed.",
        "exception": None,
    }
    try:
        archive_path = backup_handler.create_zip_backup(
            repo_path, backup_base_dir, profile_name, excluded_extensions, excluded_dirs
        )
        stamp = datetime.datetime.now().strftime("%H:%M:%S")
        if archive_path:
            summary = (
                f"Manual backup created: {os.path.basename(archive_path)} ({stamp})."
            )
        else:
            summary = f"Manual backup finished (no file generated - empty/excluded?) ({stamp})."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = archive_path
        payload["message"] = summary
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION creating manual backup: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error creating backup: {type(exc).__name__}"
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug("[Worker] Finished: Manual Backup", func_name=worker_name)
def run_commit_async(
    action_handler: ActionHandler,
    repo_path: str,
    commit_message: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Perform a manual commit in the background and queue the outcome.

    Args:
        action_handler: Object whose ``execute_manual_commit`` is called with
            ``repo_path`` and ``commit_message``.
        repo_path: Repository path; also echoed in the log output.
        commit_message: Message forwarded to ``execute_manual_commit``.
        results_queue: Queue that receives the payload dict.

    The return value of ``execute_manual_commit`` is stored under both
    ``result`` and ``committed``. A falsy value is still a success, reported
    as "no changes detected to commit".
    """
    worker_name = "run_commit_async"
    log_handler.log_debug(
        f"[Worker] Started: Commit for '{repo_path}'", func_name=worker_name
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": "Commit failed.",
        "exception": None,
        "committed": False,
    }
    try:
        did_commit = action_handler.execute_manual_commit(repo_path, commit_message)
        if did_commit:
            summary = "Commit successful."
        else:
            summary = "Commit finished (no changes detected to commit)."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = did_commit
        payload["message"] = summary
        payload["committed"] = did_commit
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION committing: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error committing changes: {type(exc).__name__}"
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug("[Worker] Finished: Commit", func_name=worker_name)
def run_untrack_async(
    action_handler: ActionHandler,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Run the .gitignore-based untrack action in the background.

    Args:
        action_handler: Object whose ``execute_untrack_files_from_gitignore``
            is called with ``repo_path``.
        repo_path: Repository path; also echoed in the log output.
        results_queue: Queue that receives the payload dict.

    The handler's return value is stored under both ``result`` and
    ``committed``; it only selects which of the two success messages is used.
    """
    worker_name = "run_untrack_async"
    log_handler.log_debug(
        f"[Worker] Started: Untrack Files Check for '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": "Untracking failed.",
        "exception": None,
        "committed": False,
    }
    try:
        did_commit = action_handler.execute_untrack_files_from_gitignore(repo_path)
        if did_commit:
            summary = "Untracking complete: Files removed from index and commit created."
        else:
            summary = (
                "Untrack check complete (no tracked files matched .gitignore rules)."
            )
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = did_commit
        payload["message"] = summary
        payload["committed"] = did_commit
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION untracking files: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = (
            f"Error during untracking operation: {type(exc).__name__}"
        )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            "[Worker] Finished: Untrack Files Check", func_name=worker_name
        )
def run_add_file_async(
    git_commands: GitCommands,
    repo_path: str,
    relative_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Stage a single file in the background and queue the outcome.

    Args:
        git_commands: Object whose ``add_file`` is called with ``repo_path``
            and ``relative_path``.
        repo_path: Repository path; also echoed in the log output.
        relative_path: Path of the file to add; only its basename appears in
            the user-facing messages.
        results_queue: Queue that receives the payload dict.
    """
    worker_name = "run_add_file_async"
    short_name = os.path.basename(relative_path)
    log_handler.log_debug(
        f"[Worker] Started: Add File '{relative_path}' in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": f"Add file '{short_name}' failed.",
        "exception": None,
    }
    try:
        added = git_commands.add_file(repo_path, relative_path)
        summary = f"File '{short_name}' added to staging area successfully."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = added
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION adding file: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = (
            f"Error adding file '{short_name}': {type(exc).__name__}"
        )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Add File '{relative_path}'", func_name=worker_name
        )
def run_create_tag_async(
    action_handler: ActionHandler,
    repo_path: str,
    tag_name: str,
    tag_message: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Create a tag in the background and queue the outcome.

    Args:
        action_handler: Object whose ``execute_create_tag`` is called.
        repo_path: Repository path; also echoed in the log output.
        tag_name: Name of the tag to create.
        tag_message: Message forwarded to ``execute_create_tag``.
        results_queue: Queue that receives the payload dict.

    On success the payload's ``committed`` flag is always set to ``True``. A
    ``GitCommandError`` whose text contains "already exists" gets a dedicated
    message.
    """
    worker_name = "run_create_tag_async"
    log_handler.log_debug(
        f"[Worker] Started: Create Tag '{tag_name}' in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": f"Create tag '{tag_name}' failed.",
        "exception": None,
        "committed": False,
    }
    try:
        # NOTE(review): the second positional argument is passed as None;
        # its meaning is defined by execute_create_tag, not visible here.
        created = action_handler.execute_create_tag(
            repo_path, None, tag_name, tag_message
        )
        summary = f"Tag '{tag_name}' created successfully."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = created
        payload["message"] = summary
        payload["committed"] = True
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION creating tag: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = (
            f"Error creating tag '{tag_name}': {type(exc).__name__}"
        )
        duplicate_tag = (
            isinstance(exc, GitCommandError) and "already exists" in str(exc).lower()
        )
        if duplicate_tag:
            payload["message"] = f"Tag '{tag_name}' already exists."
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Create Tag '{tag_name}'", func_name=worker_name
        )
def run_checkout_tag_async(
    action_handler: ActionHandler,
    repo_path: str,
    tag_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Check out a tag in the background and queue the outcome.

    Args:
        action_handler: Object whose ``execute_checkout_tag`` is called with
            ``repo_path`` and ``tag_name``.
        repo_path: Repository path; also echoed in the log output.
        tag_name: Tag to check out.
        results_queue: Queue that receives the payload dict.

    Two failures get tailored messages: a ``ValueError`` mentioning
    "Uncommitted changes" (case-sensitive), and a ``GitCommandError``
    mentioning "not found" or "did not match" (case-insensitive).
    """
    worker_name = "run_checkout_tag_async"
    log_handler.log_debug(
        f"[Worker] Started: Checkout Tag '{tag_name}' in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": f"Checkout tag '{tag_name}' failed.",
        "exception": None,
    }
    try:
        checked_out = action_handler.execute_checkout_tag(repo_path, tag_name)
        summary = f"Checked out tag '{tag_name}' (Detached HEAD state)."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = checked_out
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION checking out tag: {exc}", func_name=worker_name
        )
        failure_text = f"Error checking out tag '{tag_name}': {type(exc).__name__}"
        if isinstance(exc, ValueError) and "Uncommitted changes" in str(exc):
            failure_text = (
                "Checkout failed: Uncommitted changes exist. Commit or stash first."
            )
        elif isinstance(exc, GitCommandError) and (
            "not found" in str(exc).lower() or "did not match" in str(exc).lower()
        ):
            failure_text = f"Checkout failed: Tag '{tag_name}' not found or invalid."
        payload["exception"] = exc
        payload["message"] = failure_text
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Checkout Tag '{tag_name}'", func_name=worker_name
        )
def run_create_branch_async(
    action_handler: ActionHandler,
    repo_path: str,
    branch_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Create a branch in the background and queue the outcome.

    Args:
        action_handler: Object whose ``execute_create_branch`` is called with
            ``repo_path`` and ``branch_name``.
        repo_path: Repository path; also echoed in the log output.
        branch_name: Name of the branch to create.
        results_queue: Queue that receives the payload dict.

    A ``GitCommandError`` mentioning "already exists" and any ``ValueError``
    each get a dedicated failure message; other exceptions are reported by
    type name.
    """
    worker_name = "run_create_branch_async"
    log_handler.log_debug(
        f"[Worker] Started: Create Branch '{branch_name}' in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": f"Create branch '{branch_name}' failed.",
        "exception": None,
    }
    try:
        created = action_handler.execute_create_branch(repo_path, branch_name)
        summary = f"Branch '{branch_name}' created successfully."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = created
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION creating branch: {exc}", func_name=worker_name
        )
        failure_text = f"Error creating branch '{branch_name}': {type(exc).__name__}"
        if isinstance(exc, GitCommandError) and "already exists" in str(exc).lower():
            failure_text = f"Branch '{branch_name}' already exists."
        elif isinstance(exc, ValueError):
            failure_text = f"Invalid branch name: {exc}"
        payload["exception"] = exc
        payload["message"] = failure_text
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Create Branch '{branch_name}'", func_name=worker_name
        )
def run_checkout_branch_async(
    action_handler: ActionHandler,
    repo_path: str,
    branch_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Switch to a local branch in the background and queue the outcome.

    Args:
        action_handler: Object whose ``execute_switch_branch`` is called with
            ``repo_path`` and ``branch_name``.
        repo_path: Repository path; also echoed in the log output.
        branch_name: Branch to switch to.
        results_queue: Queue that receives the payload dict.

    Two failures get tailored messages: a ``ValueError`` mentioning
    "Uncommitted changes" (case-sensitive), and a ``GitCommandError``
    mentioning "not found" or "did not match" (case-insensitive).
    """
    worker_name = "run_checkout_branch_async"
    log_handler.log_debug(
        f"[Worker] Started: Checkout Branch '{branch_name}' in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": f"Checkout branch '{branch_name}' failed.",
        "exception": None,
    }
    try:
        switched = action_handler.execute_switch_branch(repo_path, branch_name)
        summary = f"Switched successfully to branch '{branch_name}'."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = switched
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION checking out branch: {exc}", func_name=worker_name
        )
        failure_text = (
            f"Error checking out branch '{branch_name}': {type(exc).__name__}"
        )
        if isinstance(exc, ValueError) and "Uncommitted changes" in str(exc):
            failure_text = (
                "Checkout failed: Uncommitted changes exist. Commit or stash first."
            )
        elif isinstance(exc, GitCommandError) and (
            "not found" in str(exc).lower() or "did not match" in str(exc).lower()
        ):
            failure_text = (
                f"Checkout failed: Branch '{branch_name}' not found or invalid."
            )
        payload["exception"] = exc
        payload["message"] = failure_text
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Checkout Branch '{branch_name}'",
            func_name=worker_name,
        )
# === Workers for remote actions ===
def run_apply_remote_config_async(
    remote_action_handler: RemoteActionHandler,
    repo_path: str,
    remote_name: str,
    remote_url: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Apply a remote's configuration in the background and queue the outcome.

    Args:
        remote_action_handler: Object whose ``apply_remote_config`` is called
            with ``repo_path``, ``remote_name`` and ``remote_url``.
        repo_path: Repository path; also echoed in the log output.
        remote_name: Name of the remote being configured.
        remote_url: URL forwarded to ``apply_remote_config``.
        results_queue: Queue that receives the payload dict.

    Unlike most sibling workers, the failure message embeds the exception
    text itself rather than only its type name.
    """
    worker_name = "run_apply_remote_config_async"
    log_handler.log_debug(
        f"[Worker] Started: Apply Remote Config for '{remote_name}' in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": False,
        "message": "Apply remote config failed.",
        "exception": None,
    }
    try:
        applied = remote_action_handler.apply_remote_config(
            repo_path, remote_name, remote_url
        )
        summary = f"Remote '{remote_name}' configuration applied successfully."
        log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
        payload["status"] = "success"
        payload["result"] = applied
        payload["message"] = summary
    except (GitCommandError, Exception) as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION applying remote config: {exc}", func_name=worker_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error applying remote config: {exc}"
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Apply Remote Config for '{remote_name}'",
            func_name=worker_name,
        )
def run_check_connection_async(
    git_commands: GitCommands,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to check remote connection/auth using 'git ls-remote'.

    Args:
        git_commands: Object whose ``git_ls_remote`` is called with
            ``repo_path`` and ``remote_name``; the returned object must expose
            ``returncode`` and ``stderr``.
        repo_path: Repository path; also echoed in the log output.
        remote_name: Remote to probe.
        results_queue: Queue that receives the payload dict.

    Payload outcomes (``status`` / ``result``):
        * return code 0   -> ``success`` / ``connected``
        * return code 2   -> ``success`` / ``connected_empty``
        * auth phrase in stderr       -> ``auth_required`` / ``authentication needed``
        * connection phrase in stderr -> ``error`` / ``connection_failed``
        * anything else               -> ``error`` / ``unknown_error``
        * exception raised here       -> ``error`` / ``worker_exception``
    """
    func_name = "run_check_connection_async"
    log_handler.log_debug(
        f"[Worker] Started: Check Connection/Auth for '{remote_name}' in '{repo_path}'",
        func_name=func_name,
    )
    # Error defaults; every branch below overwrites the relevant fields.
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": "unknown_error",
        "message": f"Failed to check remote '{remote_name}'.",
        "exception": None,
    }
    try:
        result = git_commands.git_ls_remote(repo_path, remote_name)
        if result.returncode == 0:
            message = f"Connection to remote '{remote_name}' successful."
            log_handler.log_info(f"[Worker] {message}", func_name=func_name)
            result_payload.update(status="success", result="connected", message=message)
        elif result.returncode == 2:
            # NOTE(review): presumably git_ls_remote runs with --exit-code, where
            # 2 means "no matching refs" - confirm in GitCommands.
            message = f"Connected to remote '{remote_name}' (Note: Repository might be empty or unborn)."
            log_handler.log_info(f"[Worker] {message}", func_name=func_name)
            result_payload.update(
                status="success", result="connected_empty", message=message
            )
        else:
            # Classify the failure by searching the lowercased stderr for known phrases.
            stderr_lower = (result.stderr or "").lower()
            log_handler.log_warning(
                f"[Worker] ls-remote failed (RC={result.returncode}). Stderr: {stderr_lower}",
                func_name=func_name,
            )
            auth_errors = [
                "authentication failed",
                "permission denied",
                "could not read username",
                "could not read password",
            ]
            conn_errors = [
                "repository not found",
                "could not resolve host",
                "name or service not known",
                "network is unreachable",
                "failed to connect",
                "unable to access",
                "could not connect",
                "connection timed out",
            ]
            # Auth phrases are checked first, so they win when both lists match.
            if any(err in stderr_lower for err in auth_errors):
                message = (
                    f"Authentication required or failed for remote '{remote_name}'."
                )
                log_handler.log_warning(f"[Worker] {message}", func_name=func_name)
                result_payload.update(
                    status="auth_required",
                    result="authentication needed",
                    message=message,
                )
            elif any(err in stderr_lower for err in conn_errors):
                message = f"Connection failed for remote '{remote_name}': Repository or host not found/reachable."
                log_handler.log_error(f"[Worker] {message}", func_name=func_name)
                result_payload.update(
                    status="error", result="connection_failed", message=message
                )
            else:
                message = f"Failed to check remote '{remote_name}'. Check logs. (RC={result.returncode})"
                log_handler.log_error(
                    f"[Worker] Unknown error checking remote. Stderr: {result.stderr}",
                    func_name=func_name,
                )
                result_payload.update(
                    status="error", result="unknown_error", message=message
                )
            # NOTE(review): nesting level reconstructed - this is assumed to run for
            # every failure branch above (each one defines `message`); confirm
            # against the original indentation.
            result_payload["exception"] = GitCommandError(message, stderr=result.stderr)
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION checking connection: {e}",
            func_name=func_name,
        )
        result_payload.update(
            status="error",
            result="worker_exception",
            message=f"Unexpected error checking connection: {type(e).__name__}",
            exception=e,
        )
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Check Connection/Auth for '{remote_name}'",
            func_name=func_name,
        )
def run_interactive_auth_attempt_async(
    git_commands: GitCommands,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Run an interactive fetch so credential prompts can appear.

    Args:
        git_commands: Object whose ``git_fetch_interactive`` is called with
            ``repo_path`` and ``remote_name``; the returned object must expose
            ``returncode``.
        repo_path: Repository path; also echoed in the log output.
        remote_name: Remote to authenticate against.
        results_queue: Queue that receives the payload dict.

    A zero return code is reported as ``auth_attempt_success``. Any other
    code keeps the ``auth_attempt_failed`` default and attaches a
    ``GitCommandError`` describing the failure.
    """
    worker_name = "run_interactive_auth_attempt_async"
    log_handler.log_info(
        f"[Worker] Started: Interactive Auth Attempt for '{remote_name}' via Fetch in '{repo_path}'",
        func_name=worker_name,
    )
    payload: Dict[str, Any] = {
        "status": "error",
        "result": "auth_attempt_failed",
        "message": f"Interactive auth for '{remote_name}' failed.",
        "exception": None,
    }
    try:
        completed = git_commands.git_fetch_interactive(repo_path, remote_name)
        if completed.returncode != 0:
            summary = f"Interactive authentication attempt for '{remote_name}' failed or was cancelled (RC={completed.returncode})."
            log_handler.log_warning(f"[Worker] {summary}", func_name=worker_name)
            payload["message"] = summary
            payload["exception"] = GitCommandError(summary, stderr=None)
        else:
            summary = f"Interactive authentication attempt for '{remote_name}' seems successful."
            log_handler.log_info(f"[Worker] {summary}", func_name=worker_name)
            payload["status"] = "success"
            payload["result"] = "auth_attempt_success"
            payload["message"] = summary
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during interactive auth attempt: {exc}",
            func_name=worker_name,
        )
        payload["status"] = "error"
        payload["result"] = "worker_exception"
        payload["message"] = (
            f"Unexpected error during interactive auth: {type(exc).__name__}"
        )
        payload["exception"] = exc
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(payload)
        except Exception as queue_error:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {worker_name}: {queue_error}",
                func_name=worker_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Interactive Auth Attempt for '{remote_name}'",
            func_name=worker_name,
        )
def run_fetch_remote_async(
    remote_action_handler: RemoteActionHandler,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to execute 'git fetch' asynchronously via RemoteActionHandler.

    Args:
        remote_action_handler: Object whose ``execute_remote_fetch`` is called
            with ``repo_path`` and ``remote_name``. Its return value becomes
            the queued payload and must therefore be a dict.
        repo_path: Repository path; also echoed in the log output.
        remote_name: Remote to fetch from.
        results_queue: Queue that receives the payload dict.

    If the handler raises, or returns something that is not a dict, the
    default error payload is queued with the exception attached.
    """
    func_name = "run_fetch_remote_async"
    log_handler.log_debug(
        f"[Worker] Started: Fetch Remote '{remote_name}' for '{repo_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = {
        "status": "error",
        "message": f"Fetch remote '{remote_name}' failed.",
        "exception": None,
    }
    try:
        result_info = remote_action_handler.execute_remote_fetch(repo_path, remote_name)
        # Validate before adopting the handler's value: the except branch and
        # the queue consumer both need a dict.
        if not isinstance(result_info, dict):
            raise TypeError(
                "execute_remote_fetch returned "
                f"{type(result_info).__name__}, expected dict"
            )
        log_handler.log_info(
            f"[Worker] Fetch result status for '{remote_name}': {result_info.get('status')}",
            func_name=func_name,
        )
        # Rebind last so a failure above leaves the error defaults in place.
        result_payload = result_info
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during fetch execution: {e}",
            func_name=func_name,
        )
        result_payload["message"] = (
            f"Unexpected error during fetch operation: {type(e).__name__}"
        )
        result_payload["exception"] = e
    finally:
        # The payload is queued on every path so the consumer is not left waiting.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Fetch Remote '{remote_name}'", func_name=func_name
        )
def run_pull_remote_async(
    remote_action_handler: RemoteActionHandler,
    git_commands: GitCommands,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Pull the current branch from a remote in a worker and enqueue the outcome.

    The current branch name is looked up through ``git_commands``; when it
    cannot be determined a ``ValueError`` is raised internally and reported
    through the payload. Otherwise the value returned by
    ``remote_action_handler.execute_remote_pull`` is enqueued, with
    ``repo_path`` added when its status is ``"conflict"``.

    Args:
        remote_action_handler: Handler exposing ``execute_remote_pull``.
        git_commands: Used to resolve the current branch name.
        repo_path: Path of the local repository.
        remote_name: Name of the remote to pull from.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_pull_remote_async"
    log_handler.log_debug(
        f"[Worker] Started: Pull Remote '{remote_name}' for '{repo_path}'",
        func_name=func_name,
    )
    # Pessimistic default; only survives if something below raises.
    payload: Dict[str, Any] = dict(
        status="error",
        message=f"Pull remote '{remote_name}' failed.",
        exception=None,
    )
    try:
        active_branch = git_commands.get_current_branch_name(repo_path)
        if not active_branch:
            raise ValueError(
                "Cannot perform pull: Unable to determine current branch (possibly detached HEAD)."
            )
        log_handler.log_debug(
            f"[Worker] Current branch for pull: '{active_branch}'",
            func_name=func_name,
        )
        # The handler's result replaces the default payload wholesale.
        payload = remote_action_handler.execute_remote_pull(
            repo_path, remote_name, active_branch
        )
        pull_status = payload.get("status")
        log_handler.log_info(
            f"[Worker] Pull result status for '{remote_name}': {pull_status}",
            func_name=func_name,
        )
        # On conflict, include the repository path in the payload.
        if pull_status == "conflict":
            payload["repo_path"] = repo_path
    except (GitCommandError, ValueError) as handled_exc:
        log_handler.log_error(
            f"[Worker] Handled EXCEPTION during pull setup/execution: {handled_exc}",
            func_name=func_name,
        )
        payload["message"] = f"Pull failed: {handled_exc}"
        payload["exception"] = handled_exc
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during pull operation: {exc}",
            func_name=func_name,
        )
        error_kind = type(exc).__name__
        payload["message"] = f"Unexpected error during pull operation: {error_kind}"
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Pull Remote '{remote_name}'", func_name=func_name
        )
def run_push_remote_async(
    remote_action_handler: RemoteActionHandler,
    git_commands: GitCommands,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Push the current branch to a remote in a worker and enqueue the outcome.

    The current branch name is looked up through ``git_commands``; when it
    cannot be determined a ``ValueError`` is raised internally and reported
    through the payload. The push is requested with ``force=False``. When the
    handler reports status ``"rejected"``, the branch name is added to the
    payload under ``"branch_name"``.

    Args:
        remote_action_handler: Handler exposing ``execute_remote_push``.
        git_commands: Used to resolve the current branch name.
        repo_path: Path of the local repository.
        remote_name: Name of the remote to push to.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_push_remote_async"
    log_handler.log_debug(
        f"[Worker] Started: Push Remote '{remote_name}' for '{repo_path}'",
        func_name=func_name,
    )
    # Pessimistic default; only survives if something below raises.
    payload: Dict[str, Any] = dict(
        status="error",
        message=f"Push remote '{remote_name}' failed.",
        exception=None,
    )
    try:
        active_branch = git_commands.get_current_branch_name(repo_path)
        if not active_branch:
            raise ValueError(
                "Cannot perform push: Unable to determine current branch (possibly detached HEAD)."
            )
        log_handler.log_debug(
            f"[Worker] Current branch for push: '{active_branch}'",
            func_name=func_name,
        )
        # The handler's result replaces the default payload wholesale.
        payload = remote_action_handler.execute_remote_push(
            repo_path, remote_name, active_branch, force=False
        )
        push_status = payload.get("status")
        log_handler.log_info(
            f"[Worker] Push result status for '{active_branch}' to '{remote_name}': {push_status}",
            func_name=func_name,
        )
        # On rejection, include the branch that was pushed.
        if push_status == "rejected":
            payload["branch_name"] = active_branch
    except (GitCommandError, ValueError) as handled_exc:
        log_handler.log_error(
            f"[Worker] Handled EXCEPTION during push setup/execution: {handled_exc}",
            func_name=func_name,
        )
        payload["message"] = f"Push failed: {handled_exc}"
        payload["exception"] = handled_exc
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during push operation: {exc}",
            func_name=func_name,
        )
        error_kind = type(exc).__name__
        payload["message"] = f"Unexpected error during push operation: {error_kind}"
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Push Remote '{remote_name}'", func_name=func_name
        )
def run_push_tags_async(
    remote_action_handler: RemoteActionHandler,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Push tags to a remote in a worker and enqueue the outcome.

    Delegates to ``remote_action_handler.execute_push_tags`` and puts the
    value it returns on ``results_queue``. If the call raises, the default
    error payload is enqueued instead, annotated with the exception.

    Args:
        remote_action_handler: Handler exposing ``execute_push_tags``.
        repo_path: Path of the local repository.
        remote_name: Name of the remote that receives the tags.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_push_tags_async"
    log_handler.log_debug(
        f"[Worker] Started: Push Tags to '{remote_name}' for '{repo_path}'",
        func_name=func_name,
    )
    # Pessimistic default; only survives if the handler call itself raises.
    payload: Dict[str, Any] = dict(
        status="error",
        message=f"Push tags to '{remote_name}' failed.",
        exception=None,
    )
    try:
        # The handler's result replaces the default payload wholesale.
        payload = remote_action_handler.execute_push_tags(repo_path, remote_name)
        tags_status = payload.get("status")
        log_handler.log_info(
            f"[Worker] Push tags result status for '{remote_name}': {tags_status}",
            func_name=func_name,
        )
    except (GitCommandError, ValueError) as handled_exc:
        log_handler.log_error(
            f"[Worker] Handled EXCEPTION during push tags execution: {handled_exc}",
            func_name=func_name,
        )
        payload["message"] = f"Push tags failed: {handled_exc}"
        payload["exception"] = handled_exc
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during push tags operation: {exc}",
            func_name=func_name,
        )
        error_kind = type(exc).__name__
        payload["message"] = (
            f"Unexpected error during push tags operation: {error_kind}"
        )
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Push Tags to '{remote_name}'", func_name=func_name
        )
def run_get_ahead_behind_async(
    git_commands: GitCommands,
    repo_path: str,
    local_branch: str,
    upstream_branch: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Compute ahead/behind commit counts in a worker and enqueue the outcome.

    Calls ``git_commands.get_ahead_behind_count``. When both counts are
    available the payload is marked ``"success"``, ``"result"`` holds the
    ``(ahead, behind)`` tuple and ``"message"`` holds a human-readable
    summary. When either count is ``None`` the default error payload is
    enqueued unchanged.

    Args:
        git_commands: Provides ``get_ahead_behind_count``.
        repo_path: Path of the local repository.
        local_branch: Local branch being compared.
        upstream_branch: Upstream branch to compare against.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_get_ahead_behind_async"
    log_handler.log_debug(
        f"[Worker] Started: Get Ahead/Behind for '{local_branch}' vs '{upstream_branch}' in '{repo_path}'",
        func_name=func_name,
    )
    payload: Dict[str, Any] = dict(
        status="error",
        result=(None, None),
        message=f"Could not get ahead/behind status for '{local_branch}'.",
        exception=None,
    )
    try:
        ahead, behind = git_commands.get_ahead_behind_count(
            repo_path, local_branch, upstream_branch
        )
        counts_known = not (ahead is None or behind is None)
        if counts_known:
            payload["status"] = "success"
            payload["result"] = (ahead, behind)
            if ahead == 0 and behind == 0:
                summary = (
                    f"Branch '{local_branch}' is up to date with '{upstream_branch}'."
                )
            else:
                # One fragment per non-zero direction, pluralised when > 1.
                fragments = [
                    f"{amount} commit{'s' if amount > 1 else ''} {direction}"
                    for amount, direction in ((ahead, "ahead"), (behind, "behind"))
                    if amount > 0
                ]
                summary = (
                    f"Branch '{local_branch}' is "
                    + " and ".join(fragments)
                    + f" of '{upstream_branch}'."
                )
            payload["message"] = summary
            log_handler.log_info(f"[Worker] {summary}", func_name=func_name)
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION getting ahead/behind: {exc}",
            func_name=func_name,
        )
        error_kind = type(exc).__name__
        payload["message"] = (
            f"Unexpected error getting ahead/behind status: {error_kind}"
        )
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Get Ahead/Behind for '{local_branch}'",
            func_name=func_name,
        )
def run_clone_remote_async(
    git_commands: GitCommands,
    remote_url: str,
    local_clone_path: str,
    profile_name_to_create: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker function to execute 'git clone' asynchronously.

    Calls ``git_commands.git_clone`` and inspects the return code of the
    result. On success the payload's ``"result"`` is a dict with the keys
    ``cloned_path``, ``profile_name`` and ``remote_url``. On failure the
    stderr text is matched (case-insensitively) against known substrings to
    choose an authentication, connection or target-path message, and a
    ``GitCommandError`` carrying the original stderr is stored under
    ``"exception"``.

    Args:
        git_commands: Provides ``git_clone``.
        remote_url: URL of the repository to clone.
        local_clone_path: Destination directory of the clone.
        profile_name_to_create: Profile name echoed back in the success result.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_clone_remote_async"
    log_handler.log_debug(
        f"[Worker] Started: Clone from '{remote_url}' into '{local_clone_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = {
        "status": "error",
        "message": "Clone operation failed.",
        "exception": None,
        "result": None,
    }
    try:
        clone_result = git_commands.git_clone(remote_url, local_clone_path)
        if clone_result.returncode == 0:
            # normpath drops a trailing separator; without it basename() of
            # "path/to/repo/" would be the empty string.
            target_dir_name = os.path.basename(os.path.normpath(local_clone_path))
            result_payload["status"] = "success"
            result_payload["message"] = (
                f"Repository cloned successfully into '{target_dir_name}'."
            )
            result_payload["result"] = {
                "cloned_path": local_clone_path,
                "profile_name": profile_name_to_create,
                "remote_url": remote_url,
            }
            log_handler.log_info(
                f"[Worker] Clone successful: {result_payload['message']}",
                func_name=func_name,
            )
        else:
            stderr_full = clone_result.stderr or ""
            # Lower-cased copy is used for matching only; the log keeps the
            # original text so paths and URLs retain their case.
            stderr_lower = stderr_full.lower()
            log_handler.log_error(
                f"Clone command failed (RC={clone_result.returncode}). Stderr: {stderr_full}",
                func_name=func_name,
            )
            auth_errors = (
                "authentication failed",
                "permission denied",
                "could not read username",
                "could not read password",
            )
            conn_errors = (
                "repository not found",
                "could not resolve host",
                "name or service not known",
                "network is unreachable",
                "failed to connect",
                "unable to access",
            )
            path_errors = (
                "already exists and is not an empty directory",
                "could not create work tree",
            )
            # Categories are checked in priority order: auth, connection, path.
            if any(err in stderr_lower for err in auth_errors):
                result_payload["message"] = (
                    f"Authentication required or failed for cloning '{remote_url}'."
                )
            elif any(err in stderr_lower for err in conn_errors):
                result_payload["message"] = (
                    f"Connection failed while cloning: '{remote_url}' not found/reachable."
                )
            elif any(err in stderr_lower for err in path_errors):
                result_payload["message"] = (
                    f"Clone failed: Target directory '{local_clone_path}' invalid or not empty."
                )
            else:
                result_payload["message"] = (
                    f"Clone from '{remote_url}' failed (RC={clone_result.returncode}). Check logs."
                )
            result_payload["exception"] = GitCommandError(
                result_payload["message"], stderr=stderr_full
            )
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during clone operation: {e}",
            func_name=func_name,
        )
        result_payload.update(
            status="error",
            result="worker_exception",
            message=f"Unexpected error during clone: {type(e).__name__}",
            exception=e,
        )
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Clone Remote '{remote_url}'", func_name=func_name
        )
def run_refresh_remote_branches_async(
    git_commands: GitCommands,
    repo_path: str,
    remote_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """List a remote's branches in a worker and enqueue the outcome.

    Calls ``git_commands.git_list_remote_branches``. The returned list is
    stored under ``"result"`` and the payload is marked ``"success"``; the
    message reports how many branches were found, or that none were.

    Args:
        git_commands: Provides ``git_list_remote_branches``.
        repo_path: Path of the local repository.
        remote_name: Name of the remote whose branches are listed.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_refresh_remote_branches_async"
    log_handler.log_debug(
        f"[Worker] Started: Refresh Remote Branches for '{remote_name}' in '{repo_path}'",
        func_name=func_name,
    )
    payload: Dict[str, Any] = dict(
        status="error",
        result=["(Error)"],
        message=f"Could not get remote branches for '{remote_name}'.",
        exception=None,
    )
    try:
        branch_names = git_commands.git_list_remote_branches(repo_path, remote_name)
        payload["status"] = "success"
        payload["result"] = branch_names
        total = len(branch_names)
        if total > 0:
            summary = f"Found {total} remote branches for '{remote_name}'."
        else:
            summary = (
                f"No remote branches found for '{remote_name}' (or remote invalid)."
            )
        payload["message"] = summary
        log_handler.log_info(f"[Worker] {summary}", func_name=func_name)
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION refreshing remote branches: {exc}",
            func_name=func_name,
        )
        error_kind = type(exc).__name__
        payload["message"] = f"Unexpected error listing remote branches: {error_kind}"
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Refresh Remote Branches for '{remote_name}'",
            func_name=func_name,
        )
def run_checkout_tracking_branch_async(
    action_handler: ActionHandler,
    repo_path: str,
    new_local_branch_name: str,
    remote_tracking_branch_full_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Check out a remote branch as a new local branch in a worker.

    Delegates to ``action_handler.execute_checkout_tracking_branch`` and puts
    the value it returns on ``results_queue``. If the call raises, the
    payload carries the exception and ``"result"`` is set to
    ``"worker_exception"``.

    Args:
        action_handler: Handler exposing ``execute_checkout_tracking_branch``.
        repo_path: Path of the local repository.
        new_local_branch_name: Name for the local branch to create.
        remote_tracking_branch_full_name: Remote branch to check out.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_checkout_tracking_branch_async"
    log_handler.log_debug(
        f"[Worker] Started: Checkout Remote '{remote_tracking_branch_full_name}' as Local '{new_local_branch_name}' in '{repo_path}'",
        func_name=func_name,
    )
    # Pessimistic default; only survives if the handler call itself raises.
    payload: Dict[str, Any] = dict(
        status="error",
        message="Checkout tracking branch failed.",
        exception=None,
    )
    try:
        # The handler's result replaces the default payload wholesale.
        payload = action_handler.execute_checkout_tracking_branch(
            repo_path, new_local_branch_name, remote_tracking_branch_full_name
        )
        checkout_status = payload.get("status")
        log_handler.log_info(
            f"[Worker] Checkout tracking branch result status: {checkout_status}",
            func_name=func_name,
        )
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during checkout tracking branch execution: {exc}",
            func_name=func_name,
        )
        error_kind = type(exc).__name__
        payload["message"] = (
            f"Unexpected error during checkout operation: {error_kind}"
        )
        payload["exception"] = exc
        payload["result"] = "worker_exception"
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Checkout Remote Branch '{remote_tracking_branch_full_name}' as Local '{new_local_branch_name}'",
            func_name=func_name,
        )
def run_delete_local_branch_async(
    action_handler: ActionHandler,
    repo_path: str,
    branch_name: str,
    force: bool,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Delete a local branch in a worker and enqueue the outcome.

    Delegates to ``action_handler.execute_delete_local_branch`` and puts the
    value it returns on ``results_queue``. A ``ValueError`` is reported with
    its own text as the message; any other exception additionally sets
    ``"result"`` to ``"worker_exception"``.

    Args:
        action_handler: Handler exposing ``execute_delete_local_branch``.
        repo_path: Path of the local repository.
        branch_name: Name of the local branch to delete.
        force: Passed through to the handler; also selects the log wording.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_delete_local_branch_async"
    action_type = "Delete"
    if force:
        action_type = "Force delete"
    log_handler.log_debug(
        f"[Worker] Started: {action_type} Local Branch '{branch_name}' in '{repo_path}'",
        func_name=func_name,
    )
    # Pessimistic default; only survives if the handler call itself raises.
    payload: Dict[str, Any] = dict(
        status="error",
        message=f"Delete branch '{branch_name}' failed.",
        exception=None,
    )
    try:
        # The handler's result replaces the default payload wholesale.
        payload = action_handler.execute_delete_local_branch(
            repo_path, branch_name, force
        )
        delete_status = payload.get("status")
        log_handler.log_info(
            f"[Worker] Delete local branch '{branch_name}' result status: {delete_status}",
            func_name=func_name,
        )
    except ValueError as validation_exc:
        log_handler.log_error(
            f"[Worker] Handled VALIDATION EXCEPTION during delete branch setup: {validation_exc}",
            func_name=func_name,
        )
        payload.update(
            status="error", message=str(validation_exc), exception=validation_exc
        )
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during delete local branch: {exc}",
            func_name=func_name,
        )
        payload.update(
            status="error",
            message=f"Unexpected error deleting branch: {type(exc).__name__}",
            exception=exc,
        )
        payload["result"] = "worker_exception"
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: {action_type} Local Branch '{branch_name}'",
            func_name=func_name,
        )
def run_merge_local_branch_async(
    action_handler: ActionHandler,
    git_commands: GitCommands,
    repo_path: str,
    branch_to_merge: str,
    no_ff: bool,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Merge a local branch into the current branch in a worker.

    The current branch name is looked up through ``git_commands``; an empty
    result raises a ``ValueError`` internally, reported through the payload.
    Otherwise the value returned by
    ``action_handler.execute_merge_local_branch`` is enqueued. When its
    status is ``"conflict"``, ``repo_path`` and ``branch_merged_into`` are
    added to the payload.

    Args:
        action_handler: Handler exposing ``execute_merge_local_branch``.
        git_commands: Used to resolve the current branch name.
        repo_path: Path of the local repository.
        branch_to_merge: Branch to merge into the current branch.
        no_ff: Passed through to the handler unchanged.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_merge_local_branch_async"
    log_handler.log_debug(
        f"[Worker] Started: Merge Local Branch '{branch_to_merge}' into current for '{repo_path}'",
        func_name=func_name,
    )
    # Pessimistic default; only survives if something below raises.
    payload: Dict[str, Any] = dict(
        status="error",
        message=f"Merge branch '{branch_to_merge}' failed.",
        exception=None,
    )
    target_branch: Optional[str] = None
    try:
        target_branch = git_commands.get_current_branch_name(repo_path)
        if not target_branch:
            raise ValueError("Cannot perform merge: Currently in detached HEAD state.")
        log_handler.log_debug(
            f"[Worker] Current branch for merge validation: '{target_branch}'",
            func_name=func_name,
        )
        # The handler's result replaces the default payload wholesale.
        payload = action_handler.execute_merge_local_branch(
            repo_path, branch_to_merge, target_branch, no_ff
        )
        merge_status = payload.get("status")
        log_handler.log_info(
            f"[Worker] Merge local branch '{branch_to_merge}' result status: {merge_status}",
            func_name=func_name,
        )
        # On conflict, include where the merge was attempted.
        if merge_status == "conflict":
            payload["repo_path"] = repo_path
            payload["branch_merged_into"] = target_branch
    except ValueError as validation_exc:
        log_handler.log_error(
            f"[Worker] Handled VALIDATION EXCEPTION during merge setup: {validation_exc}",
            func_name=func_name,
        )
        payload.update(
            status="error",
            message=f"Merge failed: {validation_exc}",
            exception=validation_exc,
        )
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during merge operation: {exc}",
            func_name=func_name,
        )
        payload.update(
            status="error",
            message=f"Unexpected error during merge: {type(exc).__name__}",
            exception=exc,
        )
        payload["result"] = "worker_exception"
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Merge Local Branch '{branch_to_merge}'",
            func_name=func_name,
        )
def run_compare_branches_async(
    git_commands: GitCommands,
    repo_path: str,
    ref1: str,
    ref2: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Compare two references in a worker and enqueue the changed-file list.

    Calls ``git_commands.git_diff_tree``, which returns a return code and a
    list. A zero return code yields a ``"success"`` payload whose
    ``"result"`` is that list; any other code yields an ``"error"`` payload
    carrying a ``GitCommandError``.

    Args:
        git_commands: Provides ``git_diff_tree``.
        repo_path: Path of the local repository.
        ref1: First reference of the comparison.
        ref2: Second reference of the comparison.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_compare_branches_async"
    log_handler.log_debug(
        f"[Worker] Started: Compare Branches '{ref1}' vs '{ref2}' in '{repo_path}'",
        func_name=func_name,
    )
    payload: Dict[str, Any] = dict(
        status="error",
        result=["(Error)"],
        message=f"Could not compare '{ref1}' and '{ref2}'.",
        exception=None,
    )
    try:
        exit_code, diff_entries = git_commands.git_diff_tree(repo_path, ref1, ref2)
        if exit_code != 0:
            # Non-zero exit: report the failure and attach a GitCommandError.
            payload["status"] = "error"
            payload["message"] = (
                f"Failed to compare '{ref1}' and '{ref2}'. Invalid reference(s)?"
            )
            log_handler.log_error(
                f"[Worker] git diff-tree command failed (RC={exit_code})",
                func_name=func_name,
            )
            payload["exception"] = GitCommandError(
                payload["message"], stderr="See previous logs"
            )
        else:
            payload["status"] = "success"
            payload["result"] = diff_entries
            total = len(diff_entries)
            if total > 0:
                summary = f"Comparison complete: Found {total} differences between '{ref1}' and '{ref2}'."
            else:
                summary = f"No differences found between '{ref1}' and '{ref2}'."
            payload["message"] = summary
            log_handler.log_info(f"[Worker] {summary}", func_name=func_name)
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during branch comparison: {exc}",
            func_name=func_name,
        )
        payload["status"] = "error"
        error_kind = type(exc).__name__
        payload["message"] = f"Unexpected error comparing branches: {error_kind}"
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Compare Branches '{ref1}' vs '{ref2}'",
            func_name=func_name,
        )
def run_get_commit_details_async(
    git_commands: GitCommands,
    repo_path: str,
    commit_hash: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to fetch detailed information about a specific commit.

    Two git queries are made: ``git show --no-patch`` for the metadata
    (full hash, author name/email/date, subject, body) and
    ``git_commands.git_diff_tree`` between ``<commit>^`` and ``<commit>`` for
    the changed files. A ``GitCommandError`` from either query is logged as a
    warning and stored under ``"exception"``, but the payload is still marked
    ``"success"`` with whatever details were collected.

    Each entry of ``files_changed`` is a ``(status, path, new_path)`` tuple;
    ``new_path`` is ``None`` unless the status is a rename or copy code with
    two paths.

    Args:
        git_commands: Provides ``log_and_execute`` and ``git_diff_tree``.
        repo_path: Path of the local repository.
        commit_hash: Commit to describe.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_get_commit_details_async"
    log_handler.log_debug(
        f"[Worker] Started: Get details for commit '{commit_hash}' in '{repo_path}'",
        func_name=func_name,
    )
    commit_details: Dict[str, Any] = {
        "hash_full": None,
        "author_name": None,
        "author_email": None,
        "author_date": None,
        "subject": None,
        "body": "",
        "files_changed": [],
    }
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": commit_details,
        "message": f"Could not get details for commit '{commit_hash}'.",
        "exception": None,
    }
    try:
        # Retrieve metadata with `git show --no-patch` so the diff is excluded.
        # One field per line: hash, author name, author email, date, subject,
        # then the body on all remaining lines.
        show_fmt = "%H%n%an%n%ae%n%ad%n%s%n%b"
        cmd = [
            "git",
            "show",
            "--no-patch",
            f"--pretty=format:{show_fmt}",
            commit_hash,
        ]
        try:
            res = git_commands.log_and_execute(
                cmd, repo_path, check=True, log_output_level=logging.DEBUG
            )
            out = (res.stdout or "").splitlines()
            if len(out) >= 5:
                commit_details["hash_full"] = out[0].strip()
                commit_details["author_name"] = out[1].strip()
                commit_details["author_email"] = out[2].strip()
                commit_details["author_date"] = out[3].strip()
                commit_details["subject"] = out[4].strip()
                commit_details["body"] = "\n".join(
                    line.rstrip() for line in out[5:]
                ).strip()
            else:
                # Fallback: keep the hash the caller supplied if parsing failed.
                commit_details["hash_full"] = commit_hash
        except GitCommandError as gce:
            # Record the failure but still attempt the file list below.
            result_payload["exception"] = gce
            log_handler.log_warning(
                f"git show failed for '{commit_hash}': {gce}", func_name=func_name
            )
        # List the files changed between the commit's first parent and itself.
        parsed_files: List[Tuple[str, str, Optional[str]]] = []
        try:
            rc, changed_files = git_commands.git_diff_tree(
                repo_path, f"{commit_hash}^", commit_hash
            )
            if rc == 0 and changed_files:
                for line in changed_files:
                    if not line:
                        continue
                    # Prefer tab-splitting since name-status uses tabs.
                    parts = line.split("\t")
                    status_char = parts[0].strip() if parts else ""
                    # git appends a similarity score to rename/copy codes
                    # (e.g. "R100", "C75"), so match on the leading letter;
                    # an exact "R" comparison would drop the destination path.
                    if status_char[:1] in ("R", "C") and len(parts) >= 3:
                        parsed_files.append(
                            (status_char, parts[1].strip(), parts[2].strip())
                        )
                    elif len(parts) >= 2:
                        parsed_files.append((status_char, parts[1].strip(), None))
                    else:
                        # Fallback: split on the first run of whitespace.
                        wparts = line.split(None, 1)
                        if len(wparts) == 2:
                            parsed_files.append(
                                (wparts[0].strip(), wparts[1].strip(), None)
                            )
        except GitCommandError as gce2:
            # Continue with an empty file list if diff-tree fails.
            parsed_files = []
            result_payload["exception"] = gce2
            log_handler.log_warning(
                f"git diff-tree failed for '{commit_hash}': {gce2}", func_name=func_name
            )
        # Attach parsed files (may be empty) and mark success.
        commit_details["files_changed"] = parsed_files
        result_payload["status"] = "success"
        result_payload["result"] = commit_details
        result_payload["message"] = f"Details retrieved for '{commit_hash}'."
        log_handler.log_info(
            f"[Worker] {result_payload['message']}", func_name=func_name
        )
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION getting commit details: {e}", func_name=func_name
        )
        result_payload["message"] = f"Error retrieving details: {e}"
        result_payload["exception"] = e
    finally:
        # Guard the put like the other workers do, so a queue failure is
        # logged instead of escaping from the worker.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Get details for commit '{commit_hash}'",
            func_name=func_name,
        )
def run_update_wiki_async(
    wiki_updater: WikiUpdater,
    main_repo_path: str,
    main_repo_remote_url: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Update the Gitea Wiki in a worker and enqueue the outcome.

    Calls ``wiki_updater.update_wiki_from_docs`` and maps the status of its
    result onto the queue statuses: ``SUCCESS`` and ``NO_CHANGES`` become
    ``"success"``, ``DOC_NOT_FOUND`` becomes ``"warning"`` and everything
    else becomes ``"error"``. The whole result object is passed along under
    ``"result"``.

    Args:
        wiki_updater: Updater exposing ``update_wiki_from_docs``.
        main_repo_path: Path of the main repository.
        main_repo_remote_url: Remote URL of the main repository.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_update_wiki_async"
    log_handler.log_debug("[Worker] Started: Update Gitea Wiki", func_name=func_name)
    payload: Dict[str, Any] = dict(
        status="error",
        message="Wiki update failed.",
        exception=None,
        result=None,
    )
    try:
        outcome = wiki_updater.update_wiki_from_docs(
            main_repo_path=main_repo_path,
            main_repo_remote_url=main_repo_remote_url,
        )
        # Collapse the detailed status into 'success', 'warning' or 'error'.
        detailed_status = outcome.status
        if detailed_status in (
            WikiUpdateStatus.SUCCESS,
            WikiUpdateStatus.NO_CHANGES,
        ):
            queue_status = "success"
        elif detailed_status in (WikiUpdateStatus.DOC_NOT_FOUND,):
            queue_status = "warning"
        else:
            queue_status = "error"
        payload["status"] = queue_status
        payload["message"] = outcome.message
        payload["result"] = outcome  # Pass the whole result object
        log_handler.log_info(
            f"[Worker] Wiki update result: {outcome.message}", func_name=func_name
        )
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] UNEXPECTED EXCEPTION during wiki update: {exc}",
            func_name=func_name,
        )
        payload["status"] = "error"
        error_kind = type(exc).__name__
        payload["message"] = f"Unexpected error during wiki update: {error_kind}"
        payload["exception"] = exc
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            "[Worker] Finished: Update Gitea Wiki", func_name=func_name
        )
def run_promote_branch_to_main_async(
    git_commands: GitCommands,
    repo_path: str,
    branch_to_promote: str,
    main_branch: str,
    remote_name: str,
    create_main_if_missing: bool,
    create_backup: bool,
    push_to_remote: bool,
    force_with_lease: bool,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to promote a given branch to become the new main.

    Steps:
    - Resolve branch_to_promote to a commit
    - Create main_branch at that commit if it is missing and allowed
    - Optionally create a backup branch that references the current main
    - Checkout main and reset --hard to the target commit
    - Optionally push to remote with force-with-lease or force

    On success the payload's ``"result"`` is a dict with the keys
    ``backup_branch`` (``None`` if no backup was created), ``promoted_to``,
    ``pushed`` (True only when the push command exited with code 0) and
    ``push_result``.

    NOTE(review): this file defines ``run_promote_branch_to_main_async`` a
    second time further down; the later definition replaces this one when
    the module is imported.

    Args:
        git_commands: Provides the git operations used below.
        repo_path: Path of the local repository.
        branch_to_promote: Reference to resolve and promote.
        main_branch: Name of the branch that is reset to the target commit.
        remote_name: Remote used for the optional push.
        create_main_if_missing: Create ``main_branch`` when it is not listed.
        create_backup: Create a timestamped backup branch of the existing main.
        push_to_remote: Push ``main_branch`` after the reset.
        force_with_lease: Use ``--force-with-lease`` instead of ``--force``.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_promote_branch_to_main_async"
    log_handler.log_debug(
        f"[Worker] Started: Promote '{branch_to_promote}' to '{main_branch}' in '{repo_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": None,
        "message": f"Failed to promote '{branch_to_promote}' to '{main_branch}'.",
        "exception": None,
    }
    try:
        # Resolve target commit
        try:
            res = git_commands.log_and_execute(
                ["git", "rev-parse", "--verify", branch_to_promote],
                repo_path,
                check=True,
                capture=True,
                hide_console=True,
            )
            target_commit = (res.stdout or "").strip()
        except Exception as e:
            log_handler.log_error(
                f"Failed to resolve branch '{branch_to_promote}': {e}",
                func_name=func_name,
            )
            raise
        # Ensure main exists or create it if requested. If it does not exist
        # and create_main_if_missing is False the promotion is aborted.
        created_main = False
        try:
            branches, _ = git_commands.list_branches(repo_path)
        except Exception as e:
            # Best effort: an unreadable branch list is treated as "main is
            # missing", but the reason is no longer discarded silently.
            log_handler.log_warning(
                f"Could not list branches in '{repo_path}': {e}",
                func_name=func_name,
            )
            branches = []
        if main_branch not in branches:
            if not create_main_if_missing:
                raise Exception(f"Branch '{main_branch}' not found or invalid.")
            try:
                # Create main pointing directly to the target commit.
                git_commands.log_and_execute(
                    ["git", "branch", main_branch, target_commit],
                    repo_path,
                    check=True,
                )
                created_main = True
                log_handler.log_info(
                    f"Created branch '{main_branch}' at {target_commit}",
                    func_name=func_name,
                )
            except Exception as e:
                log_handler.log_error(
                    f"Failed to create '{main_branch}': {e}", func_name=func_name
                )
                raise
        # Optionally create a backup of current main (only if main existed).
        backup_branch_name = None
        if create_backup and not created_main:
            ts = datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y%m%d%H%M%S"
            )
            candidate_backup_name = f"backup-main-before-promote-{ts}"
            try:
                git_commands.log_and_execute(
                    ["git", "branch", candidate_backup_name, main_branch],
                    repo_path,
                    check=True,
                )
                # Only report the backup once the branch actually exists.
                backup_branch_name = candidate_backup_name
                log_handler.log_info(
                    f"Created backup branch '{backup_branch_name}' pointing to '{main_branch}'",
                    func_name=func_name,
                )
            except Exception as e:
                # Best effort: the promotion continues without a backup.
                log_handler.log_warning(
                    f"Failed to create backup branch '{candidate_backup_name}': {e}",
                    func_name=func_name,
                )
        # Checkout main
        try:
            git_commands.checkout_branch(repo_path, main_branch)
        except Exception as e:
            log_handler.log_error(
                f"Failed to checkout '{main_branch}': {e}", func_name=func_name
            )
            raise
        # Reset main to target commit (destructive)
        try:
            git_commands.git_reset_hard(repo_path, target_commit)
        except Exception as e:
            log_handler.log_error(
                f"Failed to reset '{main_branch}' to '{target_commit}': {e}",
                func_name=func_name,
            )
            raise
        # Optionally push to remote
        push_result = None
        pushed = False
        if push_to_remote:
            force_flag = "--force-with-lease" if force_with_lease else "--force"
            push_cmd = ["git", "push", force_flag, remote_name, main_branch]
            try:
                # check=False: a failed push is reported, not raised.
                push_result = git_commands.log_and_execute(
                    push_cmd, repo_path, check=False, capture=True
                )
                pushed = push_result.returncode == 0
                if pushed:
                    log_handler.log_info(
                        f"Pushed '{main_branch}' to '{remote_name}' (forced).",
                        func_name=func_name,
                    )
                else:
                    log_handler.log_warning(
                        f"Push returned RC={push_result.returncode}. Stderr: {(push_result.stderr or '').strip()}",
                        func_name=func_name,
                    )
            except Exception as e:
                log_handler.log_error(
                    f"Failed to push '{main_branch}' to '{remote_name}': {e}",
                    func_name=func_name,
                )
                raise
        message = f"Successfully promoted '{branch_to_promote}' to '{main_branch}'."
        if push_to_remote and not pushed:
            # The local promotion succeeded, but tell the user the push did not.
            message += (
                f" However, pushing to '{remote_name}' failed"
                f" (RC={push_result.returncode})."
            )
        result_payload["status"] = "success"
        result_payload["result"] = {
            "backup_branch": backup_branch_name,
            "promoted_to": target_commit,
            "pushed": pushed,
            "push_result": push_result,
        }
        result_payload["message"] = message
        log_handler.log_info(
            f"[Worker] {result_payload['message']}", func_name=func_name
        )
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION promoting branch: {e}", func_name=func_name
        )
        result_payload["message"] = f"Error promoting branch: {e}"
        result_payload["exception"] = e
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Promote '{branch_to_promote}' to '{main_branch}'",
            func_name=func_name,
        )
def run_revert_to_tag_async(
    action_handler: ActionHandler,
    repo_path: str,
    tag_name: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Revert the repository to a tag in a worker and enqueue the outcome.

    Calls ``action_handler.execute_revert_to_tag``. If the call returns, the
    payload is marked ``"success"`` and the returned value is stored under
    ``"result"``; if it raises, the error payload carries the exception.

    Args:
        action_handler: Handler exposing ``execute_revert_to_tag``.
        repo_path: Path of the local repository.
        tag_name: Tag to revert to.
        results_queue: Queue that receives the result payload.
    """
    func_name = "run_revert_to_tag_async"
    log_handler.log_debug(
        f"[Worker] Started: Revert to Tag '{tag_name}' in '{repo_path}'",
        func_name=func_name,
    )
    payload: Dict[str, Any] = dict(
        status="error",
        result=False,
        message=f"Failed to revert to tag '{tag_name}'.",
        exception=None,
    )
    try:
        handler_result = action_handler.execute_revert_to_tag(repo_path, tag_name)
        summary = f"Repository successfully reverted to tag '{tag_name}'."
        payload.update(status="success", result=handler_result, message=summary)
        log_handler.log_info(f"[Worker] {summary}", func_name=func_name)
    except Exception as exc:
        # Catches the handler's git and validation errors along with any other.
        log_handler.log_exception(
            f"[Worker] EXCEPTION reverting to tag: {exc}", func_name=func_name
        )
        payload["exception"] = exc
        payload["message"] = f"Error reverting to tag '{tag_name}': {exc}"
    finally:
        # Always report back, even after a failure, then log completion.
        try:
            results_queue.put(payload)
        except Exception as queue_exc:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {queue_exc}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Revert to Tag '{tag_name}'", func_name=func_name
        )
def run_promote_branch_to_main_async(
    git_commands: GitCommands,
    repo_path: str,
    branch_to_promote: str,
    main_branch: str,
    remote_name: str,
    create_main_if_missing: bool,
    create_backup: bool,
    push_to_remote: bool,
    force_with_lease: bool,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to promote a given branch to become the new main.

    Steps:
    - Resolve branch_to_promote to a commit
    - Ensure main_branch exists (optionally creating it at the target commit)
    - Optionally create a backup branch that references the current main
    - Checkout main and reset --hard to the target commit
    - Optionally push to remote with force-with-lease or force

    Args:
        git_commands: Git command wrapper used for every git invocation.
        repo_path: Path of the repository to operate on.
        branch_to_promote: Branch (or UI label) whose commit becomes main.
            A label starting with "(" is treated as 'HEAD'.
        main_branch: Name of the branch that gets reset to the target commit.
        remote_name: Remote to push to when push_to_remote is set.
        create_main_if_missing: Create main_branch when it is not listed.
        create_backup: Create a timestamped backup branch of the current main
            before resetting it (skipped when main was just created).
        push_to_remote: Force-push main_branch after the reset.
        force_with_lease: Use --force-with-lease instead of --force.
        results_queue: Queue that receives exactly one result payload.

    The payload's ``result`` dict contains ``backup_branch`` (None when no
    backup was actually created), ``promoted_to`` (the resolved commit),
    ``pushed`` (True only when the push command exited with code 0) and
    ``push_result`` (the raw push result object, or None).
    """
    func_name = "run_promote_branch_to_main_async"
    log_handler.log_debug(
        f"[Worker] Started: Promote '{branch_to_promote}' to '{main_branch}' in '{repo_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": None,
        "message": f"Failed to promote '{branch_to_promote}' to '{main_branch}'.",
        "exception": None,
    }
    try:
        # Normalize UI labels that may contain parenthesized detached-HEAD descriptions.
        # Examples: "(HEAD detached from v.0.0.0.14)" -> treat as 'HEAD'
        if isinstance(branch_to_promote, str) and branch_to_promote.startswith("("):
            log_handler.log_debug(
                f"Normalizing promoted branch label '{branch_to_promote}' to 'HEAD'",
                func_name=func_name,
            )
            branch_to_promote = "HEAD"

        # Resolve target commit
        try:
            res = git_commands.log_and_execute(
                ["git", "rev-parse", "--verify", branch_to_promote],
                repo_path,
                check=True,
                capture=True,
                hide_console=True,
            )
            target_commit = (res.stdout or "").strip()
        except Exception as e:
            log_handler.log_error(
                f"Failed to resolve branch '{branch_to_promote}': {e}",
                func_name=func_name,
            )
            raise
        # An empty commit id must never reach the destructive reset below.
        if not target_commit:
            raise ValueError(
                f"Could not resolve '{branch_to_promote}' to a commit."
            )

        # Ensure 'main' exists or create it if requested.
        created_main = False
        try:
            branches, _ = git_commands.list_branches(repo_path)
        except Exception as e:
            # Best effort: continue as if no branch were known, but leave a
            # trace so a later "branch not found" error can be diagnosed.
            log_handler.log_warning(
                f"Could not list branches in '{repo_path}': {e}",
                func_name=func_name,
            )
            branches = []
        if main_branch not in branches:
            if not create_main_if_missing:
                raise ValueError(f"Branch '{main_branch}' not found or invalid.")
            try:
                git_commands.log_and_execute(
                    ["git", "branch", main_branch, target_commit],
                    repo_path,
                    check=True,
                )
                created_main = True
                log_handler.log_info(
                    f"Created branch '{main_branch}' at {target_commit}",
                    func_name=func_name,
                )
            except Exception as e:
                log_handler.log_error(
                    f"Failed to create '{main_branch}': {e}", func_name=func_name
                )
                raise

        # Optionally create a backup of current main (skip if we just created main)
        backup_branch_name: Optional[str] = None
        if create_backup and not created_main:
            # Timezone-aware replacement for the deprecated utcnow(); the
            # formatted timestamp is identical.
            ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S")
            backup_branch_name = f"backup-main-before-promote-{ts}"
            try:
                git_commands.log_and_execute(
                    ["git", "branch", backup_branch_name, main_branch],
                    repo_path,
                    check=True,
                )
                log_handler.log_info(
                    f"Created backup branch '{backup_branch_name}' pointing to '{main_branch}'",
                    func_name=func_name,
                )
            except Exception as e:
                # Backup stays best effort, but the result must not advertise
                # a branch that does not exist.
                log_handler.log_warning(
                    f"Failed to create backup branch '{backup_branch_name}': {e}",
                    func_name=func_name,
                )
                backup_branch_name = None

        # Checkout main
        try:
            git_commands.checkout_branch(repo_path, main_branch)
        except Exception as e:
            log_handler.log_error(
                f"Failed to checkout '{main_branch}': {e}", func_name=func_name
            )
            raise

        # Reset main to target commit (destructive)
        try:
            git_commands.git_reset_hard(repo_path, target_commit)
        except Exception as e:
            log_handler.log_error(
                f"Failed to reset '{main_branch}' to '{target_commit}': {e}",
                func_name=func_name,
            )
            raise

        # Optionally push to remote
        push_result = None
        pushed = False
        push_failure_note = ""
        if push_to_remote:
            force_flag = "--force-with-lease" if force_with_lease else "--force"
            push_cmd = ["git", "push", force_flag, remote_name, main_branch]
            try:
                # check=False: a rejected push is reported through the return
                # code instead of an exception, so the local promotion result
                # can still be delivered.
                push_result = git_commands.log_and_execute(
                    push_cmd, repo_path, check=False, capture=True
                )
            except Exception as e:
                log_handler.log_error(
                    f"Failed to push '{main_branch}' to '{remote_name}': {e}",
                    func_name=func_name,
                )
                raise
            if push_result.returncode == 0:
                pushed = True
                log_handler.log_info(
                    f"Pushed '{main_branch}' to '{remote_name}' (forced).",
                    func_name=func_name,
                )
            else:
                log_handler.log_warning(
                    f"Push returned RC={push_result.returncode}. Stderr: {(push_result.stderr or '').strip()}",
                    func_name=func_name,
                )
                push_failure_note = (
                    f" WARNING: push to '{remote_name}' failed "
                    f"(RC={push_result.returncode}); the remote was not updated."
                )

        # The local promotion is complete at this point, so the status is
        # "success" even if the push was rejected; "pushed" tells them apart.
        result_payload["status"] = "success"
        result_payload["result"] = {
            "backup_branch": backup_branch_name,
            "promoted_to": target_commit,
            "pushed": pushed,
            "push_result": push_result,
        }
        result_payload["message"] = (
            f"Successfully promoted '{branch_to_promote}' to '{main_branch}'."
            + push_failure_note
        )
        log_handler.log_info(
            f"[Worker] {result_payload['message']}", func_name=func_name
        )
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION promoting branch: {e}", func_name=func_name
        )
        result_payload["message"] = f"Error promoting branch: {e}"
        result_payload["exception"] = e
    finally:
        try:
            results_queue.put(result_payload)
        except Exception as qe:
            log_handler.log_error(
                f"[Worker] Failed to put result in queue for {func_name}: {qe}",
                func_name=func_name,
            )
        log_handler.log_debug(
            f"[Worker] Finished: Promote '{branch_to_promote}' to '{main_branch}'",
            func_name=func_name,
        )
# === Submodule workers ===
def run_refresh_submodules_async(
    git_commands: GitCommands,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Query the status of all submodules and publish it on the queue.

    Args:
        git_commands: Git command wrapper providing ``submodule_status``.
        repo_path: Path of the repository to inspect.
        results_queue: Queue that receives the result payload; on success
            its ``result`` entry is whatever ``submodule_status`` returned.
    """
    func_name = "run_refresh_submodules_async"
    log_handler.log_debug(
        f"[Worker] Started: Refresh Submodules for '{repo_path}'", func_name=func_name
    )
    # Failure is the default; it is overwritten only after a clean query.
    result_payload: Dict[str, Any] = dict(
        status="error",
        result=[],
        message="Submodule status refresh failed.",
        exception=None,
    )
    try:
        statuses = git_commands.submodule_status(repo_path)
        # Build the summary first so the payload stays untouched if the
        # returned object cannot be measured.
        summary = f"Submodule status refreshed ({len(statuses)} found)."
        result_payload["status"] = "success"
        result_payload["result"] = statuses
        result_payload["message"] = summary
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION refreshing submodules: {exc}", func_name=func_name
        )
        result_payload["exception"] = exc
        result_payload["message"] = f"Error refreshing submodules: {exc}"
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Refresh Submodules", func_name=func_name
        )
def run_add_submodule_async(
    submodule_handler: SubmoduleHandler,
    repo_path: str,
    submodule_url: str,
    submodule_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Add a submodule through the handler and publish the outcome.

    Args:
        submodule_handler: Handler providing ``execute_add_submodule``.
        repo_path: Path of the parent repository.
        submodule_url: URL of the repository to add as a submodule.
        submodule_path: Path of the submodule, passed through to the handler.
        results_queue: Queue that receives the result payload. Its
            ``committed`` entry mirrors the handler's success flag.
    """
    func_name = "run_add_submodule_async"
    log_handler.log_debug(
        f"[Worker] Started: Add Submodule '{submodule_path}'", func_name=func_name
    )
    result_payload: Dict[str, Any] = dict(
        status="error",
        message="Add submodule failed.",
        exception=None,
        committed=False,
    )
    try:
        added, detail = submodule_handler.execute_add_submodule(
            repo_path, submodule_url, submodule_path
        )
        result_payload["status"] = "success" if added else "error"
        result_payload["message"] = detail
        result_payload["committed"] = added
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION adding submodule: {exc}", func_name=func_name
        )
        result_payload["exception"] = exc
        result_payload["message"] = f"Error adding submodule: {exc}"
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug("[Worker] Finished: Add Submodule", func_name=func_name)
def run_sync_all_submodules_async(
    submodule_handler: SubmoduleHandler,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to sync all submodules.

    Args:
        submodule_handler: Handler providing ``execute_sync_all_submodules``,
            which returns a ``(success, message, commit_made)`` triple.
        repo_path: Path of the parent repository.
        results_queue: Queue that receives exactly one result payload with
            the keys ``status``, ``message``, ``exception`` and ``committed``.
    """
    func_name = "run_sync_all_submodules_async"
    log_handler.log_debug(
        f"[Worker] Started: Sync All Submodules in '{repo_path}'", func_name=func_name
    )
    result_payload: Dict[str, Any] = {
        "status": "error",
        "message": "Submodule sync failed.",
        "exception": None,
        "committed": False,
    }
    try:
        success, message, commit_made = submodule_handler.execute_sync_all_submodules(
            repo_path
        )
        # Honour the handler's success flag (as the other submodule workers
        # do) instead of reporting "success" unconditionally.
        result_payload.update(
            status="success" if success else "error",
            message=message,
            committed=commit_made,
        )
    except (GitCommandError, ValueError) as e:
        # Expected failure modes: logged without a traceback.
        log_handler.log_error(
            f"[Worker] Caught expected exception during submodule sync: {e}",
            func_name=func_name,
        )
        result_payload.update(
            status="error", message=f"Failed to sync submodules: {e}", exception=e
        )
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION syncing submodules: {e}", func_name=func_name
        )
        result_payload.update(exception=e, message=f"Error syncing submodules: {e}")
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Sync All Submodules", func_name=func_name
        )
def run_remove_submodule_async(
    submodule_handler: SubmoduleHandler,
    repo_path: str,
    submodule_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Remove a submodule through the handler and publish the outcome.

    Args:
        submodule_handler: Handler providing ``execute_remove_submodule``.
        repo_path: Path of the parent repository.
        submodule_path: Path of the submodule to remove.
        results_queue: Queue that receives the result payload. Its
            ``committed`` entry mirrors the handler's success flag.
    """
    func_name = "run_remove_submodule_async"
    log_handler.log_debug(
        f"[Worker] Started: Remove Submodule '{submodule_path}'", func_name=func_name
    )
    result_payload: Dict[str, Any] = dict(
        status="error",
        message="Remove submodule failed.",
        exception=None,
        committed=False,
    )
    try:
        removed, detail = submodule_handler.execute_remove_submodule(
            repo_path, submodule_path
        )
        result_payload["status"] = "success" if removed else "error"
        result_payload["message"] = detail
        result_payload["committed"] = removed
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION removing submodule: {exc}", func_name=func_name
        )
        result_payload["exception"] = exc
        result_payload["message"] = f"Error removing submodule: {exc}"
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Remove Submodule", func_name=func_name
        )
def run_init_submodules_async(
    submodule_handler: SubmoduleHandler,
    repo_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Initialize missing submodules through the handler and publish the outcome.

    Args:
        submodule_handler: Handler providing
            ``execute_init_missing_submodules``.
        repo_path: Path of the parent repository.
        results_queue: Queue that receives the result payload with the keys
            ``status``, ``message`` and ``exception``.
    """
    func_name = "run_init_submodules_async"
    log_handler.log_debug(
        f"[Worker] Started: Initialize Missing Submodules in '{repo_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = dict(
        status="error",
        message="Submodule initialization failed.",
        exception=None,
    )
    try:
        initialized, detail = submodule_handler.execute_init_missing_submodules(
            repo_path
        )
        result_payload["status"] = "success" if initialized else "error"
        result_payload["message"] = detail
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION initializing submodules: {exc}", func_name=func_name
        )
        result_payload["exception"] = exc
        result_payload["message"] = f"Error initializing submodules: {exc}"
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Initialize Missing Submodules", func_name=func_name
        )
def run_force_clean_submodule_async(
    submodule_handler: SubmoduleHandler,
    repo_path: str,
    submodule_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Run the handler's force-clean for one submodule and publish the outcome.

    Args:
        submodule_handler: Handler providing
            ``execute_force_clean_submodule``.
        repo_path: Path of the parent repository.
        submodule_path: Path of the submodule to clean.
        results_queue: Queue that receives the result payload. Its
            ``committed`` entry mirrors the handler's success flag.
    """
    func_name = "run_force_clean_submodule_async"
    log_handler.log_debug(
        f"[Worker] Started: Force Clean Submodule '{submodule_path}'",
        func_name=func_name,
    )
    result_payload: Dict[str, Any] = dict(
        status="error",
        message="Submodule clean failed.",
        exception=None,
        committed=False,
    )
    try:
        cleaned, detail = submodule_handler.execute_force_clean_submodule(
            repo_path, submodule_path
        )
        result_payload["status"] = "success" if cleaned else "error"
        result_payload["message"] = detail
        result_payload["committed"] = cleaned
    except Exception as exc:
        log_handler.log_exception(
            f"[Worker] EXCEPTION during force clean submodule: {exc}", func_name=func_name
        )
        result_payload["exception"] = exc
        result_payload["message"] = f"Error during submodule clean: {exc}"
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Force Clean Submodule", func_name=func_name
        )
def run_clean_submodule_async(
    submodule_handler: SubmoduleHandler,
    repo_path: str,
    submodule_path: str,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """Worker to clean the working tree of a single submodule.

    Args:
        submodule_handler: Handler providing
            ``execute_clean_submodule_working_tree``.
        repo_path: Path of the parent repository.
        submodule_path: Path of the submodule to clean.
        results_queue: Queue that receives exactly one result payload with
            the keys ``status``, ``message`` and ``exception``.
    """
    func_name = "run_clean_submodule_async"
    log_handler.log_debug(
        f"[Worker] Started: Clean Submodule '{submodule_path}'", func_name=func_name
    )
    # "exception" is always present so consumers can read the same payload
    # shape from every submodule worker.
    result_payload: Dict[str, Any] = {
        "status": "error",
        "message": "Submodule clean failed.",
        "exception": None,
    }
    try:
        success, message = submodule_handler.execute_clean_submodule_working_tree(
            repo_path, submodule_path
        )
        result_payload.update(status="success" if success else "error", message=message)
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION cleaning submodule: {e}", func_name=func_name
        )
        result_payload.update(exception=e, message=f"Error cleaning submodule: {e}")
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Clean Submodule", func_name=func_name
        )
def run_check_submodule_updates_async(
    git_commands: GitCommands,
    repo_path: str,
    submodules_info: List[Dict[str, str]],
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """
    Worker to check for remote updates for a list of submodules.

    For every entry that has a "path" and a usable "tracked_branch", the
    submodule directory is fetched and the commit count between HEAD and
    ``origin/<tracked_branch>`` is turned into a status label.

    Args:
        git_commands: Git command wrapper providing ``fetch_in_directory``
            and ``get_commit_count_between``.
        repo_path: Path of the parent repository; submodule paths are joined
            onto it.
        submodules_info: Dicts read via the keys "path" and "tracked_branch".
        results_queue: Queue that receives exactly one payload. Its
            ``result`` maps submodule path -> status label. A failure for a
            single submodule yields the label "Error" for that entry only;
            a failure of the whole check yields ``status == "error"`` with
            the labels collected so far.
    """
    func_name = "run_check_submodule_updates_async"
    update_statuses: Dict[str, str] = {}
    result_payload: Dict[str, Any] = {
        "status": "error",
        "result": update_statuses,
        "message": "Submodule update check failed.",
        "exception": None,
    }
    # The outer try/finally guarantees that a payload is always queued, so
    # the consumer is never left waiting after an unexpected error.
    try:
        log_handler.log_debug(
            f"[Worker] Started: Check for updates for {len(submodules_info)} submodules.",
            func_name=func_name,
        )
        for sub_info in submodules_info:
            path = sub_info.get("path")
            branch = sub_info.get("tracked_branch")
            if not path:
                continue
            if not branch or branch == "N/A":
                update_statuses[path] = "Tracking N/A"
                continue
            submodule_full_path = os.path.join(repo_path, path)
            if not os.path.isdir(submodule_full_path):
                update_statuses[path] = "Dir Missing"
                continue
            try:
                git_commands.fetch_in_directory(submodule_full_path)
                remote_ref = f"origin/{branch}"
                commits_behind = git_commands.get_commit_count_between(
                    submodule_full_path, "HEAD", remote_ref
                )
                if commits_behind > 0:
                    update_statuses[path] = (
                        f"Update available ({commits_behind} new commits)"
                    )
                elif commits_behind == 0:
                    update_statuses[path] = "Up-to-date"
                else:
                    update_statuses[path] = "Error checking"
            except GitCommandError as e:
                log_handler.log_error(
                    f"[Worker] Failed to check submodule '{path}': {e}", func_name=func_name
                )
                update_statuses[path] = "Error"
            except Exception as e:
                # Anything else (e.g. a non-integer commit count) must not
                # abort the check for the remaining submodules.
                log_handler.log_exception(
                    f"[Worker] Unexpected error checking submodule '{path}': {e}",
                    func_name=func_name,
                )
                update_statuses[path] = "Error"
        result_payload.update(
            status="success", message="Submodule update check complete."
        )
    except Exception as e:
        log_handler.log_exception(
            f"[Worker] EXCEPTION checking submodule updates: {e}", func_name=func_name
        )
        result_payload.update(
            exception=e, message=f"Error checking submodule updates: {e}"
        )
    finally:
        results_queue.put(result_payload)
        log_handler.log_debug(
            "[Worker] Finished: Submodule update check.", func_name=func_name
        )
def _transformation_detect_original_author(
    git_commands: GitCommands, source_repo_path: str, func_name: str
) -> Tuple[str, str]:
    """Return (name, email) of the latest commit's author, or placeholders on failure."""
    try:
        # `git log -1` prints the author name and email on two separate lines.
        log_res = git_commands.log_and_execute(
            ["git", "log", "-1", "--pretty=format:%an%n%ae"],
            source_repo_path, check=True, capture=True
        )
        # Unpacking raises ValueError unless exactly two lines came back.
        author_name, author_email = (log_res.stdout or "").strip().splitlines()
    except (GitCommandError, ValueError, IndexError) as e:
        log_handler.log_warning(f"[Worker] Could not detect original author automatically: {e}. Proceeding with a broad replacement.", func_name=func_name)
        # Placeholder identity: the mailmap rule built from it is not
        # expected to match any real commit.
        return "Original Author", "original@example.com"
    log_handler.log_info(f"[Worker] Detected original author: {author_name} <{author_email}>", func_name=func_name)
    return author_name, author_email


def _transformation_run_filter_repo(
    filter_repo_command: List[str], repo_path: str, func_name: str
) -> None:
    """Run git-filter-repo in repo_path, streaming its output to the log; raise GitCommandError on failure."""
    # On Windows, ask for the child's console window to be hidden.
    startupinfo = None
    if os.name == 'nt':
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
    try:
        proc = subprocess.Popen(
            filter_repo_command,
            cwd=repo_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # single merged stream keeps line order
            text=True,
            encoding='utf-8',
            errors='replace',
            startupinfo=startupinfo,
        )
    except FileNotFoundError as e:
        raise GitCommandError("git-filter-repo not found. Ensure it is installed and in PATH.", command=filter_repo_command) from e
    aggregated_lines: List[str] = []
    # The context manager closes the stdout pipe and reaps the process even
    # if reading the output fails.
    with proc:
        if proc.stdout:
            for raw_line in proc.stdout:
                line = raw_line.rstrip('\n')
                aggregated_lines.append(line)
                # Log each line so the UI log shows live progress
                try:
                    log_handler.log_message(logging.INFO, f"[git-filter-repo] {line}", func_name=func_name)
                except Exception:
                    # Logging must not break the worker; ignore logging errors
                    pass
        ret = proc.wait()
    if ret != 0:
        full_output = "\n".join(aggregated_lines)
        raise GitCommandError(f"git-filter-repo failed with exit code {ret}", command=filter_repo_command, stderr=full_output)


def run_repository_transformation_async(
    git_commands: GitCommands,
    config_manager: Any,
    source_repo_path: str,
    target_repo_name: str,
    target_author_name: str,
    target_author_email: str,
    rename_repo: bool,
    results_queue: queue.Queue[Dict[str, Any]],
) -> None:
    """
    Asynchronous worker to perform repository transformation using git-filter-repo.

    Steps:
    - Detect the author of the latest commit
    - Rewrite history with ``git-filter-repo --mailmap ... --force``
    - Remove any remotes that still exist afterwards
    - If rename_repo is set: rename the repository folder to
      target_repo_name, copy the old profile's options into a new profile,
      remove the old profile and save the configuration

    Args:
        git_commands: Git command wrapper.
        config_manager: Configuration manager used to create/remove profiles.
        source_repo_path: Path of the repository to transform.
        target_repo_name: New folder/profile name (used when rename_repo).
        target_author_name: Author name written into the mailmap.
        target_author_email: Author email written into the mailmap.
        rename_repo: Whether to rename the folder and migrate the profile.
        results_queue: Queue that receives exactly one result payload with
            the keys ``status``, ``message``, ``exception`` and ``result``.
    """
    func_name = "run_repository_transformation_async"
    # Ignore trailing path separators: with them, basename would be empty
    # and dirname would be the repository itself, which would place the
    # rename target inside the repository.
    path_separators = os.sep + (os.altsep or "")
    trimmed_source_path = source_repo_path.rstrip(path_separators) or source_repo_path
    source_repo_basename = os.path.basename(trimmed_source_path)
    parent_dir = os.path.dirname(trimmed_source_path)
    target_repo_path = os.path.join(parent_dir, target_repo_name)
    log_handler.log_info(f"[Worker] Started: Transformation of '{source_repo_basename}' using git-filter-repo", func_name=func_name)
    result_payload: Dict[str, Any] = {
        "status": "error", "message": "Transformation failed.", "exception": None, "result": None,
    }
    mailmap_temp_file: Optional[str] = None
    # Set only after this worker has renamed the folder, so the rollback can
    # never touch a directory the worker did not move itself.
    folder_renamed = False
    try:
        # Fail early with a clear message: a missing working directory would
        # otherwise surface from Popen as a misleading "executable not found".
        if not os.path.isdir(source_repo_path):
            raise ValueError(f"Source repository path '{source_repo_path}' is not a directory.")

        # --- Step 0: find the original author to replace ---
        log_handler.log_info(f"[Worker] Step 0: Detecting original author from last commit...", func_name=func_name)
        original_author_name, original_author_email = _transformation_detect_original_author(
            git_commands, source_repo_path, func_name
        )

        # --- Step 1: rewrite history with git-filter-repo ---
        log_handler.log_info(f"[Worker] Step 1: Rewriting history with git-filter-repo...", func_name=func_name)
        # Create a temporary mailmap file; delete=False because the
        # subprocess reads it after this handle is closed (removed in finally).
        with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8', suffix='.txt') as tf:
            mailmap_temp_file = tf.name
            # Rule 1 ("Proper <proper> Commit <commit>"): replaces name and
            # email of commits that match the detected original identity.
            mailmap_line = f"{target_author_name} <{target_author_email}> {original_author_name} <{original_author_email}>\n"
            # Rule 2 ("Proper <email>"): normalizes the name of commits that
            # already use the target email.
            # NOTE(review): per the gitmailmap format neither rule matches
            # commits of other authors -- confirm that this is intended.
            mailmap_line += f"{target_author_name} <{target_author_email}>\n"
            tf.write(mailmap_line)
        log_handler.log_debug(f"[Worker] Created temporary mailmap file at '{mailmap_temp_file}'", func_name=func_name)
        filter_repo_command = [
            "git-filter-repo",
            "--mailmap",
            mailmap_temp_file,
            "--force"  # Required because the repository is not a fresh clone
        ]
        # Streaming execution so progress lines are logged as they arrive.
        log_handler.log_info(f"[Worker] Running git-filter-repo (this may take a while)...", func_name=func_name)
        _transformation_run_filter_repo(filter_repo_command, source_repo_path, func_name)
        log_handler.log_info(f"[Worker] History rewritten successfully.", func_name=func_name)

        # --- Step 2: make sure no old remote is left behind ---
        log_handler.log_info(f"[Worker] Step 2: Verifying old remotes are removed (git-filter-repo should handle this)...", func_name=func_name)
        try:
            remotes = git_commands.get_remotes(source_repo_path)
            if remotes:
                log_handler.log_warning(f"[Worker] Remotes still exist after filtering: {remotes.keys()}. Removing them manually.", func_name=func_name)
                for remote_name in list(remotes.keys()):
                    git_commands.log_and_execute(["git", "remote", "remove", remote_name], source_repo_path, check=True)
        except GitCommandError as e:
            # Still best effort, but leave a trace: a surviving remote could
            # later receive a push of the rewritten history.
            log_handler.log_warning(f"[Worker] Could not verify/remove old remotes: {e}", func_name=func_name)

        # --- Step 3: rename the repository folder (optional) ---
        if rename_repo:
            log_handler.log_info(f"[Worker] Step 3: Renaming repository folder to '{target_repo_name}'...", func_name=func_name)
            if os.path.exists(target_repo_path):
                raise IOError(f"Target directory '{target_repo_path}' already exists. Aborting.")
            os.rename(source_repo_path, target_repo_path)
            folder_renamed = True
            log_handler.log_info(f"[Worker] Folder renamed to '{target_repo_path}'.", func_name=func_name)

            # --- Step 4: create a new profile in the config manager ---
            log_handler.log_info(f"[Worker] Step 4: Creating new profile '{target_repo_name}'...", func_name=func_name)
            source_profile_name = source_repo_basename
            config_manager.add_section(target_repo_name)
            source_options = {}
            if config_manager.config.has_section(source_profile_name):
                source_options = dict(config_manager.config.items(source_profile_name))
            for key, value in source_options.items():
                config_manager.set_profile_option(target_repo_name, key, value)
            # Point the new profile at the renamed folder and clear the
            # remote URL inherited from the old profile.
            config_manager.set_profile_option(target_repo_name, "svn_working_copy_path", target_repo_path)
            config_manager.set_profile_option(target_repo_name, "remote_url", "")
            # Remove the old profile to avoid confusion.
            config_manager.remove_profile_section(source_profile_name)
            config_manager.save_config()
            log_handler.log_info(f"[Worker] New profile created and old one removed.", func_name=func_name)
            result_payload["status"] = "success"
            result_payload["message"] = (
                f"Transformation successful!\n\n"
                f"1. Repository history rewritten using '{target_author_name}'.\n"
                f"2. Folder renamed to '{target_repo_name}'.\n"
                f"3. New profile '{target_repo_name}' created (old profile removed).\n\n"
                f"NEXT STEPS:\n"
                f"- Manually create the '{target_repo_name}' repository on your local Gitea.\n"
                f"- Copy its URL into the 'Remote URL' field for the new profile.\n"
                f"- Click 'Apply Config' and then 'Push' to upload."
            )
            result_payload["result"] = {"new_profile_name": target_repo_name}
        else:
            # No folder rename: only author rewrite performed
            log_handler.log_info(f"[Worker] Skipping folder rename and profile creation (author-only mode).", func_name=func_name)
            result_payload["status"] = "success"
            result_payload["message"] = (
                f"Author-only transformation successful!\n\n"
                f"Repository history was rewritten using '{target_author_name}'.\n"
                f"No folder rename or profile changes were performed as requested.\n\n"
                f"NEXT STEPS:\n"
                f"- If you want the repository folder renamed or a new profile created, rerun the transformation with a different target repository name."
            )
    except Exception as e:
        log_handler.log_exception(f"[Worker] EXCEPTION during transformation: {e}", func_name=func_name)
        # Reset the status explicitly so a late failure can never be
        # reported as a success.
        result_payload["status"] = "error"
        result_payload["message"] = f"Transformation failed: {e}"
        result_payload["exception"] = e
        # Undo the folder rename only if this worker performed it.
        if folder_renamed and os.path.exists(target_repo_path) and not os.path.exists(source_repo_path):
            try:
                os.rename(target_repo_path, source_repo_path)
                log_handler.log_warning(f"[Worker] Attempted to revert folder rename.", func_name=func_name)
                result_payload["message"] += "\n\nAttempted to roll back folder rename."
            except Exception as rollback_e:
                log_handler.log_error(f"[Worker] Failed to roll back folder rename: {rollback_e}", func_name=func_name)
    finally:
        # Clean up the temporary mailmap file.
        if mailmap_temp_file and os.path.exists(mailmap_temp_file):
            try:
                os.remove(mailmap_temp_file)
                log_handler.log_debug(f"[Worker] Cleaned up temporary mailmap file.", func_name=func_name)
            except OSError:
                # A leftover temp file is harmless; do not mask the result.
                pass
        results_queue.put(result_payload)
        log_handler.log_info(f"[Worker] Finished: Transformation of '{source_repo_basename}'", func_name=func_name)