SXXXXXXX_GitUtility/gitutility/app.py
2025-05-05 16:03:03 +02:00

4126 lines
187 KiB
Python

# --- FILE: gitsync_tool/app.py ---
import os
import datetime
import tkinter as tk
from tkinter import messagebox, filedialog # Import specifici da tkinter
import logging # Mantenuto per usare i livelli (es. logging.DEBUG)
import re
import threading
import queue
import traceback
import sys
from typing import Callable, List, Dict, Any, Tuple, Optional
# ---<<< MODIFICA IMPORT: Usa percorsi assoluti dal pacchetto >>>---
# Configuration management
from gitutility.config.config_manager import ConfigManager
from gitutility.config.config_manager import (
DEFAULT_PROFILE,
DEFAULT_BACKUP_DIR,
DEFAULT_REMOTE_NAME,
)
# Core logic handlers
from gitutility.core.action_handler import ActionHandler
from gitutility.core.backup_handler import BackupHandler
from gitutility.core.remote_actions import RemoteActionHandler
from gitutility.core.wiki_updater import WikiUpdater
# Command execution wrapper
from gitutility.commands.git_commands import GitCommands, GitCommandError
# Logging system
from gitutility.logging_setup import log_handler # Funzioni per inviare log alla coda
from gitutility.logging_setup.logger_config import (
setup_file_logging,
) # Funzione setup file log
# GUI components (dai nuovi moduli specifici)
from gitutility.gui.main_frame import MainFrame
from gitutility.gui.editors import GitignoreEditorWindow
from gitutility.gui.dialogs import (
CreateTagDialog,
CreateBranchDialog,
CloneFromRemoteDialog,
)
from gitutility.gui.diff_summary_viewer import DiffSummaryWindow
from gitutility.gui.commit_detail_window import CommitDetailWindow
from gitutility.gui.diff_viewer import DiffViewerWindow
# Asynchronous operations
from gitutility.async_tasks import async_workers # Worker functions
from gitutility.async_tasks.async_result_handler import (
AsyncResultHandler,
) # Result processor
# --- Import Version Info FOR THE WRAPPER ITSELF ---
try:
# Use absolute import based on package name
from gitutility import _version as wrapper_version
WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}"
except ImportError:
# This might happen if you run the wrapper directly from source
# without generating its _version.py first (if you use that approach for the wrapper itself)
WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
WRAPPER_BUILD_INFO = "Wrapper build time unknown"
# --- Constants for Version Generation ---
DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown"
# --- End Constants ---
class GitSvnSyncApp:
"""
Main application controller class for the Git Sync Tool.
Orchestrates GUI (MainFrame) and backend actions (ActionHandler,
RemoteActionHandler, GitCommands, etc.) using asynchronous operations
via async_workers and processes results via AsyncResultHandler.
Manages application state and configuration loading/saving.
Uses a centralized logging queue (log_handler) processed here.
"""
# Constants for polling intervals (in milliseconds)
LOG_QUEUE_CHECK_INTERVAL_MS: int = 100
ASYNC_QUEUE_CHECK_INTERVAL_MS: int = 100
def __init__(self, master: tk.Tk):
"""
Initializes the application components, GUI, logging, and starts polling loops.
Args:
master (tk.Tk): The main Tkinter root window.
"""
self.master: tk.Tk = master
master.title(f"Git Utility (Bundle & Remote Manager) - {WRAPPER_APP_VERSION_STRING}")
# Define behavior on window close button press
master.protocol("WM_DELETE_WINDOW", self.on_closing)
# Initial logging (console only, file logging setup later)
print("Initializing GitSvnSyncApp...")
# Use log_handler early (it just puts in queue, processed later)
log_handler.log_debug(
"GitSvnSyncApp initialization started.", func_name="__init__"
)
# --- Initialize Core Backend Components ---
# Instantiate managers and handlers first
try:
# Config manager loads config on init
self.config_manager: ConfigManager = ConfigManager()
self.git_commands: GitCommands = GitCommands()
self.backup_handler: BackupHandler = BackupHandler()
self.wiki_updater: WikiUpdater = WikiUpdater(self.git_commands)
# Action Handlers depend on GitCommands and BackupHandler
self.action_handler: ActionHandler = ActionHandler(
self.git_commands, self.backup_handler
)
self.remote_action_handler: RemoteActionHandler = RemoteActionHandler(
self.git_commands
)
# Internal state variables
self.remote_auth_status: str = (
"unknown" # ok, required, failed, connection_failed, etc.
)
self.current_local_branch: Optional[str] = (
None # Store the currently checked-out branch name
)
print("Core backend components initialized.")
log_handler.log_debug(
"Core backend components initialized successfully.",
func_name="__init__",
)
except Exception as e:
print(
f"FATAL: Failed to initialize core backend components: {e}",
file=sys.stderr,
)
log_handler.log_critical(
f"Failed to initialize core backend components: {e}",
func_name="__init__",
)
# Show error graphically if possible before exiting
self.show_fatal_error(
f"Initialization Error:\n{e}\n\nApplication cannot start."
)
# Ensure window closes even if init fails
self.master.after(10, self.on_closing) # Schedule closing
return # Stop initialization
# --- Initialize Graphical User Interface (GUI) ---
# Create the main application window frame
try:
print("Creating MainFrame GUI...")
log_handler.log_debug("Creating MainFrame GUI.", func_name="__init__")
# Instantiate the MainFrame, passing necessary controller methods as callbacks
# Use keyword arguments for clarity and robustness
self.main_frame: MainFrame = MainFrame(
master=master,
# Profile Callbacks
load_profile_settings_cb=self.load_profile_settings,
save_profile_cb=self.save_profile_settings,
add_profile_cb=self.add_profile,
remove_profile_cb=self.remove_profile,
clone_remote_repo_cb=self.clone_remote_repo,
# Path/Status Callbacks
browse_folder_cb=self.browse_folder,
update_svn_status_cb=self.update_svn_status_indicator,
# Repo/Bundle Callbacks
prepare_svn_for_git_cb=self.prepare_svn_for_git,
create_git_bundle_cb=self.create_git_bundle,
fetch_from_git_bundle_cb=self.fetch_from_git_bundle,
open_gitignore_editor_cb=self.open_gitignore_editor,
# Backup Callbacks
manual_backup_cb=self.manual_backup,
# Commit/Changes Callbacks
commit_changes_cb=self.commit_changes,
refresh_changed_files_cb=self.refresh_changed_files_list,
open_diff_viewer_cb=self.open_diff_viewer, # Diff from changes list
add_selected_file_cb=self.add_selected_file,
# Tags Callbacks
refresh_tags_cb=self.refresh_tag_list,
create_tag_cb=self.create_tag,
checkout_tag_cb=self.checkout_tag,
# Branches Callbacks (Local)
refresh_branches_cb=self.refresh_branch_list,
create_branch_cb=self.create_branch,
checkout_branch_cb=self.checkout_branch,
delete_local_branch_cb=self.delete_local_branch,
merge_local_branch_cb=self.merge_local_branch,
# History/Compare Callbacks
refresh_history_cb=self.refresh_commit_history,
compare_branch_with_current_cb=self.compare_branch_with_current,
view_commit_details_cb=self.view_commit_details, # View commit details
# Remote Callbacks
apply_remote_config_cb=self.apply_remote_config,
check_connection_auth_cb=self.check_connection_auth,
fetch_remote_cb=self.fetch_remote,
pull_remote_cb=self.pull_remote,
push_remote_cb=self.push_remote,
push_tags_remote_cb=self.push_tags_remote,
refresh_remote_status_cb=self.refresh_remote_status,
refresh_remote_branches_cb=self.refresh_remote_branches,
checkout_remote_branch_cb=self.checkout_remote_branch_as_local,
# Dependencies
config_manager_instance=self.config_manager, # Pass instance if needed by GUI
profile_sections_list=self.config_manager.get_profile_sections(), # Pass initial profiles
update_gitea_wiki_cb=self.update_gitea_wiki,
)
print("MainFrame GUI created.")
log_handler.log_debug(
"MainFrame GUI created successfully.", func_name="__init__"
)
except Exception as e:
print(f"FATAL: Failed to initialize MainFrame GUI: {e}", file=sys.stderr)
log_handler.log_exception(
"Failed to initialize MainFrame GUI.", func_name="__init__"
)
self.show_fatal_error(
f"GUI Initialization Error:\n{e}\n\nApplication cannot start."
)
self.master.after(10, self.on_closing)
return # Stop initialization
# --- Setup Logging Processing ---
# Configures file logging and starts the GUI log queue poller
self._setup_logging_processing()
# --- Log Application Start ---
log_handler.log_info(
"Git Utility Tool application starting up.", func_name="__init__"
)
# --- Initial Profile Load ---
# Loads the initially selected profile (default or first) into the GUI fields
# and triggers initial data refreshes.
self._perform_initial_load()
# Log completion of initialization
log_handler.log_info(
"Git Utility Tool initialization complete and ready.", func_name="__init__"
)
# --- Static Method Helper ---
# (Metodo _extract_path_from_status_line INVARIATO)
@staticmethod
def _extract_path_from_status_line(file_status_line: str) -> Optional[str]:
# ... (Codice invariato)
func_name: str = "_extract_path_from_status_line"
try:
line: str = file_status_line.strip("\x00").strip()
if not line or len(line) < 3:
log_handler.log_warning(
f"Invalid/short status line received: '{file_status_line}'",
func_name=func_name,
)
return None
path_part: str = ""
if "->" in line:
path_part = line.split("->")[-1].strip()
else:
match = re.match(r"^[ MARCUD?!]{1,2}\s+(.*)", line)
if match:
path_part = match.group(1)
else:
log_handler.log_warning(
f"Could not match expected status line format: '{line}'",
func_name=func_name,
)
first_space_index: int = line.find(" ")
if first_space_index != -1:
path_part = line[first_space_index:].strip()
else:
return None
if not path_part:
return None
relative_path: str
if (
len(path_part) >= 2
and path_part.startswith('"')
and path_part.endswith('"')
):
relative_path = path_part[1:-1]
else:
relative_path = path_part
try:
if "\\" in relative_path:
decoded_bytes: bytes = (
bytes(relative_path, "utf-8")
.decode("unicode_escape")
.encode("latin-1")
)
decoded_path: str = decoded_bytes.decode("utf-8")
if decoded_path != relative_path and os.path.normpath(decoded_path):
log_handler.log_debug(
f"Path '{relative_path}' decoded to '{decoded_path}'",
func_name=func_name,
)
relative_path = decoded_path
except Exception as decode_err:
log_handler.log_warning(
f"Could not decode potential escape sequences in path '{relative_path}': {decode_err}",
func_name=func_name,
)
if not relative_path:
return None
log_handler.log_debug(
f"Cleaned path from status line: '{relative_path}'", func_name=func_name
)
return relative_path
except Exception as e:
log_handler.log_exception(
f"Error cleaning path from status line '{file_status_line}': {e}",
func_name=func_name,
)
return None
# --- Logging Setup and Processing ---
# (Metodi _setup_logging_processing, _process_log_queue INVARIATI)
def _setup_logging_processing(self):
# ... (Codice invariato)
func_name: str = "_setup_logging_processing"
try:
setup_file_logging(level=logging.DEBUG)
# Log DEBUG+ a file
if hasattr(self, "main_frame") and hasattr(self.main_frame, "log_text"):
log_handler.log_info(
"Starting log queue processing for GUI.", func_name=func_name
)
self.master.after(
self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue
)
else:
print(
"ERROR: Cannot start log queue processing - GUI log widget not found.",
file=sys.stderr,
)
log_handler.log_error(
"Cannot start log queue processing - GUI log widget not found.",
func_name=func_name,
)
except Exception as e:
print(f"ERROR during logging setup: {e}", file=sys.stderr)
log_handler.log_exception(
"Failed to setup logging processing.", func_name=func_name
)
def _process_log_queue(self):
# ... (Codice invariato)
func_name: str = "_process_log_queue"
log_widget: Optional[tk.scrolledtext.ScrolledText] = getattr(
self.main_frame, "log_text", None
)
if not log_widget or not log_widget.winfo_exists():
log_handler.log_warning(
"Log widget not found, stopping queue processing.", func_name=func_name
)
return
processed_count: int = 0
max_proc_per_cycle: int = 50
while not log_handler.log_queue.empty():
if processed_count >= max_proc_per_cycle:
log_handler.log_debug(
f"Processed {max_proc_per_cycle} log entries, pausing.",
func_name=func_name,
)
break
try:
log_entry: dict = log_handler.log_queue.get_nowait()
level: int = log_entry.get("level", logging.INFO)
message: str = log_entry.get("message", "<empty log message>")
level_name: str = log_handler.get_log_level_name(level)
logging.getLogger().log(level, message)
processed_count += 1 # Scrive su file handler
if level >= logging.DEBUG: # Aggiorna GUI
try:
original_state: str = log_widget.cget("state")
log_widget.config(state=tk.NORMAL)
log_widget.insert(tk.END, message + "\n", (level_name,))
log_widget.see(tk.END)
log_widget.config(state=original_state)
except tk.TclError as e_gui:
print(
f"TclError updating log widget: {e_gui} - Message: {message}",
file=sys.stderr,
)
except Exception as e_gui:
print(
f"Error updating log widget: {e_gui} - Message: {message}",
file=sys.stderr,
)
except queue.Empty:
break
except Exception as e_proc:
print(f"Error processing log queue item: {e_proc}", file=sys.stderr)
try:
logging.getLogger().error(
f"Error processing log queue item: {e_proc}"
)
except Exception:
pass
if self.master.winfo_exists():
self.master.after(self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue)
# --- Initial Application Load ---
# (Metodo _perform_initial_load INVARIATO)
def _perform_initial_load(self):
# ... (Codice invariato)
func_name: str = "_perform_initial_load"
log_handler.log_debug("Performing initial profile load.", func_name=func_name)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
log_handler.log_error(
"Cannot perform initial load: MainFrame not ready.", func_name=func_name
)
return
initial_profile: str = self.main_frame.profile_var.get()
if initial_profile:
log_handler.log_debug(
f"Loading initial profile: '{initial_profile}'", func_name=func_name
)
self.load_profile_settings(initial_profile)
else:
log_handler.log_warning(
"No initial profile set (no profiles found?).", func_name=func_name
)
self._clear_and_disable_fields()
self.main_frame.update_status_bar(
"No profiles found. Please add a profile."
)
# --- Application Closing Handler ---
# (Metodo on_closing INVARIATO)
def on_closing(self):
# ... (Codice invariato)
func_name: str = "on_closing"
log_handler.log_info("Application closing initiated.", func_name=func_name)
if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
try:
self.main_frame.update_status_bar("Exiting...")
except Exception:
pass
if self.master and self.master.winfo_exists():
self.master.destroy()
log_handler.log_info("Application closed.", func_name=func_name)
# --- Profile Management Callbacks ---
# (Metodi load_profile_settings, save_profile_settings, add_profile, remove_profile INVARIATI rispetto a versione precedente con controllo URL)
def load_profile_settings(self, profile_name: str):
# ... (Codice Invariato - incluso controllo URL vuoto prima di check_connection) ...
func_name: str = "load_profile_settings"
log_handler.log_info(
f"Loading settings for profile: '{profile_name}'", func_name=func_name
)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
log_handler.log_error(
"Cannot load profile: Main frame not available.", func_name=func_name
)
return
self.main_frame.update_status_bar(
f"Processing: Loading profile '{profile_name}'..."
)
if (
not profile_name
or profile_name not in self.config_manager.get_profile_sections()
):
log_handler.log_warning(
f"Profile '{profile_name}' invalid/not found.", func_name=func_name
)
self._clear_and_disable_fields()
if profile_name:
self.main_frame.show_error(
"Profile Load Error", f"Profile '{profile_name}' not found."
)
status_msg: str = (
f"Error: Profile '{profile_name}' not found."
if profile_name
else "No profile selected."
)
self.main_frame.update_status_bar(status_msg)
return
cm: ConfigManager = self.config_manager
keys_with_defaults: dict = cm._get_expected_keys_with_defaults()
settings: dict = {}
for key, default_value in keys_with_defaults.items():
settings[key] = cm.get_profile_option(
profile_name, key, fallback=default_value
)
mf: MainFrame = self.main_frame
repo_path_for_refresh: str = ""
remote_url_loaded: str = ""
try:
mf.svn_path_entry.delete(0, tk.END)
svn_path_value: str = settings.get("svn_working_copy_path", "")
mf.svn_path_entry.insert(0, svn_path_value)
repo_path_for_refresh = svn_path_value
mf.usb_path_entry.delete(0, tk.END)
mf.usb_path_entry.insert(0, settings.get("usb_drive_path", ""))
mf.bundle_name_entry.delete(0, tk.END)
mf.bundle_name_entry.insert(0, settings.get("bundle_name", ""))
mf.bundle_updated_name_entry.delete(0, tk.END)
mf.bundle_updated_name_entry.insert(
0, settings.get("bundle_name_updated", "")
)
mf.autobackup_var.set(
str(settings.get("autobackup", "False")).lower() == "true"
)
mf.backup_dir_var.set(settings.get("backup_dir", DEFAULT_BACKUP_DIR))
mf.backup_exclude_extensions_var.set(
settings.get("backup_exclude_extensions", "")
)
mf.backup_exclude_dirs_var.set(settings.get("backup_exclude_dirs", ""))
mf.toggle_backup_dir()
mf.autocommit_var.set(
str(settings.get("autocommit", "False")).lower() == "true"
)
mf.clear_commit_message()
if mf.commit_message_text.winfo_exists():
current_state = mf.commit_message_text.cget("state")
mf.commit_message_text.config(state=tk.NORMAL)
mf.commit_message_text.insert("1.0", settings.get("commit_message", ""))
mf.commit_message_text.config(state=current_state)
if hasattr(mf, "remote_url_var") and hasattr(mf, "remote_name_var"):
remote_url_loaded = settings.get("remote_url", "")
mf.remote_url_var.set(remote_url_loaded)
mf.remote_name_var.set(settings.get("remote_name", DEFAULT_REMOTE_NAME))
else:
log_handler.log_warning(
"Remote URL/Name widgets not found in GUI during load.",
func_name=func_name,
)
log_handler.log_info(
f"Applied settings from '{profile_name}' to GUI fields.",
func_name=func_name,
)
repo_path_for_refresh = mf.svn_path_entry.get() # Rileggi per sicurezza
self.update_svn_status_indicator(repo_path_for_refresh)
is_ready: bool = self._is_repo_ready(repo_path_for_refresh)
if is_ready:
log_handler.log_info(
"Repo ready, triggering async refreshes.", func_name=func_name
)
self.refresh_tag_list()
self.refresh_branch_list()
self.refresh_commit_history()
self.refresh_changed_files_list()
if remote_url_loaded: # Controlla URL prima di check connessione
log_handler.log_debug(
"Remote URL found, initiating connection check.",
func_name=func_name,
)
self.check_connection_auth()
self.refresh_remote_status()
else:
log_handler.log_info(
"Remote URL is empty. Skipping connection check.",
func_name=func_name,
)
self._update_gui_auth_status("unknown")
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: (Remote not configured)"
)
if hasattr(self.main_frame, "refresh_sync_status_button"):
self.main_frame.refresh_sync_status_button.config(
state=tk.DISABLED
)
mf.update_status_bar(
f"Profile '{profile_name}' loaded (Remote not configured)."
)
else:
log_handler.log_info(
"Repo not ready, clearing dynamic lists.", func_name=func_name
)
self._update_gui_for_not_ready_state()
mf.update_status_bar(
f"Profile '{profile_name}' loaded (Repo not ready)."
)
except Exception as e:
log_handler.log_exception(
f"Error applying settings for '{profile_name}': {e}",
func_name=func_name,
)
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
status_text="Sync Status: Error"
)
mf.show_error("Profile Load Error", f"Failed to apply settings:\n{e}")
mf.update_status_bar(f"Error loading profile '{profile_name}'.")
def save_profile_settings(self) -> bool:
""" Saves current GUI field values to the selected profile in the config file. """
func_name: str = "save_profile_settings"
profile_name_from_var: Optional[str] = None
if hasattr(self, "main_frame") and hasattr(self.main_frame, "profile_var"):
try:
profile_name_from_var = self.main_frame.profile_var.get()
print(f"DEBUG [save_profile_settings]: Value from self.main_frame.profile_var.get() is: {repr(profile_name_from_var)}")
log_handler.log_debug(f"Value from self.main_frame.profile_var.get() is: {repr(profile_name_from_var)}", func_name=func_name)
if hasattr(self.main_frame, "profile_dropdown"):
current_combobox_value = self.main_frame.profile_dropdown.get()
print(f"DEBUG [save_profile_settings]: Value from self.main_frame.profile_dropdown.get() is: {repr(current_combobox_value)}")
log_handler.log_debug(f"Value from self.main_frame.profile_dropdown.get() is: {repr(current_combobox_value)}", func_name=func_name)
except Exception as e_get:
print(f"ERROR [save_profile_settings]: Failed to get profile_var value: {e_get}")
log_handler.log_error(f"Failed to get profile_var value: {e_get}", func_name=func_name)
profile_name_from_var = None
else:
print("DEBUG [save_profile_settings]: main_frame or profile_var not found.")
log_handler.log_error("main_frame or profile_var not found during save.", func_name=func_name)
# Usa il valore recuperato (o stringa vuota se None)
profile_name: str = profile_name_from_var if profile_name_from_var is not None else ""
# ---<<< NUOVO DEBUG: Controlla il tipo e il risultato del check >>>---
print(f"DEBUG [save_profile_settings]: Type of profile_name: {type(profile_name)}")
print(f"DEBUG [save_profile_settings]: Result of 'not profile_name' check: {not profile_name}")
log_handler.log_debug(f"Type of profile_name: {type(profile_name)}", func_name=func_name)
log_handler.log_debug(f"Result of 'not profile_name' check: {not profile_name}", func_name=func_name)
# ---<<< FINE NUOVO DEBUG >>>---
# Il controllo originale
if not profile_name:
log_handler.log_warning(
"Save failed: No profile selected (profile_name is empty or evaluates to False).", func_name=func_name # Messaggio leggermente modificato
)
if hasattr(self, "main_frame"):
self.main_frame.update_status_bar("Save failed: No profile selected.")
return False
log_handler.log_info(f"Saving settings for profile: '{profile_name}'", ...) # OK
mf: MainFrame = self.main_frame
cm: ConfigManager = self.config_manager
status_final: str = "STATUS_NOT_SET_YET" # Inizializza a un valore univoco
success: bool = False
exception_details: Optional[str] = None # Per memorizzare l'errore se c'è
try:
# ---<<< DEBUG: Dentro il TRY >>>---
print(f"DEBUG [save_profile_settings]: Entering TRY block.")
settings_to_save: dict = {
"svn_working_copy_path": mf.svn_path_entry.get(),
"usb_drive_path": mf.usb_path_entry.get(),
"bundle_name": mf.bundle_name_entry.get(),
"bundle_name_updated": mf.bundle_updated_name_entry.get(),
"autocommit": str(mf.autocommit_var.get()),
"commit_message": mf.get_commit_message(),
"autobackup": str(mf.autobackup_var.get()),
"backup_dir": mf.backup_dir_var.get(),
"backup_exclude_extensions": mf.backup_exclude_extensions_var.get(),
"backup_exclude_dirs": mf.backup_exclude_dirs_var.get(),
"remote_url": mf.remote_url_var.get(),
"remote_name": mf.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME,
}
print(f"DEBUG [save_profile_settings]: Settings gathered from GUI.") # OK
log_handler.log_debug(f"Settings to save: {settings_to_save}", ...) # OK
for key, value in settings_to_save.items():
cm.set_profile_option(profile_name, key, value)
print(f"DEBUG [save_profile_settings]: Options set in ConfigManager object.") # OK
cm.save_config() # <-- POTENZIALE PUNTO DI ERRORE
# Se arriviamo qui, save_config non ha sollevato eccezioni
print(f"DEBUG [save_profile_settings]: cm.save_config() completed successfully.") # OK
log_handler.log_info(f"Settings saved successfully for '{profile_name}'.", ...) # OK
status_final = f"Profile '{profile_name}' saved." # Imposta messaggio successo
success = True
print(f"DEBUG [save_profile_settings]: TRY block success. status_final = '{status_final}'") # OK
except Exception as e:
# ---<<< DEBUG: Dentro EXCEPT >>>---
print(f"ERROR [save_profile_settings]: EXCEPTION caught in TRY block: {type(e).__name__} - {e}")
exception_details = f"{type(e).__name__}: {e}"
log_handler.log_exception(f"Error saving profile '{profile_name}': {e}", ...) # OK
status_final = f"Error saving profile '{profile_name}'." # Imposta messaggio errore
print(f"DEBUG [save_profile_settings]: EXCEPT block executed. status_final = '{status_final}'") # OK
mf.show_error("Save Error", f"Failed:\n{e}") # OK
success = False
finally:
# ---<<< DEBUG: Dentro FINALLY >>>---
print(f"DEBUG [save_profile_settings]: Entering FINALLY block.")
print(f"DEBUG [save_profile_settings]: Value of status_final BEFORE update_status_bar: '{status_final}'") # <-- CONTROLLA QUESTO
mf.update_status_bar(status_final)
print(f"DEBUG [save_profile_settings]: update_status_bar called with message: '{status_final}'") # OK
if exception_details:
print(f"DEBUG [save_profile_settings]: Exception recorded: {exception_details}")
return success
def add_profile(self):
# ... (Codice invariato)
func_name: str = "add_profile"
log_handler.log_debug("'Add Profile' button clicked.", func_name=func_name)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
self.main_frame.update_status_bar("Adding new profile...")
name: Optional[str] = self.main_frame.ask_new_profile_name()
if name is None:
log_handler.log_info("Add profile cancelled.", func_name=func_name)
self.main_frame.update_status_bar("Add profile cancelled.")
return
name = name.strip()
if not name:
log_handler.log_warning("Add failed: Name empty.", func_name=func_name)
self.main_frame.show_error("Input Error", "Profile name cannot be empty.")
self.main_frame.update_status_bar("Add failed: Empty name.")
return
if name in self.config_manager.get_profile_sections():
log_handler.log_warning(
f"Add failed: '{name}' exists.", func_name=func_name
)
self.main_frame.show_error("Error", f"Profile '{name}' already exists.")
self.main_frame.update_status_bar(f"Add failed: '{name}' exists.")
return
log_handler.log_info(
f"Attempting to add new profile: '{name}'", func_name=func_name
)
status_final: str = "Ready."
try:
defaults: dict = self.config_manager._get_expected_keys_with_defaults()
defaults["bundle_name"] = f"{name}_repo.bundle"
defaults["bundle_name_updated"] = f"{name}_update.bundle"
defaults["svn_working_copy_path"] = ""
defaults["usb_drive_path"] = ""
defaults["remote_url"] = ""
defaults["commit_message"] = f"Initial commit for profile {name}"
self.config_manager.add_section(name)
for key, value in defaults.items():
self.config_manager.set_profile_option(name, key, value)
self.config_manager.save_config()
log_handler.log_info(
f"Profile '{name}' added successfully.", func_name=func_name
)
sections: list = self.config_manager.get_profile_sections()
self.main_frame.update_profile_dropdown(sections)
self.main_frame.profile_var.set(name)
except Exception as e:
log_handler.log_exception(
f"Error adding profile '{name}': {e}", func_name=func_name
)
status_final = f"Error adding profile '{name}'."
self.main_frame.show_error("Add Error", f"Failed:\n{e}")
self.main_frame.update_status_bar(status_final)
def remove_profile(self):
# ... (Codice invariato)
func_name: str = "remove_profile"
log_handler.log_debug("'Remove Profile' button clicked.", func_name=func_name)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
profile: str = self.main_frame.profile_var.get()
if not profile:
log_handler.log_warning(
"Remove failed: No profile selected.", func_name=func_name
)
self.main_frame.show_error("Error", "No profile selected.")
self.main_frame.update_status_bar("Remove failed: No profile.")
return
if profile == DEFAULT_PROFILE:
log_handler.log_warning(
"Attempt remove default denied.", func_name=func_name
)
self.main_frame.show_error(
"Denied", f"Cannot remove default profile ('{DEFAULT_PROFILE}')."
)
self.main_frame.update_status_bar("Cannot remove default.")
return
if self.main_frame.ask_yes_no(
title="Confirm Remove",
message=f"Remove profile '{profile}'?\nThis cannot be undone.",
):
log_handler.log_info(
f"Attempting remove profile: '{profile}'", func_name=func_name
)
self.main_frame.update_status_bar(
f"Processing: Removing profile '{profile}'..."
)
status_final: str = "Ready."
try:
removed: bool = self.config_manager.remove_profile_section(profile)
if removed:
self.config_manager.save_config()
log_handler.log_info(
f"Profile '{profile}' removed.", func_name=func_name
)
status_final = f"Profile '{profile}' removed."
sections: list = self.config_manager.get_profile_sections()
self.main_frame.update_profile_dropdown(
sections
) # Selecting new profile triggers load
else:
log_handler.log_error(
f"Failed remove profile '{profile}' (ConfigManager returned False).",
func_name=func_name,
)
status_final = f"Error removing profile '{profile}'."
self.main_frame.show_error(
"Error", f"Could not remove '{profile}'."
)
self.main_frame.update_status_bar(status_final)
except Exception as e:
log_handler.log_exception(
f"Error removing profile '{profile}': {e}", func_name=func_name
)
status_final = f"Error removing profile '{profile}'."
self.main_frame.show_error("Error", f"Failed:\n{e}")
self.main_frame.update_status_bar(status_final)
else:
log_handler.log_info("Profile removal cancelled.", func_name=func_name)
self.main_frame.update_status_bar("Removal cancelled.")
# --- GUI Interaction & Helper Methods ---
# (Metodi browse_folder, update_svn_status_indicator, _calculate_fetch_button_state, _is_repo_ready, _parse_exclusions,
# _get_and_validate_svn_path, _get_and_validate_usb_path, _clear_and_disable_fields, show_fatal_error,
# show_comparison_summary, _update_gui_for_not_ready_state, _update_gui_for_detached_head,
# _update_gui_for_no_upstream, _reenable_widgets_after_modal, _update_gui_for_status_error,
# _update_gui_auth_status - INVARIATI)
def browse_folder(self, entry_widget: tk.Entry): # Example of unchanged method
# ... (Codice invariato)
func_name: str = "browse_folder"
current_path: str = entry_widget.get()
initial_dir: str = os.path.expanduser("~")
if current_path and os.path.isdir(current_path):
initial_dir = current_path
elif current_path and os.path.exists(os.path.dirname(current_path)):
initial_dir = os.path.dirname(current_path)
log_handler.log_debug(
f"Opening folder browser. Initial: {initial_dir}", func_name=func_name
)
directory: Optional[str] = filedialog.askdirectory(
initialdir=initial_dir, title="Select Directory", parent=self.master
)
if directory:
log_handler.log_debug(
f"Directory selected: {directory}", func_name=func_name
)
entry_widget.delete(0, tk.END)
entry_widget.insert(0, directory)
if (
hasattr(self.main_frame, "svn_path_entry")
and entry_widget == self.main_frame.svn_path_entry
):
self.update_svn_status_indicator(directory)
else:
log_handler.log_debug("Folder browse cancelled.", func_name=func_name)
def update_svn_status_indicator(self, svn_path: str): # Example of unchanged method
# ... (Codice invariato)
func_name: str = "update_svn_status_indicator"
is_valid_dir: bool = bool(svn_path and os.path.isdir(svn_path))
is_repo_ready: bool = is_valid_dir and os.path.exists(
os.path.join(svn_path, ".git")
)
log_handler.log_debug(
f"Updating repo status indicator. Path='{svn_path}', ValidDir={is_valid_dir}, Ready={is_repo_ready}",
func_name=func_name,
)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
mf: MainFrame = self.main_frame
mf.update_svn_indicator(is_repo_ready)
repo_ready_state: str = tk.NORMAL if is_repo_ready else tk.DISABLED
valid_dir_state: str = tk.NORMAL if is_valid_dir else tk.DISABLED
prepare_state: str = (
tk.NORMAL if is_valid_dir and not is_repo_ready else tk.DISABLED
)
fetch_button_state: str = self._calculate_fetch_button_state(
mf, svn_path, is_repo_ready
)
remote_url_present = bool(hasattr(self, 'remote_url_var') and self.remote_url_var.get().strip())
wiki_button_state = tk.NORMAL if is_repo_ready and remote_url_present else tk.DISABLED
try: # Apply states
if hasattr(mf, "prepare_svn_button"):
mf.prepare_svn_button.config(state=prepare_state)
if hasattr(mf, "create_bundle_button"):
mf.create_bundle_button.config(state=repo_ready_state)
if hasattr(mf, "fetch_bundle_button"):
mf.fetch_bundle_button.config(state=fetch_button_state)
if hasattr(mf, "edit_gitignore_button"):
mf.edit_gitignore_button.config(state=repo_ready_state)
if hasattr(mf, "manual_backup_button"):
mf.manual_backup_button.config(state=valid_dir_state)
if hasattr(mf, "autocommit_checkbox"):
mf.autocommit_checkbox.config(state=repo_ready_state)
if hasattr(mf, "commit_message_text"):
mf.commit_message_text.config(state=repo_ready_state)
if hasattr(mf, "refresh_changes_button"):
mf.refresh_changes_button.config(state=repo_ready_state)
if hasattr(mf, "commit_button"):
mf.commit_button.config(state=repo_ready_state)
if hasattr(mf, "refresh_tags_button"):
mf.refresh_tags_button.config(state=repo_ready_state)
if hasattr(mf, "create_tag_button"):
mf.create_tag_button.config(state=repo_ready_state)
if hasattr(mf, "checkout_tag_button"):
mf.checkout_tag_button.config(state=repo_ready_state)
if hasattr(mf, "tag_listbox"):
mf.tag_listbox.config(state=repo_ready_state)
if hasattr(mf, "refresh_branches_button"):
mf.refresh_branches_button.config(state=repo_ready_state)
if hasattr(mf, "create_branch_button"):
mf.create_branch_button.config(state=repo_ready_state)
if hasattr(mf, "checkout_branch_button"):
mf.checkout_branch_button.config(state=repo_ready_state)
if hasattr(mf, "branch_listbox"):
mf.branch_listbox.config(state=repo_ready_state)
if hasattr(mf, "refresh_history_button"):
mf.refresh_history_button.config(state=repo_ready_state)
if hasattr(mf, "history_branch_filter_combo"):
combo_state: str = "readonly" if is_repo_ready else tk.DISABLED
mf.history_branch_filter_combo.config(state=combo_state)
# History treeview gestito in set_action_widgets_state
if hasattr(mf, "apply_remote_config_button"):
mf.apply_remote_config_button.config(state=repo_ready_state)
if hasattr(mf, "check_auth_button"):
mf.check_auth_button.config(state=repo_ready_state)
if hasattr(mf, "fetch_button"):
mf.fetch_button.config(state=repo_ready_state)
if hasattr(mf, "pull_button"):
mf.pull_button.config(state=repo_ready_state)
if hasattr(mf, "push_button"):
mf.push_button.config(state=repo_ready_state)
if hasattr(mf, "push_tags_button"):
mf.push_tags_button.config(state=repo_ready_state)
if hasattr(mf, "refresh_sync_status_button"):
mf.refresh_sync_status_button.config(state=repo_ready_state)
if hasattr(mf, "refresh_remote_branches_button"):
mf.refresh_remote_branches_button.config(state=repo_ready_state)
if hasattr(mf, "remote_branches_listbox"):
mf.remote_branches_listbox.config(state=repo_ready_state)
if hasattr(self, "update_wiki_button"):
self.update_wiki_button.config(state=wiki_button_state)
if hasattr(mf, "local_branches_listbox_remote_tab"):
mf.local_branches_listbox_remote_tab.config(state=repo_ready_state)
if hasattr(mf, "refresh_local_branches_button_remote_tab"):
mf.refresh_local_branches_button_remote_tab.config(
state=repo_ready_state
)
if hasattr(mf, "changed_files_listbox"):
if not is_repo_ready:
log_handler.log_debug(
"Repo not ready, clearing changes list.", func_name=func_name
)
mf.update_changed_files_list(["(Repository not ready)"])
except Exception as e:
log_handler.log_error(
f"Error updating widget states: {e}", func_name=func_name
)
def _calculate_fetch_button_state(
self, main_frame: "MainFrame", svn_path: str, is_repo_ready: bool
) -> str: # Example of unchanged method
# ... (Codice invariato)
func_name: str = "_calculate_fetch_button_state"
try:
can_use_svn_dir_for_clone: bool = False
if svn_path:
if os.path.isdir(svn_path):
try:
if not os.listdir(svn_path):
can_use_svn_dir_for_clone = True
except OSError:
pass
else:
parent_dir: str = os.path.dirname(svn_path)
if (parent_dir and os.path.isdir(parent_dir)) or not parent_dir:
can_use_svn_dir_for_clone = True
bundle_file_exists: bool = False
usb_path_str: str = main_frame.usb_path_entry.get().strip()
bundle_fetch_name: str = main_frame.bundle_updated_name_entry.get().strip()
if usb_path_str and bundle_fetch_name and os.path.isdir(usb_path_str):
bundle_full_path: str = os.path.join(usb_path_str, bundle_fetch_name)
if os.path.isfile(bundle_full_path):
bundle_file_exists = True
if is_repo_ready or (can_use_svn_dir_for_clone and bundle_file_exists):
return tk.NORMAL
else:
return tk.DISABLED
except Exception as e:
log_handler.log_error(
f"Error checking fetch button state: {e}", func_name=func_name
)
return tk.DISABLED
def _is_repo_ready(self, repo_path: str) -> bool: # Example of unchanged method
return bool(
repo_path
and os.path.isdir(repo_path)
and os.path.exists(os.path.join(repo_path, ".git"))
)
def _parse_exclusions(
self,
) -> tuple[set[str], set[str]]: # Example of unchanged method
excluded_extensions: set[str] = set()
excluded_dirs: set[str] = {".git", ".svn"}
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return excluded_extensions, excluded_dirs
mf: MainFrame = self.main_frame
ext_string: str = mf.backup_exclude_extensions_var.get()
if ext_string:
for ext in ext_string.split(","):
cleaned_ext: str = ext.strip().lower()
if cleaned_ext:
normalized_ext: str = "." + cleaned_ext.lstrip(".")
excluded_extensions.add(normalized_ext)
dir_string: str = mf.backup_exclude_dirs_var.get()
if dir_string:
for dirname in dir_string.split(","):
cleaned_dir: str = dirname.strip().lower().strip(os.path.sep + "/")
if (
cleaned_dir
and cleaned_dir not in {".", ".."}
and cleaned_dir not in excluded_dirs
):
excluded_dirs.add(cleaned_dir)
log_handler.log_debug(
f"Parsed Exclusions - Exts: {excluded_extensions}, Dirs: {excluded_dirs}",
func_name="_parse_exclusions",
)
return excluded_extensions, excluded_dirs
def _get_and_validate_svn_path(
self, operation_name: str = "Operation"
) -> Optional[str]: # Example of unchanged method
func_name: str = "_get_and_validate_svn_path"
mf = getattr(self, "main_frame", None)
if not mf or not hasattr(mf, "svn_path_entry"):
log_handler.log_error(
f"{operation_name} failed: SVN path entry missing.", func_name=func_name
)
return None
path_str: str = mf.svn_path_entry.get().strip()
if not path_str:
log_handler.log_warning(
f"{operation_name} failed: Working Directory path is empty.",
func_name=func_name,
)
mf.show_error("Input Error", "Working Directory path cannot be empty.")
mf.update_status_bar(f"{operation_name} failed: Path empty.")
return None
abs_path: str = os.path.abspath(path_str)
if not os.path.isdir(abs_path):
log_handler.log_warning(
f"{operation_name} failed: Path is not a valid directory: {abs_path}",
func_name=func_name,
)
mf.show_error("Path Error", f"Path is not a valid directory:\n{abs_path}")
mf.update_status_bar(f"{operation_name} failed: Not a directory.")
return None
log_handler.log_debug(
f"{operation_name}: Using validated Working Directory path: {abs_path}",
func_name=func_name,
)
return abs_path
def _get_and_validate_usb_path(
self, operation_name: str = "Operation"
) -> Optional[str]: # Example of unchanged method
func_name: str = "_get_and_validate_usb_path"
mf = getattr(self, "main_frame", None)
if not mf or not hasattr(mf, "usb_path_entry"):
log_handler.log_error(
f"{operation_name} failed: Bundle Target path entry missing.",
func_name=func_name,
)
return None
path_str: str = mf.usb_path_entry.get().strip()
if not path_str:
log_handler.log_warning(
f"{operation_name} failed: Bundle Target path is empty.",
func_name=func_name,
)
mf.show_error("Input Error", "Bundle Target path cannot be empty.")
mf.update_status_bar(f"{operation_name} failed: Path empty.")
return None
abs_path: str = os.path.abspath(path_str)
if not os.path.isdir(abs_path):
log_handler.log_warning(
f"{operation_name} failed: Path is not a valid directory: {abs_path}",
func_name=func_name,
)
mf.show_error("Path Error", f"Path is not a valid directory:\n{abs_path}")
mf.update_status_bar(f"{operation_name} failed: Not a directory.")
return None
log_handler.log_debug(
f"{operation_name}: Using validated Bundle Target path: {abs_path}",
func_name=func_name,
)
return abs_path
def _clear_and_disable_fields(self): # Example of unchanged method
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
mf: MainFrame = self.main_frame
log_handler.log_debug(
"Clearing and disabling fields.", func_name="_clear_and_disable_fields"
)
if hasattr(mf, "svn_path_entry"):
mf.svn_path_entry.delete(0, tk.END)
if hasattr(mf, "usb_path_entry"):
mf.usb_path_entry.delete(0, tk.END)
if hasattr(mf, "bundle_name_entry"):
mf.bundle_name_entry.delete(0, tk.END)
if hasattr(mf, "bundle_updated_name_entry"):
mf.bundle_updated_name_entry.delete(0, tk.END)
if hasattr(mf, "clear_commit_message"):
mf.clear_commit_message()
if hasattr(mf, "backup_dir_var"):
mf.backup_dir_var.set("")
if hasattr(mf, "backup_exclude_extensions_var"):
mf.backup_exclude_extensions_var.set("")
if hasattr(mf, "backup_exclude_dirs_var"):
mf.backup_exclude_dirs_var.set("")
if hasattr(mf, "remote_url_var"):
mf.remote_url_var.set("")
if hasattr(mf, "remote_name_var"):
mf.remote_name_var.set("")
if hasattr(mf, "autobackup_var"):
mf.autobackup_var.set(False)
if hasattr(mf, "autocommit_var"):
mf.autocommit_var.set(False)
if hasattr(mf, "toggle_backup_dir"):
mf.toggle_backup_dir()
self._update_gui_for_not_ready_state()
self.update_svn_status_indicator("")
if hasattr(mf, "remove_profile_button"):
mf.remove_profile_button.config(state=tk.DISABLED)
if hasattr(mf, "save_settings_button"):
mf.save_settings_button.config(state=tk.DISABLED)
mf.update_status_bar("No profile selected or repository not ready.")
def show_fatal_error(self, message: str): # Example of unchanged method
log_handler.log_critical(
f"FATAL ERROR: {message}", func_name="show_fatal_error"
)
parent_window: Optional[tk.Tk] = None
try:
if hasattr(self, "master") and self.master and self.master.winfo_exists():
parent_window = self.master
messagebox.showerror("Fatal Error", message, parent=parent_window)
except Exception as e:
print(f"FATAL ERROR (GUI message failed: {e}): {message}", file=sys.stderr)
finally:
self.on_closing()
def show_comparison_summary(
self, ref1: str, ref2: str, repo_path: str, changed_files: List[str]
): # Example of unchanged method
func_name: str = "show_comparison_summary"
log_handler.log_debug(
f"Attempting to show comparison summary: {ref1} vs {ref2}",
func_name=func_name,
)
if not all([ref1, ref2, repo_path, isinstance(changed_files, list)]):
log_handler.log_error(
"Missing data required to show comparison summary.", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Display Error", "Internal error: Missing data for comparison."
)
self._reenable_widgets_after_modal()
return
try:
log_handler.log_debug(
f"Opening DiffSummaryWindow with {len(changed_files)} files.",
func_name=func_name,
)
DiffSummaryWindow(
master=self.master,
git_commands=self.git_commands,
repo_path=repo_path,
ref1=ref1,
ref2=ref2,
changed_files_status=changed_files,
)
log_handler.log_info(
"Diff Summary window closed by user.", func_name=func_name
)
if hasattr(self.main_frame, "update_status_bar"):
self.main_frame.update_status_bar("Ready.")
except Exception as e_summary:
log_handler.log_exception(
f"Error opening diff summary window: {e_summary}", func_name=func_name
)
if hasattr(self.main_frame, "show_error") and hasattr(
self.main_frame, "update_status_bar"
):
self.main_frame.show_error(
"Display Error",
f"Could not display comparison results:\n{e_summary}",
)
self.main_frame.update_status_bar("Error displaying comparison.")
finally:
self._reenable_widgets_after_modal()
def _update_gui_for_not_ready_state(self): # Example of unchanged method
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
mf: MainFrame = self.main_frame
log_handler.log_debug(
"Updating GUI for 'Repo Not Ready' state.",
func_name="_update_gui_for_not_ready_state",
)
if hasattr(mf, "update_tag_list"):
mf.update_tag_list([("(Repo not ready)", "")])
if hasattr(mf, "update_branch_list"):
mf.update_branch_list([], None)
if hasattr(mf, "update_history_display"):
mf.update_history_display(["(Repo not ready)"])
if hasattr(mf, "update_history_branch_filter"):
mf.update_history_branch_filter([])
if hasattr(mf, "update_changed_files_list"):
mf.update_changed_files_list(["(Repo not ready)"])
if hasattr(mf, "update_remote_branches_list"):
mf.update_remote_branches_list(["(Repo not ready)"])
if hasattr(mf, "update_ahead_behind_status"):
mf.update_ahead_behind_status(status_text="Sync Status: (Repo not ready)")
def _update_gui_for_detached_head(
self, current_branch_name: Optional[str]
): # Example of unchanged method
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
mf: MainFrame = self.main_frame
log_handler.log_debug(
"Updating GUI for 'Detached HEAD' state.",
func_name="_update_gui_for_detached_head",
)
if hasattr(mf, "update_ahead_behind_status"):
mf.update_ahead_behind_status(
current_branch=current_branch_name,
status_text="Sync Status: (Detached HEAD)",
)
if hasattr(mf, "refresh_sync_status_button"):
mf.refresh_sync_status_button.config(state=tk.DISABLED)
def _update_gui_for_no_upstream(
self, current_branch_name: Optional[str]
): # Example of unchanged method
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
mf: MainFrame = self.main_frame
log_handler.log_debug(
"Updating GUI for 'No Upstream' state.",
func_name="_update_gui_for_no_upstream",
)
if hasattr(mf, "update_ahead_behind_status"):
mf.update_ahead_behind_status(
current_branch=current_branch_name,
status_text=f"Sync Status: Upstream not set",
)
if hasattr(mf, "refresh_sync_status_button"):
mf.refresh_sync_status_button.config(state=tk.DISABLED)
def _reenable_widgets_after_modal(self): # Example of unchanged method
func_name: str = "_reenable_widgets_after_modal"
if hasattr(self, "master") and self.master.winfo_exists():
self.master.after(50, self._reenable_widgets_if_ready)
log_handler.log_debug("Scheduled widget re-enable.", func_name=func_name)
else:
log_handler.log_warning(
"Cannot schedule widget re-enable: Master window destroyed.",
func_name=func_name,
)
def _update_gui_for_status_error(self): # Example of unchanged method
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
mf: MainFrame = self.main_frame
log_handler.log_debug(
"Updating GUI for 'Status Error' state.",
func_name="_update_gui_for_status_error",
)
if hasattr(mf, "update_ahead_behind_status"):
mf.update_ahead_behind_status(status_text="Sync Status: Error getting info")
if hasattr(mf, "refresh_sync_status_button"):
mf.refresh_sync_status_button.config(state=tk.DISABLED)
def _update_gui_auth_status(self, status: str): # Example of unchanged method
self.remote_auth_status = status
if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
self.main_frame._update_auth_status_indicator(status)
if status != "ok" and hasattr(
self.main_frame, "update_ahead_behind_status"
):
sync_status_text: str = (
f"Sync Status: ({status.replace('_', ' ').title()})"
)
self.main_frame.update_ahead_behind_status(status_text=sync_status_text)
# --- Async Operation Starter/Handler ---
# (Metodi _start_async_operation, _check_completion_queue e helper INVARIATI)
def _start_async_operation(
self, worker_func: Callable, args_tuple: tuple, context_dict: dict
):
# ... (Codice invariato)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
log_handler.log_error(
"Cannot start async op: Main frame missing.",
func_name="_start_async_operation",
)
return
context_name: str = context_dict.get("context", "unknown_op")
status_msg: str = context_dict.get("status_msg", context_name)
log_handler.log_info(
f"--- Action Triggered: {context_name} (Async Start) ---",
func_name=context_name,
)
self.main_frame.set_action_widgets_state(tk.DISABLED)
self.main_frame.update_status_bar(
message=f"Processing: {status_msg}...",
bg_color=self.main_frame.STATUS_YELLOW,
)
results_queue: queue.Queue = queue.Queue(maxsize=1)
full_args: tuple = args_tuple + (results_queue,)
log_handler.log_debug(
f"Creating worker thread for {context_name}. Worker func: {worker_func.__name__}",
func_name="_start_async_operation",
)
try:
worker_thread = threading.Thread(
target=worker_func, args=full_args, daemon=True
)
log_handler.log_debug(
f"Starting worker thread for {context_name}.",
func_name="_start_async_operation",
)
worker_thread.start()
except Exception as thread_e:
log_handler.log_exception(
f"Failed to start worker thread for {context_name}: {thread_e}",
func_name="_start_async_operation",
)
self.main_frame.show_error(
"Threading Error",
f"Could not start background task for {context_name}.",
)
self.main_frame.update_status_bar(
f"Error starting task: {context_name}",
bg_color=self.main_frame.STATUS_RED,
duration_ms=10000,
)
self.main_frame.set_action_widgets_state(tk.NORMAL)
return
log_handler.log_debug(
f"Scheduling completion check for {context_name}.",
func_name="_start_async_operation",
)
self.master.after(
self.ASYNC_QUEUE_CHECK_INTERVAL_MS,
self._check_completion_queue,
results_queue,
context_dict,
)
def _check_completion_queue(self, results_queue: queue.Queue, context: dict):
# ... (Codice invariato)
task_context: str = context.get("context", "unknown")
func_name: str = "_check_completion_queue"
try:
result_data: Dict[str, Any] = results_queue.get_nowait()
log_handler.log_info(
f"Result received for '{task_context}'. Status: {result_data.get('status')}",
func_name=func_name,
)
should_reenable_now: bool = self._should_reenable_widgets_now(
task_context, result_data.get("status")
)
if not should_reenable_now:
log_handler.log_debug(
f"Postponing widget re-enable for context: {task_context}",
func_name=func_name,
)
if should_reenable_now:
self._reenable_widgets_if_ready()
self._update_status_bar_from_result(task_context, result_data)
self._process_result_with_handler(result_data, context)
except queue.Empty:
self._reschedule_queue_check(results_queue, context)
except Exception as e:
self._handle_queue_check_error(e, task_context)
def _should_reenable_widgets_now(
self, task_context: str, status: Optional[str]
) -> bool: # Example of unchanged method
if task_context == "check_connection" and status == "auth_required":
return False
if task_context == "interactive_auth" and status == "success":
return False
if task_context == "clone_remote" and status == "success":
return False
if (
task_context
in ["checkout_tracking_branch", "checkout_branch", "checkout_tag"]
and status == "success"
):
return False
if (
task_context in ["pull_remote", "merge_local_branch"]
and status == "conflict"
):
return False
if task_context == "compare_branches" and status == "success":
return False
if task_context == "get_commit_details" and status == "success":
return False # Non riabilitare finché finestra dettagli non è chiusa
return True
def _reenable_widgets_if_ready(self): # Example of unchanged method
if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
log_handler.log_debug(
"Re-enabling widgets now.", func_name="_reenable_widgets_if_ready"
)
self.main_frame.set_action_widgets_state(tk.NORMAL)
else:
log_handler.log_warning(
"Cannot re-enable widgets, MainFrame missing.",
func_name="_reenable_widgets_if_ready",
)
def _update_status_bar_from_result(
self, task_context: str, result_data: dict
): # Example of unchanged method
status: Optional[str] = result_data.get("status")
message: str = result_data.get("message", "Operation finished.")
skip_update: bool = False
if task_context == "clone_remote" and status == "success":
skip_update = True
if (
task_context
in ["checkout_tracking_branch", "checkout_branch", "checkout_tag"]
and status == "success"
):
skip_update = True
if task_context == "compare_branches" and status == "success":
skip_update = True
if task_context == "get_commit_details" and status == "success":
skip_update = True # Non aggiornare finché finestra dettagli non chiusa
if status in ["conflict", "rejected"]:
skip_update = True # Messaggi specifici gestiti dall'handler
if (
not skip_update
and hasattr(self, "main_frame")
and self.main_frame.winfo_exists()
):
status_color: Optional[str] = None
reset_duration: int = 5000
if status == "success":
status_color = self.main_frame.STATUS_GREEN
elif status == "warning":
status_color = self.main_frame.STATUS_YELLOW
reset_duration = 7000
elif status == "auth_required":
status_color = self.main_frame.STATUS_YELLOW
reset_duration = 15000
# Rimosso conflict/rejected da qui, gestiti da handler specifico
elif status == "error":
status_color = self.main_frame.STATUS_RED
reset_duration = 10000
self.main_frame.update_status_bar(
message, bg_color=status_color, duration_ms=reset_duration
)
def _process_result_with_handler(
self, result_data: dict, context: dict
): # Example of unchanged method
task_context: str = context.get("context", "unknown")
func_name: str = "_process_result_with_handler"
try:
result_handler = AsyncResultHandler(self)
result_handler.process(result_data, context)
log_handler.log_debug(
f"Result processing delegated for context '{task_context}'.",
func_name=func_name,
)
except Exception as handler_e:
log_handler.log_exception(
f"Error during result processing by handler for {task_context}: {handler_e}",
func_name=func_name,
)
if hasattr(self, "main_frame"):
self.main_frame.show_error(
"Processing Error", f"Failed to handle task result:\n{handler_e}"
)
if (
not self._should_reenable_widgets_now(
task_context, result_data.get("status")
)
and hasattr(self.main_frame, "winfo_exists")
and self.main_frame.winfo_exists()
):
log_handler.log_warning(
"Re-enabling widgets after handler error.", func_name=func_name
)
self.main_frame.set_action_widgets_state(tk.NORMAL)
def _reschedule_queue_check(
self, results_queue: queue.Queue, context: dict
): # Example of unchanged method
if hasattr(self, "master") and self.master.winfo_exists():
self.master.after(
self.ASYNC_QUEUE_CHECK_INTERVAL_MS,
self._check_completion_queue,
results_queue,
context,
)
def _handle_queue_check_error(
self, error: Exception, task_context: str
): # Example of unchanged method
func_name: str = "_handle_queue_check_error"
log_handler.log_exception(
f"Critical error checking completion queue for {task_context}: {error}",
func_name=func_name,
)
try:
if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
self.main_frame.set_action_widgets_state(tk.NORMAL)
self.main_frame.update_status_bar(
"Error processing async result.",
bg_color=self.main_frame.STATUS_RED,
duration_ms=10000,
)
self._update_gui_for_status_error()
except Exception as recovery_e:
log_handler.log_error(
f"Failed to recover GUI after queue processing error: {recovery_e}",
func_name=func_name,
)
# --- Specific Action Launchers ---
# (Metodi refresh_*, prepare_*, create_*, fetch_*, manual_backup, commit_changes,
# open_gitignore_editor, _handle_gitignore_save, add_selected_file,
# create_tag, checkout_tag, create_branch, checkout_branch, delete_local_branch,
# merge_local_branch, compare_branch_with_current, view_commit_details,
# apply_remote_config, check_connection_auth, fetch_remote, pull_remote,
# push_remote, push_tags_remote, clone_remote_repo, refresh_remote_branches,
# checkout_remote_branch_as_local - INVARIATI rispetto a versione precedente)
# Example: refresh_tag_list
def refresh_tag_list(self):
func_name: str = "refresh_tag_list"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Tags")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh Tags skipped: Repo not ready.", func_name=func_name
)
self._update_gui_for_not_ready_state()
return
args: tuple = (self.git_commands, svn_path)
self._start_async_operation(
worker_func=async_workers.run_refresh_tags_async,
args_tuple=args,
context_dict={"context": "refresh_tags", "status_msg": "Refreshing tags"},
)
# ... (Implementa o copia TUTTI gli altri metodi launcher qui, sono invariati nella loro logica interna) ...
# Example: push_remote
def push_remote(self):
func_name: str = "push_remote"
log_handler.log_info(
f"--- Action Triggered: Push Branch to Remote ---", func_name=func_name
)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
svn_path: Optional[str] = self._get_and_validate_svn_path("Push Branch")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Push Branch skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path invalid or not prepared."
)
self.main_frame.update_status_bar("Push failed: Repo not ready.")
return
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
if self.remote_auth_status != "ok":
auth_msg: str = f"Cannot Push to '{remote_name}':\n"
# ... (build auth msg) ...
if self.remote_auth_status == "required":
auth_msg += "Authentication is required..."
elif self.remote_auth_status == "failed":
auth_msg += "Authentication previously failed..."
elif self.remote_auth_status == "connection_failed":
auth_msg += "Connection previously failed..."
else:
auth_msg += "Connection status is unknown or in error..."
log_handler.log_warning(
f"Push Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.",
func_name=func_name,
)
self.main_frame.show_warning("Action Blocked", auth_msg)
self.main_frame.update_status_bar(f"Push failed: {self.remote_auth_status}")
return
try:
if self.git_commands.git_status_has_changes(svn_path):
if not self.main_frame.ask_yes_no(
"Uncommitted Changes",
"There are uncommitted changes.\nPush anyway? (Only committed changes will be pushed)",
):
self.main_frame.update_status_bar(
"Push cancelled (uncommitted changes)."
)
return
except GitCommandError as status_err:
log_handler.log_error(
f"Push aborted: Failed to check status: {status_err}",
func_name=func_name,
)
self.main_frame.show_error(
"Status Error", f"Could not check repo status:\n{status_err}"
)
return
log_handler.log_info(
f"Starting push for current branch to remote '{remote_name}'...",
func_name=func_name,
)
args: tuple = (
self.remote_action_handler,
self.git_commands,
svn_path,
remote_name,
)
self._start_async_operation(
worker_func=async_workers.run_push_remote_async,
args_tuple=args,
context_dict={
"context": "push_remote",
"status_msg": f"Pushing current branch to '{remote_name}'",
"remote_name": remote_name,
},
)
# ... Aggiungi TUTTI gli altri metodi launcher qui ...
# --- Helper per Suggestion Tag ---
# (Metodo _generate_next_tag_suggestion INVARIATO)
def _generate_next_tag_suggestion(self, svn_path: str) -> str:
# ... (Codice invariato)
func_name: str = "_generate_next_tag_suggestion"
log_handler.log_debug("Generating next tag suggestion...", func_name=func_name)
default_suggestion: str = "v.0.0.0.1"
latest_valid_tag: Optional[str] = None
tag_pattern = re.compile(r"^v\.(\d+)\.(\d+)\.(\d+)\.(\d+)$")
try:
tags_data: list[tuple[str, str]] = self.git_commands.list_tags(svn_path)
if not tags_data:
log_handler.log_debug("No existing tags found.", func_name=func_name)
return default_suggestion
for tag_name, _ in tags_data:
match = tag_pattern.match(tag_name)
if match:
latest_valid_tag = tag_name
log_handler.log_debug(
f"Found latest tag matching pattern: {latest_valid_tag}",
func_name=func_name,
)
break
if not latest_valid_tag:
log_handler.log_debug(
"No tags matched pattern. Suggesting default.", func_name=func_name
)
return default_suggestion
match = tag_pattern.match(latest_valid_tag)
if not match:
log_handler.log_error(
f"Internal error: Could not re-match tag {latest_valid_tag}",
func_name=func_name,
)
return default_suggestion
v1, v2, v3, v4 = map(int, match.groups())
limit: int = 99
v4 += 1
if v4 > limit:
v4 = 0
v3 += 1
if v3 > limit:
v3 = 0
v2 += 1
if v2 > limit:
v2 = 0
v1 += 1
next_tag: str = f"v.{v1}.{v2}.{v3}.{v4}"
log_handler.log_debug(
f"Generated suggestion: {next_tag}", func_name=func_name
)
return next_tag
except Exception as e:
log_handler.log_exception(
f"Error generating tag suggestion: {e}", func_name=func_name
)
return default_suggestion
def clone_remote_repo(self):
"""
Handles the 'Clone Remote Repository' action initiated from the GUI.
Opens a dialog to get parameters and starts the asynchronous clone worker.
"""
func_name: str = "clone_remote_repo"
log_handler.log_info(
f"--- Action Triggered: Clone Remote Repository ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
log_handler.log_error(
"Cannot start clone: Main frame not available.", func_name=func_name
)
return
# Show modal dialog using the imported class
dialog = CloneFromRemoteDialog(self.master)
# dialog.result will be None if cancelled, or (url, parent_dir, profile_name_input)
dialog_result: Optional[Tuple[str, str, str]] = dialog.result
if not dialog_result:
log_handler.log_info(
"Clone operation cancelled by user in dialog.", func_name=func_name
)
self.main_frame.update_status_bar("Clone cancelled.")
return
# Extract data from dialog result
remote_url, local_parent_dir, profile_name_input = dialog_result
# --- Derive target directory and profile name, validate paths ---
final_profile_name: str = ""
target_clone_dir: str = ""
repo_name_from_url: str = ""
try:
# Derive repo name from URL (remove .git suffix)
repo_name_from_url = os.path.basename(remote_url)
if repo_name_from_url.lower().endswith(".git"):
repo_name_from_url = repo_name_from_url[:-4]
if not repo_name_from_url:
raise ValueError("Could not derive repository name from URL.")
# Construct full target path for the clone
target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url)
# Normalize the path
target_clone_dir = os.path.abspath(target_clone_dir)
# Determine final profile name (use input or derive from repo name)
if profile_name_input:
final_profile_name = profile_name_input
# Check if proposed profile name already exists
if final_profile_name in self.config_manager.get_profile_sections():
raise ValueError(
f"Profile name '{final_profile_name}' already exists. "
f"Please choose a different name."
)
else:
# Use repo name as base, add counter if it exists
final_profile_name = repo_name_from_url
counter: int = 1
# Check against existing sections from config manager
while final_profile_name in self.config_manager.get_profile_sections():
final_profile_name = f"{repo_name_from_url}_{counter}"
counter += 1
log_handler.log_debug(
f"Derived target clone directory: {target_clone_dir}",
func_name=func_name
)
log_handler.log_debug(
f"Determined profile name: {final_profile_name}",
func_name=func_name
)
# --- CRITICAL CHECK: Target directory must NOT exist for 'git clone' ---
if os.path.exists(target_clone_dir):
error_msg: str = (
f"Clone failed: Target directory already exists:\n{target_clone_dir}\n"
f"Please choose a different parent directory or ensure the target location is empty."
)
log_handler.log_error(error_msg, func_name=func_name)
self.main_frame.show_error("Clone Path Error", error_msg)
self.main_frame.update_status_bar("Clone failed: Target directory exists.")
return # Stop the operation
except ValueError as ve:
# Handle errors deriving names or validating profile name
log_handler.log_error(
f"Clone configuration error: {ve}", func_name=func_name
)
self.main_frame.show_error("Configuration Error", str(ve))
self.main_frame.update_status_bar("Clone failed: Configuration error.")
return
except Exception as e:
# Handle unexpected errors during preparation
log_handler.log_exception(
f"Unexpected error preparing for clone: {e}", func_name=func_name
)
self.main_frame.show_error(
"Internal Error", f"An unexpected error occurred:\n{e}"
)
self.main_frame.update_status_bar("Clone failed: Internal error.")
return
# --- Start Asynchronous Worker ---
log_handler.log_info(
f"Starting clone for '{remote_url}' into '{target_clone_dir}'...",
func_name=func_name
)
# Prepare arguments for the worker function
args: tuple = (self.git_commands, remote_url, target_clone_dir, final_profile_name)
# Start the async operation using the generic starter method
self._start_async_operation(
worker_func=async_workers.run_clone_remote_async, # Worker function for cloning
args_tuple=args,
context_dict={
"context": "clone_remote", # Context identifier for result handler
"status_msg": f"Cloning '{repo_name_from_url}'...", # Message for status bar
"clone_success_data": { # Pass data needed by result handler for profile creation
"profile_name": final_profile_name,
"cloned_path": target_clone_dir,
"remote_url": remote_url,
}
}
)
# --- Local Repo / Bundle / Backup Actions ---
def prepare_svn_for_git(self):
""" Starts async operation to prepare the repository (init, gitignore). """
func_name: str ="prepare_svn_for_git"
svn_path: Optional[str] = self._get_and_validate_svn_path("Prepare Repository")
# Check if path is valid before starting
if not svn_path:
# Error message shown by validation method
self.main_frame.update_status_bar("Prepare failed: Invalid path.")
return
# Check if already prepared (avoid unnecessary work)
if self._is_repo_ready(svn_path):
log_handler.log_info(
"Prepare skipped: Repository already prepared.", func_name=func_name
)
self.main_frame.show_info("Info", "Repository is already prepared.")
# Ensure GUI state reflects readiness
self.update_svn_status_indicator(svn_path)
return
# Prepare arguments
args: tuple = (self.action_handler, svn_path)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_prepare_async,
args_tuple=args,
context_dict={
"context": "prepare_repo",
"status_msg": "Preparing repository"
}
)
def create_git_bundle(self):
""" Starts async operation to create a Git bundle file. """
func_name: str ="create_git_bundle"
# Gather and validate inputs from GUI
profile: str = self.main_frame.profile_var.get()
svn_path: Optional[str] = self._get_and_validate_svn_path("Create Bundle")
usb_path: Optional[str] = self._get_and_validate_usb_path("Create Bundle")
bundle_name: str = self.main_frame.bundle_name_entry.get().strip()
# Check if all required inputs are present
if not profile or not svn_path or not usb_path or not bundle_name:
log_handler.log_warning(
"Create Bundle cancelled: Missing inputs.", func_name=func_name
)
# Specific error messages shown by validation methods
return
# Check if repository is ready
if not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Create Bundle failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not prepared.")
self.main_frame.update_status_bar("Create Bundle failed: Repo not ready.")
return
# Ensure bundle name has the correct extension
if not bundle_name.lower().endswith(".bundle"):
bundle_name += ".bundle"
bundle_full_path: str = os.path.join(usb_path, bundle_name)
# Save profile settings before starting the operation
if not self.save_profile_settings():
# Ask user if they want to proceed even if saving failed
if not self.main_frame.ask_yes_no(
"Warning",
"Could not save profile settings.\nContinue creating bundle anyway?"
):
self.main_frame.update_status_bar(
"Create Bundle cancelled (profile save failed)."
)
return
# Prepare parameters for the worker
excluded_extensions: set[str]
excluded_dirs: set[str]
excluded_extensions, excluded_dirs = self._parse_exclusions()
backup_enabled: bool = self.main_frame.autobackup_var.get()
backup_dir: str = self.main_frame.backup_dir_var.get()
commit_enabled: bool = self.main_frame.autocommit_var.get()
commit_msg: str = self.main_frame.get_commit_message()
# Prepare arguments tuple for the worker
args: tuple = (
self.action_handler,
svn_path,
bundle_full_path,
profile,
backup_enabled,
backup_dir,
commit_enabled,
commit_msg,
excluded_extensions,
excluded_dirs,
)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_create_bundle_async,
args_tuple=args,
context_dict={
"context": "create_bundle",
"status_msg": f"Creating bundle '{bundle_name}'",
"committed_flag_possible": True, # Context hint for result handler
}
)
def fetch_from_git_bundle(self):
""" Starts async operation to fetch/clone from a Git bundle file. """
func_name: str ="fetch_from_git_bundle"
# Gather and validate inputs
profile: str = self.main_frame.profile_var.get()
# svn_path_str can be a non-existent dir if cloning
svn_path_str: str = self.main_frame.svn_path_entry.get().strip()
usb_path: Optional[str] = self._get_and_validate_usb_path("Fetch Bundle")
bundle_name: str = self.main_frame.bundle_updated_name_entry.get().strip()
# Check for missing inputs
if not profile or not svn_path_str or not usb_path or not bundle_name:
log_handler.log_warning(
"Fetch Bundle cancelled: Missing inputs.", func_name=func_name
)
return
# Construct full bundle path and check if it exists BEFORE starting async op
bundle_full_path: str = os.path.join(usb_path, bundle_name)
if not os.path.isfile(bundle_full_path):
log_handler.log_error(
f"Fetch Bundle failed: Bundle file not found at '{bundle_full_path}'",
func_name=func_name
)
self.main_frame.show_error(
"File Not Found", f"Bundle file not found:\n{bundle_full_path}"
)
self.main_frame.update_status_bar("Fetch failed: Bundle not found.")
return
# Save profile settings before action
if not self.save_profile_settings():
if not self.main_frame.ask_yes_no(
"Warning",
"Could not save profile settings.\nContinue fetching from bundle anyway?"
):
self.main_frame.update_status_bar(
"Fetch cancelled (profile save failed)."
)
return
# Prepare parameters for worker
excluded_extensions: set[str]
excluded_dirs: set[str]
excluded_extensions, excluded_dirs = self._parse_exclusions()
backup_enabled: bool = self.main_frame.autobackup_var.get()
backup_dir: str = self.main_frame.backup_dir_var.get()
# Prepare arguments tuple
args: tuple = (
self.action_handler,
svn_path_str,
bundle_full_path,
profile,
backup_enabled,
backup_dir,
excluded_extensions,
excluded_dirs,
)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_fetch_bundle_async,
args_tuple=args,
context_dict={
"context": "fetch_bundle",
"status_msg": f"Fetching from '{bundle_name}'",
"repo_path": svn_path_str, # Pass path for potential conflict message
}
)
def show_commit_details(self, commit_details: Dict[str, Any]):
"""
Opens the CommitDetailWindow to display details of a specific commit.
Called by the AsyncResultHandler after fetching commit data.
Args:
commit_details (Dict[str, Any]): A dictionary containing commit metadata
(hash, author, date, subject, body)
and a list of changed files.
"""
func_name: str = "show_commit_details"
log_handler.log_debug(
f"Attempting to show commit details for hash: {commit_details.get('hash_full', 'N/A')}",
func_name=func_name
)
# Validazione base dei dati ricevuti
if not isinstance(commit_details, dict) or not commit_details.get('hash_full'):
log_handler.log_error(
"Invalid or incomplete commit details received.", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Display Error", "Internal error: Received invalid commit data."
)
# Riabilita widget se i dati non sono validi
self._reenable_widgets_after_modal()
return
try:
# Crea e mostra la finestra modale CommitDetailWindow
log_handler.log_debug(
f"Opening CommitDetailWindow for commit {commit_details.get('hash_full')[:7]}...",
func_name=func_name
)
CommitDetailWindow(
master=self.master, # Parent è la finestra root
commit_data=commit_details, # Passa il dizionario dei dati
# Passa il callback per aprire il diff di un file specifico
open_diff_callback=self._open_commit_file_diff
)
# Il codice attende qui finché la finestra CommitDetailWindow non viene chiusa
log_handler.log_info("Commit Detail window closed by user.", func_name=func_name)
# Ripristina la status bar dopo la chiusura della finestra
if hasattr(self.main_frame, "update_status_bar"):
self.main_frame.update_status_bar("Ready.")
except Exception as e_detail:
# Gestisci errori durante la creazione/visualizzazione della finestra
log_handler.log_exception(
f"Error opening commit detail window: {e_detail}", func_name=func_name
)
if hasattr(self.main_frame, "show_error") and hasattr(self.main_frame, "update_status_bar"):
self.main_frame.show_error(
"Display Error", f"Could not display commit details:\n{e_detail}"
)
self.main_frame.update_status_bar("Error displaying commit details.")
finally:
# Assicurati che i widget vengano riabilitati dopo che la finestra
# (o il messaggio di errore) è stata chiusa.
self._reenable_widgets_after_modal()
def open_gitignore_editor(self):
""" Opens the .gitignore editor window (Synchronous GUI action). """
# This action is synchronous as it opens a modal dialog
func_name: str = "open_gitignore_editor"
log_handler.log_info(
f"--- Action Triggered: Edit .gitignore ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
self.main_frame.update_status_bar("Processing: Opening .gitignore editor...")
svn_path: Optional[str] = self._get_and_validate_svn_path("Edit .gitignore")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Cannot edit .gitignore: Repo path invalid/not ready.",
func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Select a valid and prepared repository first."
)
self.main_frame.update_status_bar("Edit failed: Repo not ready.")
return
# Construct path and open editor window
gitignore_path: str = os.path.join(svn_path, ".gitignore")
log_handler.log_debug(
f"Target .gitignore path: {gitignore_path}", func_name=func_name
)
status_after_edit: str = "Ready." # Default status after editor closes
try:
log_handler.log_debug("Opening GitignoreEditorWindow...", func_name=func_name)
# Open the modal editor window, passing the callback for successful save
GitignoreEditorWindow(
master=self.master, # Parent window
gitignore_path=gitignore_path,
on_save_success_callback=self._handle_gitignore_save # Method to call on save
)
# Code execution pauses here until the editor window is closed
log_handler.log_debug("Gitignore editor window closed.", func_name=func_name)
# Update status bar only if no async operation was started by the callback
if not self.main_frame.status_bar_var.get().startswith("Processing"):
self.main_frame.update_status_bar(status_after_edit)
except Exception as e:
# Handle errors opening the editor
log_handler.log_exception(
f"Error opening or running .gitignore editor: {e}", func_name=func_name
)
status_after_edit = "Error opening .gitignore editor."
self.main_frame.show_error("Editor Error", f"Could not open editor:\n{e}")
self.main_frame.update_status_bar(status_after_edit)
def manual_backup(self):
""" Starts async operation for creating a manual backup ZIP. """
func_name: str ="manual_backup"
# Gather and validate inputs
profile: str = self.main_frame.profile_var.get()
svn_path: Optional[str] = self._get_and_validate_svn_path(f"Manual Backup ({profile})")
backup_dir_str: str = self.main_frame.backup_dir_var.get().strip()
# Check required inputs
if not profile or not svn_path:
# Error already shown by validation method
return
if not backup_dir_str:
log_handler.log_warning(
"Manual backup failed: Backup directory is empty.", func_name=func_name
)
self.main_frame.show_error(
"Input Error", "Backup directory cannot be empty for manual backup."
)
self.main_frame.update_status_bar("Manual backup failed: Backup dir empty.")
return
# Validate backup directory path
backup_dir_abs: str = os.path.abspath(backup_dir_str)
# Check if path exists and is not a directory (create_zip_backup handles creation)
if os.path.exists(backup_dir_abs) and not os.path.isdir(backup_dir_abs):
log_handler.log_error(
f"Manual backup failed: Backup path exists but is not a directory: {backup_dir_abs}",
func_name=func_name
)
self.main_frame.show_error(
"Path Error", f"Backup path exists but is not a directory:\n{backup_dir_abs}"
)
self.main_frame.update_status_bar(
"Manual backup failed: Invalid backup path."
)
return
# Save profile settings before backup
if not self.save_profile_settings():
if not self.main_frame.ask_yes_no(
"Warning", "Could not save profile settings.\nContinue backup anyway?"
):
self.main_frame.update_status_bar(
"Backup cancelled (profile save failed)."
)
return
# Prepare parameters for worker
excluded_extensions: set[str]
excluded_dirs: set[str]
excluded_extensions, excluded_dirs = self._parse_exclusions()
# Prepare arguments tuple
args: tuple = (
self.backup_handler,
svn_path,
backup_dir_abs,
profile,
excluded_extensions,
excluded_dirs
)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_manual_backup_async,
args_tuple=args,
context_dict={"context": "manual_backup", "status_msg": "Creating manual backup"}
)
# --- Git Actions (Commit, Tag, Branch, etc.) ---
def commit_changes(self):
""" Starts async operation to commit staged changes with GUI message. """
func_name: str = "commit_changes"
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Commit")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Commit failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not ready.")
self.main_frame.update_status_bar("Commit failed: Repo not ready.")
return
# Validate commit message
commit_msg: str = self.main_frame.get_commit_message()
if not commit_msg:
log_handler.log_warning(
"Commit failed: Commit message is empty.", func_name=func_name
)
self.main_frame.show_error("Input Error", "Commit message cannot be empty.")
self.main_frame.update_status_bar("Commit failed: Empty message.")
return
# Prepare args and start async operation
args: tuple = (self.action_handler, svn_path, commit_msg)
self._start_async_operation(
worker_func=async_workers.run_commit_async,
args_tuple=args,
context_dict={
"context": "commit",
"status_msg": "Committing changes",
"committed_flag_possible": True # Hint for result handler
}
)
def refresh_changed_files_list(self):
""" Starts async operation to refresh the list of changed files. """
func_name: str ="refresh_changed_files_list"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh Changes skipped: Repo not ready.", func_name=func_name
)
# GUI update handled by _update_gui_for_not_ready_state if needed
return
# Prepare arguments
args: tuple = (self.git_commands, svn_path)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_changes_async,
args_tuple=args,
context_dict={
"context": "refresh_changes",
"status_msg": "Refreshing changed files"
}
)
def open_diff_viewer(self, file_status_line: str):
"""
Opens the Diff Viewer window for a file from the 'changed files' list.
Compares Working Directory vs HEAD by default for changed files.
Uses helper to extract path. This is a synchronous GUI action.
"""
func_name: str = "open_diff_viewer"
log_handler.log_debug(
f"Received file_status_line: {repr(file_status_line)}", func_name=func_name
)
log_handler.log_info(
f"--- Action Triggered: Open Diff Viewer for Changed File ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Open Diff Viewer")
if not svn_path:
# Error message shown by validation method
self.main_frame.update_status_bar(
"Error: Cannot open diff (invalid repo path)."
)
return
# Extract the relative path using the helper method
relative_path: Optional[str] = self._extract_path_from_status_line(
file_status_line
)
log_handler.log_debug(
f"Extracted relative_path via helper: {repr(relative_path)}",
func_name=func_name
)
# Check if path extraction was successful
if not relative_path:
log_handler.log_error(
f"Could not extract valid path from status line: {file_status_line}",
func_name=func_name
)
self.main_frame.show_error(
"Path Error",
f"Could not parse file path from selected line:\n{file_status_line}"
)
self.main_frame.update_status_bar("Error: Invalid selection for diff.")
return
# --- Check Status Code (e.g., prevent diff for deleted files vs WD) ---
status_code: str = file_status_line.strip('\x00').strip()[:2].strip()
# Prevent showing diff for files marked as Deleted (' D') against Working Dir
if status_code == 'D':
msg: str = (
f"Cannot show Working Dir vs HEAD diff for a deleted file:\n"
f"{relative_path}"
)
log_handler.log_info(msg, func_name=func_name)
self.main_frame.show_info("Diff Not Applicable", msg)
self.main_frame.update_status_bar(
"Ready (Diff not applicable for deleted file)."
)
return
# Add checks for other non-diffable statuses if needed (e.g., '??', '!!')
if status_code in ['??', '!!']:
msg: str = (
f"Cannot show diff for file with status '{status_code}':\n"
f"{relative_path}\n\n"
f"(Untracked or Ignored files cannot be diffed against HEAD)."
)
log_handler.log_info(
f"Diff not applicable for status '{status_code}'.", func_name=func_name
)
self.main_frame.show_info("Diff Not Applicable", msg)
self.main_frame.update_status_bar("Ready (Diff not applicable).")
return
# --- Open DiffViewerWindow ---
log_handler.log_debug(
f"Opening DiffViewerWindow for '{relative_path}' (Working Dir vs HEAD)",
func_name=func_name
)
status_final: str = "Ready." # Default status after closing viewer
try:
# Instantiate and display the modal DiffViewerWindow
DiffViewerWindow(
master=self.master, # Parent window
git_commands=self.git_commands, # Pass GitCommands instance
repo_path=svn_path,
relative_file_path=relative_path, # Use the cleaned path
ref1='WORKING_DIR', # Compare working directory...
ref2='HEAD' # ...against HEAD commit
)
# Code execution pauses here until the DiffViewerWindow is closed
log_handler.log_debug("Diff viewer window closed.", func_name=func_name)
status_final = "Ready."
except Exception as e:
# Handle errors opening the diff viewer window
log_handler.log_exception(
f"Error opening or running diff viewer: {e}", func_name=func_name
)
status_final = "Error: Failed to open diff viewer."
self.main_frame.show_error(
"Diff Viewer Error", f"Could not display diff:\n{e}"
)
finally:
# Update status bar after the window closes or an error occurs,
# but only if another async operation hasn't started in the meantime.
if hasattr(self, "main_frame") and not self.main_frame.status_bar_var.get().startswith("Processing"):
self.main_frame.update_status_bar(status_final)
def add_selected_file(self, file_status_line: str):
""" Starts async operation to add a selected untracked file ('??') to staging. """
func_name: str = "add_selected_file"
log_handler.log_info(
f"--- Action Triggered: Add File '{file_status_line}' (Async) ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Add File")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Add file failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not ready.")
self.main_frame.update_status_bar("Add failed: Repo not ready.")
return
# Extract relative path from the status line (should start with '??')
relative_path: Optional[str] = None
try:
line: str = file_status_line.strip("\x00").strip()
# Only proceed if the status indicates an untracked file
if line.startswith("??"):
# Extract path after '?? ' handling potential quotes
path_raw: str = line[2:].lstrip()
if len(path_raw) >= 2 and path_raw.startswith('"') and path_raw.endswith('"'):
relative_path = path_raw[1:-1]
else:
relative_path = path_raw
else:
# Show error if trying to add a non-untracked file
log_handler.log_error(
f"Cannot add non-untracked file: {line}", func_name=func_name
)
self.main_frame.show_error(
"Invalid Action",
f"Cannot 'Add' file with status '{line[:2]}'.\nUse commit for modified/staged files."
)
self.main_frame.update_status_bar("Add failed: Not an untracked file.")
return
# Check if path extraction failed
if not relative_path:
raise ValueError("Extracted relative path is empty.")
except Exception as e:
# Handle errors parsing the path
log_handler.log_error(
f"Error parsing path for add from line '{file_status_line}': {e}",
func_name=func_name
)
self.main_frame.show_error(
"Parsing Error", f"Cannot parse file path from:\n{file_status_line}"
)
self.main_frame.update_status_bar("Add failed: Parse error.")
return
# Prepare args and start the add worker
args: tuple = (self.git_commands, svn_path, relative_path)
base_filename: str = os.path.basename(relative_path) # For status message
self._start_async_operation(
worker_func=async_workers.run_add_file_async,
args_tuple=args,
context_dict={
"context": "add_file",
"status_msg": f"Adding '{base_filename}'"
}
)
def create_tag(self):
""" Handles tag creation: shows dialog, suggests name, starts async operation. """
func_name: str = "create_tag"
log_handler.log_info(
f"--- Action Triggered: Create Tag ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Create Tag")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Create Tag failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not ready.")
self.main_frame.update_status_bar("Create Tag failed: Repo not ready.")
return
# Generate suggested tag name and show dialog
self.main_frame.update_status_bar("Processing: Generating tag suggestion...")
suggested_name: str = self._generate_next_tag_suggestion(svn_path)
self.main_frame.update_status_bar("Ready for tag input.")
dialog = CreateTagDialog(self.master, suggested_tag_name=suggested_name)
tag_info: Optional[tuple[str, str]] = dialog.result # (name, message) or None
# If user provided input, start async operation
if tag_info:
tag_name, tag_message = tag_info
log_handler.log_info(f"User provided tag: '{tag_name}'", func_name=func_name)
# Prepare args
args: tuple = (self.action_handler, svn_path, tag_name, tag_message)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_create_tag_async,
args_tuple=args,
context_dict={
"context": "create_tag",
"status_msg": f"Creating tag '{tag_name}'",
"committed_flag_possible": True # Annotated tag creates commit object
}
)
else:
# User cancelled the dialog
log_handler.log_info("Tag creation cancelled.", func_name=func_name)
self.main_frame.update_status_bar("Cancelled.")
def checkout_tag(self):
""" Handles tag checkout: confirms with user, starts async operation. """
func_name: str = "checkout_tag"
log_handler.log_info(
f"--- Action Triggered: Checkout Tag ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Checkout Tag")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Checkout Tag failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not ready.")
self.main_frame.update_status_bar("Checkout Tag failed: Repo not ready.")
return
# Get selected tag from GUI listbox
tag_name: Optional[str] = self.main_frame.get_selected_tag()
if not tag_name:
# Show error if no tag is selected
self.main_frame.show_error(
"Selection Error", "No tag selected from the list."
)
self.main_frame.update_status_bar("Checkout failed: No tag selected.")
return
# Confirm with user due to 'detached HEAD' state implications
confirmation_message: str = (
f"Checkout tag '{tag_name}'?\n\n"
f"Warning: This will put your repository in a 'detached HEAD' state. "
f"You can look around, make experimental changes and commit them, "
f"but they won't belong to any branch. "
f"Use 'Checkout Branch' to return to a branch."
)
if not self.main_frame.ask_yes_no("Confirm Checkout Tag", confirmation_message):
# User cancelled
log_handler.log_info("Tag checkout cancelled by user.", func_name=func_name)
self.main_frame.update_status_bar("Cancelled.")
return
# Prepare args and start async operation
args: tuple = (self.action_handler, svn_path, tag_name)
self._start_async_operation(
worker_func=async_workers.run_checkout_tag_async,
args_tuple=args,
context_dict={
"context": "checkout_tag",
"status_msg": f"Checking out tag '{tag_name}'"
}
)
def create_branch(self):
""" Handles branch creation: shows dialog, starts async operation. """
func_name: str = "create_branch"
log_handler.log_info(
f"--- Action Triggered: Create Branch ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Create Branch")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Create Branch failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not ready.")
self.main_frame.update_status_bar("Create Branch failed: Repo not ready.")
return
# Show dialog to get new branch name
self.main_frame.update_status_bar("Ready for branch name input.")
dialog = CreateBranchDialog(self.master)
branch_name: Optional[str] = dialog.result # Name or None if cancelled
# If user provided a name, start async operation
if branch_name:
log_handler.log_info(
f"User provided branch name: '{branch_name}'", func_name=func_name
)
# Prepare args
args: tuple = (self.action_handler, svn_path, branch_name)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_create_branch_async,
args_tuple=args,
context_dict={
"context": "create_branch",
"status_msg": f"Creating branch '{branch_name}'",
"new_branch_name": branch_name # Pass name for potential checkout later
}
)
else:
# User cancelled the dialog
log_handler.log_info("Branch creation cancelled.", func_name=func_name)
self.main_frame.update_status_bar("Cancelled.")
def checkout_branch(
self,
branch_to_checkout: Optional[str] = None,
repo_path_override: Optional[str] = None
):
"""
Handles checkout of an existing local branch.
Confirms if triggered by button, starts async operation.
Can be called directly with a branch name (e.g., after creation).
"""
func_name: str = "checkout_branch"
target_description: str = branch_to_checkout if branch_to_checkout else "Selected Branch"
log_handler.log_info(
f"--- Action Triggered: Checkout Branch (Target: {target_description}) ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Determine repository path (use override if provided)
svn_path: Optional[str] = repo_path_override or self._get_and_validate_svn_path("Checkout Branch")
# Validate repo path and readiness
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Checkout Branch failed: Repo not ready.", func_name=func_name
)
self.main_frame.show_error("Action Failed", "Repository is not ready.")
self.main_frame.update_status_bar("Checkout Branch failed: Repo not ready.")
return
# Determine target branch and if confirmation is needed
target_branch: Optional[str] = branch_to_checkout
needs_confirmation: bool = False
if not target_branch:
# If no branch name passed, get selection from GUI and require confirmation
target_branch = self.main_frame.get_selected_branch()
needs_confirmation = True
# Validate if a branch was determined
if not target_branch:
self.main_frame.show_error(
"Selection Error", "No branch selected from the list."
)
self.main_frame.update_status_bar("Checkout failed: No branch selected.")
return
# Ask for confirmation only if triggered by button press (not directly called)
if needs_confirmation:
if not self.main_frame.ask_yes_no(
"Confirm Checkout Branch", f"Switch to branch '{target_branch}'?"
):
log_handler.log_info(
"Branch checkout cancelled by user.", func_name=func_name
)
self.main_frame.update_status_bar("Cancelled.")
return
# Prepare args and start async operation
args: tuple = (self.action_handler, svn_path, target_branch)
self._start_async_operation(
worker_func=async_workers.run_checkout_branch_async,
args_tuple=args,
context_dict={
"context": "checkout_branch",
"status_msg": f"Checking out branch '{target_branch}'"
}
)
def delete_local_branch(self, branch_name: str, force: bool):
""" Handles the request to delete a local branch (with confirmation). """
func_name: str = "delete_local_branch"
action_description: str = "Force delete" if force else "Delete"
log_handler.log_info(
f"--- Action Triggered: {action_description} Local Branch '{branch_name}' ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path(f"{action_description} Branch")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
f"{action_description} Branch skipped: Repo not ready.",
func_name=func_name
)
self.main_frame.update_status_bar(f"{action_description} failed: Repo not ready.")
return
# --- User Confirmation ---
confirm_message: str = f"Are you sure you want to delete the local branch '{branch_name}'?"
dialog_title: str = "Confirm Delete Branch"
ask_function: Callable = self.main_frame.ask_yes_no # Default confirmation
# Add warning for force delete
if force:
confirm_message += "\n\nWARNING: Force delete will discard any unmerged changes on this branch!"
dialog_title = "Confirm Force Delete Branch"
# Ask user for confirmation
if not ask_function(dialog_title, confirm_message):
log_handler.log_info(
"Local branch deletion cancelled by user.", func_name=func_name
)
self.main_frame.update_status_bar("Delete branch cancelled.")
return
# --- Start Async Worker ---
log_handler.log_info(
f"Starting {action_description.lower()} for local branch '{branch_name}'...",
func_name=func_name
)
# Prepare arguments
args: tuple = (self.action_handler, svn_path, branch_name, force)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_delete_local_branch_async,
args_tuple=args,
context_dict={
"context": "delete_local_branch",
"status_msg": f"{action_description} branch '{branch_name}'",
"branch_name": branch_name, # Pass name for result handling
"force": force # Pass force flag
}
)
def merge_local_branch(self, branch_to_merge: str):
""" Handles the request to merge a local branch into the current branch. """
func_name: str = "merge_local_branch"
log_handler.log_info(
f"--- Action Triggered: Merge Local Branch '{branch_to_merge}' ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path(f"Merge Branch '{branch_to_merge}'")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Merge Branch skipped: Repo not ready.", func_name=func_name
)
self.main_frame.update_status_bar("Merge failed: Repo not ready.")
return
# Get current branch for validation and confirmation message
current_branch: Optional[str] = None
try:
current_branch = self.git_commands.get_current_branch_name(svn_path)
if not current_branch:
raise ValueError("Could not determine the current branch (Detached HEAD?).")
if current_branch == branch_to_merge:
raise ValueError("Cannot merge a branch into itself.")
except (GitCommandError, ValueError) as e:
log_handler.log_error(f"Merge aborted during pre-check: {e}", func_name=func_name)
self.main_frame.show_error("Merge Error", f"Cannot start merge:\n{e}")
self.main_frame.update_status_bar("Merge failed: Pre-check error.")
return
# --- User Confirmation ---
confirm_message: str = f"Merge branch '{branch_to_merge}' into current branch '{current_branch}'?"
# Default: allow fast-forward merges without forcing a merge commit
no_ff_option: bool = False
if not self.main_frame.ask_yes_no("Confirm Merge", confirm_message):
log_handler.log_info(
"Local branch merge cancelled by user.", func_name=func_name
)
self.main_frame.update_status_bar("Merge cancelled.")
return
# --- Start Async Worker ---
log_handler.log_info(
f"Starting merge of '{branch_to_merge}' into '{current_branch}'...",
func_name=func_name
)
# Prepare arguments (pass git_commands for worker's internal checks)
args: tuple = (
self.action_handler, self.git_commands, svn_path, branch_to_merge, no_ff_option
)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_merge_local_branch_async,
args_tuple=args,
context_dict={
"context": "merge_local_branch",
"status_msg": f"Merging '{branch_to_merge}' into '{current_branch}'",
"branch_merged_into": current_branch,
"branch_merged_from": branch_to_merge,
"repo_path": svn_path, # Needed for conflict message
}
)
def compare_branch_with_current(self, other_branch_ref: str):
""" Handles comparing a selected branch (local or remote) with the current branch. """
func_name: str = "compare_branch_with_current"
log_handler.log_info(
f"--- Action Triggered: Compare Branch '{other_branch_ref}' with Current ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path(f"Compare Branch '{other_branch_ref}'")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Compare Branch skipped: Repo not ready.", func_name=func_name
)
self.main_frame.update_status_bar("Compare failed: Repo not ready.")
return
# Get current branch for comparison
current_branch: Optional[str] = None
try:
current_branch = self.git_commands.get_current_branch_name(svn_path)
if not current_branch:
raise ValueError("Cannot compare: Currently in detached HEAD state.")
# Prevent comparing a branch with itself
if current_branch == other_branch_ref:
log_handler.log_warning(
"Compare Branch skipped: Cannot compare a branch with itself.",
func_name=func_name
)
self.main_frame.show_info("Compare Info", "Cannot compare a branch with itself.")
return
except (GitCommandError, ValueError) as e:
log_handler.log_error(f"Compare aborted during pre-check: {e}", func_name=func_name)
self.main_frame.show_error("Compare Error", f"Cannot start compare:\n{e}")
return
# --- Start Async Worker ---
log_handler.log_info(
f"Starting comparison between '{current_branch}' and '{other_branch_ref}'...",
func_name=func_name
)
# Prepare arguments: ref1 is current, ref2 is the other branch
args: tuple = (self.git_commands, svn_path, current_branch, other_branch_ref)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_compare_branches_async,
args_tuple=args,
context_dict={
"context": "compare_branches",
"status_msg": f"Comparing '{current_branch}' vs '{other_branch_ref}'",
"ref1": current_branch, # Pass refs for summary window
"ref2": other_branch_ref,
"repo_path": svn_path, # Pass repo path for summary window
}
)
def view_commit_details(self, history_line: str):
"""
Callback triggered by double-clicking a line in the history view.
Extracts the commit hash and starts an async worker to fetch details.
"""
func_name: str = "view_commit_details"
log_handler.log_info(
f"--- Action Triggered: View Commit Details for line: '{history_line}' ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("View Commit Details")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"View Commit Details skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
return
# --- Estrarre l'Hash del Commit dalla Riga di Log ---
# Assumiamo che il formato (%h) sia all'inizio della riga, seguito da spazio
commit_hash_short: Optional[str] = None
try:
parts: List[str] = history_line.split(maxsplit=1) # Divide solo al primo spazio
if parts and len(parts[0]) > 0: # Assumendo che l'hash sia la prima parte
commit_hash_short = parts[0]
# Potremmo validare che sia un hash valido (es. 7+ caratteri esadecimali)
if not re.match(r"^[0-9a-fA-F]{7,}$", commit_hash_short):
raise ValueError(f"Extracted part '{commit_hash_short}' doesn't look like a commit hash.")
else:
raise ValueError("Could not split history line to find hash.")
log_handler.log_debug(f"Extracted commit hash: {commit_hash_short}", func_name=func_name)
except Exception as e:
log_handler.log_error(
f"Could not extract commit hash from history line '{history_line}': {e}",
func_name=func_name
)
self.main_frame.show_error(
"Parsing Error", f"Could not identify commit hash in selected line:\n{history_line}"
)
return
# --- Start Async Worker to Get Commit Details ---
log_handler.log_info(
f"Fetching details for commit '{commit_hash_short}'...", func_name=func_name
)
# Prepare arguments for the worker
args: tuple = (self.git_commands, svn_path, commit_hash_short)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_get_commit_details_async, # NUOVO WORKER
args_tuple=args,
context_dict={
"context": "get_commit_details",
"status_msg": f"Loading details for commit {commit_hash_short}",
"commit_hash": commit_hash_short # Passa l'hash nel contesto
}
)
def refresh_branch_list(self):
""" Starts async operation to refresh the local branch list in the GUI. """
func_name: str ="refresh_branch_list"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Branches")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh Branches skipped: Repo not ready.", func_name=func_name
)
self._update_gui_for_not_ready_state()
return
# Prepare arguments
args: tuple = (self.git_commands, svn_path)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_branches_async, # Worker for local branches
args_tuple=args,
context_dict={"context": "refresh_branches", "status_msg": "Refreshing branches"}
)
def refresh_commit_history(self):
""" Starts async operation to refresh the commit history display. """
func_name: str ="refresh_commit_history"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh History")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh History skipped: Repo not ready.", func_name=func_name
)
self._update_gui_for_not_ready_state()
return
# Determine branch filter from GUI
branch_filter: Optional[str] = None
log_scope: str = "All History"
if hasattr(self.main_frame, "history_branch_filter_var"):
filter_selection: str = self.main_frame.history_branch_filter_var.get()
# Use filter only if a specific branch/tag is selected
if filter_selection and filter_selection != "-- All History --":
branch_filter = filter_selection
log_scope = f"'{branch_filter}'"
# Prepare arguments
args: tuple = (self.git_commands, svn_path, branch_filter, log_scope)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_history_async,
args_tuple=args,
context_dict={
"context": "refresh_history",
"status_msg": f"Refreshing history for {log_scope}"
}
)
def refresh_changed_files_list(self):
""" Starts async operation to refresh the list of changed files. """
func_name: str ="refresh_changed_files_list"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh Changes skipped: Repo not ready.", func_name=func_name
)
# GUI update handled by _update_gui_for_not_ready_state if needed
return
# Prepare arguments
args: tuple = (self.git_commands, svn_path)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_changes_async,
args_tuple=args,
context_dict={
"context": "refresh_changes",
"status_msg": "Refreshing changed files"
}
)
# --- Remote Action Launchers ---
def apply_remote_config(self):
""" Callback for 'Apply Config' button. Starts async worker. """
func_name: str = "apply_remote_config"
log_handler.log_info(
f"--- Action Triggered: Apply Remote Config ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
log_handler.log_error(
"Cannot apply config: Main frame missing.", func_name=func_name
)
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Apply Remote Config")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Apply config skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
self.main_frame.update_status_bar("Apply config failed: Repo not ready.")
return
# Get remote URL and name from GUI
remote_url: str = self.main_frame.remote_url_var.get().strip()
remote_name: str = self.main_frame.remote_name_var.get().strip()
# Validate remote URL
if not remote_url:
log_handler.log_warning(
"Apply config failed: Remote URL is empty.", func_name=func_name
)
self.main_frame.show_error("Input Error", "Remote URL cannot be empty.")
self.main_frame.update_status_bar("Apply config failed: URL empty.")
return
# Use default remote name if empty
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
log_handler.log_info(
f"Remote name empty, using default: '{remote_name}'",
func_name=func_name
)
self.main_frame.remote_name_var.set(remote_name)
# Save profile settings BEFORE applying to Git config
if not self.save_profile_settings():
if not self.main_frame.ask_yes_no(
"Warning",
"Could not save profile settings.\nContinue applying remote config anyway?"
):
self.main_frame.update_status_bar(
"Apply config cancelled (profile save failed)."
)
return
# Prepare args and start async operation
args: tuple = (self.remote_action_handler, svn_path, remote_name, remote_url)
self._start_async_operation(
worker_func=async_workers.run_apply_remote_config_async,
args_tuple=args,
context_dict={
"context": "apply_remote_config",
"status_msg": f"Applying config for remote '{remote_name}'"
}
)
def check_connection_auth(self):
""" Callback for 'Check Connection & Auth' button. """
func_name: str = "check_connection_auth"
log_handler.log_info(
f"--- Action Triggered: Check Connection & Auth ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Check Connection")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Check Connection skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
# Reset auth status indicator if repo not ready
self._update_gui_auth_status("unknown")
return
# Get remote name, using default if empty
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
log_handler.log_info(
f"Checking connection/auth for remote '{remote_name}'...",
func_name=func_name
)
# Update GUI indicator to 'checking' state
self._update_gui_auth_status("checking")
# Prepare args and start async operation
args: tuple = (self.git_commands, svn_path, remote_name)
self._start_async_operation(
worker_func=async_workers.run_check_connection_async,
args_tuple=args,
context_dict={
"context": "check_connection",
"status_msg": f"Checking remote '{remote_name}'",
"remote_name_checked": remote_name, # Pass context for result handler
"repo_path_checked": svn_path,
}
)
def fetch_remote(self):
""" Starts the asynchronous 'git fetch' operation. """
func_name: str = "fetch_remote"
log_handler.log_info(
f"--- Action Triggered: Fetch Remote ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Fetch Remote")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Fetch Remote skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
self.main_frame.update_status_bar("Fetch failed: Repo not ready.")
return
# Get remote name, using default if empty
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
# Optional: Check auth status before fetching (consider if needed)
# if self.remote_auth_status != 'ok': ... (warning/confirmation) ...
log_handler.log_info(
f"Starting fetch for remote '{remote_name}'...", func_name=func_name
)
# Prepare args and start async operation
args: tuple = (self.remote_action_handler, svn_path, remote_name)
self._start_async_operation(
worker_func=async_workers.run_fetch_remote_async,
args_tuple=args,
context_dict={
"context": "fetch_remote",
"status_msg": f"Fetching from remote '{remote_name}'"
}
)
def pull_remote(self):
""" Starts the asynchronous 'git pull' operation for the current branch. """
func_name: str = "pull_remote"
log_handler.log_info(
f"--- Action Triggered: Pull Remote ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Pull Remote")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Pull Remote skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
self.main_frame.update_status_bar("Pull failed: Repo not ready.")
return
# Get remote name, using default if empty
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
# Check authentication/connection status before attempting pull
if self.remote_auth_status != "ok":
auth_msg: str = f"Cannot Pull from '{remote_name}':\n"
if self.remote_auth_status == "required": auth_msg += "Authentication is required. Use 'Check Connection / Auth' first."
elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed. Use 'Check Connection / Auth' to retry."
elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed. Check URL and network."
else: auth_msg += "Connection status is unknown or in error. Use 'Check Connection / Auth' first."
log_handler.log_warning(f"Pull Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.", func_name=func_name)
self.main_frame.show_warning("Action Blocked", auth_msg)
self.main_frame.update_status_bar(f"Pull failed: {self.remote_auth_status}")
return
# Worker will get current branch name internally
log_handler.log_info(
f"Starting pull for remote '{remote_name}'...", func_name=func_name
)
# Prepare args (pass GitCommands for the worker to get current branch)
args: tuple = (self.remote_action_handler, self.git_commands, svn_path, remote_name)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_pull_remote_async,
args_tuple=args,
context_dict={
"context": "pull_remote",
"status_msg": f"Pulling from remote '{remote_name}'",
"repo_path": svn_path, # Pass context for conflict messages
"remote_name": remote_name,
}
)
def push_remote(self):
""" Starts the asynchronous 'git push' operation for the current branch. """
func_name: str = "push_remote"
log_handler.log_info(
f"--- Action Triggered: Push Branch to Remote ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Push Branch")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Push Branch skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
self.main_frame.update_status_bar("Push failed: Repo not ready.")
return
# Get remote name, using default if empty
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
# Check authentication/connection status before attempting push
if self.remote_auth_status != "ok":
auth_msg: str = f"Cannot Push to '{remote_name}':\n"
# ... (Build specific auth message as in pull_remote) ...
if self.remote_auth_status == "required": auth_msg += "Authentication is required..." # Shortened for brevity
elif self.remote_auth_status == "failed": auth_msg += "Authentication previously failed..."
elif self.remote_auth_status == "connection_failed": auth_msg += "Connection previously failed..."
else: auth_msg += "Connection status is unknown or in error..."
log_handler.log_warning(
f"Push Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.",
func_name=func_name
)
self.main_frame.show_warning("Action Blocked", auth_msg)
self.main_frame.update_status_bar(f"Push failed: {self.remote_auth_status}")
return
# Optional: Check for uncommitted changes before push
try:
if self.git_commands.git_status_has_changes(svn_path):
if not self.main_frame.ask_yes_no(
"Uncommitted Changes",
"There are uncommitted changes in your working directory.\n"
"Push anyway? (Only committed changes will be pushed)"
):
self.main_frame.update_status_bar(
"Push cancelled by user (uncommitted changes)."
)
return
except GitCommandError as status_err:
# Handle error during status check
log_handler.log_error(
f"Push aborted: Failed to check repository status before push: {status_err}",
func_name=func_name
)
self.main_frame.show_error(
"Status Error", f"Could not check repo status:\n{status_err}"
)
return
log_handler.log_info(
f"Starting push for current branch to remote '{remote_name}'...",
func_name=func_name
)
# Worker will get current branch name
# Prepare args (pass GitCommands for the worker)
args: tuple = (self.remote_action_handler, self.git_commands, svn_path, remote_name)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_push_remote_async,
args_tuple=args,
context_dict={
"context": "push_remote",
"status_msg": f"Pushing current branch to remote '{remote_name}'",
"remote_name": remote_name, # Pass context for result messages
}
)
def push_tags_remote(self):
""" Starts the asynchronous 'git push --tags' operation. """
func_name: str = "push_tags_remote"
log_handler.log_info(
f"--- Action Triggered: Push Tags to Remote ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Push Tags")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Push Tags skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
self.main_frame.update_status_bar("Push tags failed: Repo not ready.")
return
# Get remote name, using default if empty
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
# Check authentication/connection status
if self.remote_auth_status != "ok":
auth_msg: str = f"Cannot Push Tags to '{remote_name}':\n"
# ... (Build specific auth message as in push_remote) ...
log_handler.log_warning(
f"Push Tags skipped: Auth/Connection status is '{self.remote_auth_status}'.",
func_name=func_name
)
self.main_frame.show_warning("Action Blocked", auth_msg)
self.main_frame.update_status_bar(f"Push tags failed: {self.remote_auth_status}")
return
# Confirm with user before pushing all local tags
if not self.main_frame.ask_yes_no(
"Confirm Push Tags",
f"Push all local tags to remote '{remote_name}'?\n"
f"(Existing tags on the remote with the same name will "
f"NOT be overwritten unless forced, which this action does not do)."
):
self.main_frame.update_status_bar("Push tags cancelled by user.")
return
log_handler.log_info(
f"Starting push tags to remote '{remote_name}'...", func_name=func_name
)
# Prepare args
args: tuple = (self.remote_action_handler, svn_path, remote_name)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_push_tags_async,
args_tuple=args,
context_dict={
"context": "push_tags_remote",
"status_msg": f"Pushing tags to remote '{remote_name}'",
"remote_name": remote_name, # Pass context for messages
}
)
def clone_remote_repo(self):
""" Handles the 'Clone from Remote...' action: shows dialog, validates, starts worker. """
func_name: str = "clone_remote_repo"
log_handler.log_info(
f"--- Action Triggered: Clone Remote Repository ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
log_handler.log_error(
"Cannot start clone: Main frame not available.", func_name=func_name
)
return
# Show modal dialog to get clone parameters
dialog = CloneFromRemoteDialog(self.master)
# Result is None if cancelled, or (url, parent_dir, profile_name_input)
dialog_result: Optional[tuple[str, str, str]] = dialog.result
if not dialog_result:
log_handler.log_info(
"Clone operation cancelled by user in dialog.", func_name=func_name
)
self.main_frame.update_status_bar("Clone cancelled.")
return
# Extract data from dialog result
remote_url, local_parent_dir, profile_name_input = dialog_result
# --- Derive target directory and profile name, validate paths ---
final_profile_name: str = ""
target_clone_dir: str = ""
repo_name_from_url: str = ""
try:
# Derive repo name from URL (remove .git suffix)
repo_name_from_url = os.path.basename(remote_url)
if repo_name_from_url.endswith(".git"):
repo_name_from_url = repo_name_from_url[:-4]
if not repo_name_from_url:
raise ValueError("Could not derive repository name from URL.")
# Construct full target path for the clone
target_clone_dir = os.path.join(local_parent_dir, repo_name_from_url)
# Normalize the path
target_clone_dir = os.path.abspath(target_clone_dir)
# Determine final profile name (use input or derive from repo name)
if profile_name_input:
final_profile_name = profile_name_input
# Check if proposed profile name already exists
if final_profile_name in self.config_manager.get_profile_sections():
raise ValueError(
f"Profile name '{final_profile_name}' already exists. "
f"Please choose a different name."
)
else:
# Use repo name as base, add counter if it exists
final_profile_name = repo_name_from_url
counter: int = 1
while final_profile_name in self.config_manager.get_profile_sections():
final_profile_name = f"{repo_name_from_url}_{counter}"
counter += 1
log_handler.log_debug(
f"Derived target clone directory: {target_clone_dir}",
func_name=func_name
)
log_handler.log_debug(
f"Determined profile name: {final_profile_name}",
func_name=func_name
)
# --- CRITICAL CHECK: Target directory must NOT exist ---
if os.path.exists(target_clone_dir):
error_msg: str = (
f"Clone failed: Target directory already exists:\n{target_clone_dir}\n"
f"Please choose a different parent directory or ensure the target is clear."
)
log_handler.log_error(error_msg, func_name=func_name)
self.main_frame.show_error("Clone Path Error", error_msg)
self.main_frame.update_status_bar("Clone failed: Target directory exists.")
return # Stop the operation
except ValueError as ve:
# Handle errors deriving names or validating profile name
log_handler.log_error(
f"Clone configuration error: {ve}", func_name=func_name
)
self.main_frame.show_error("Configuration Error", str(ve))
self.main_frame.update_status_bar("Clone failed: Configuration error.")
return
except Exception as e:
# Handle unexpected errors during preparation
log_handler.log_exception(
f"Unexpected error preparing for clone: {e}", func_name=func_name
)
self.main_frame.show_error(
"Internal Error", f"An unexpected error occurred:\n{e}"
)
self.main_frame.update_status_bar("Clone failed: Internal error.")
return
# --- Start Asynchronous Worker ---
log_handler.log_info(
f"Starting clone for '{remote_url}' into '{target_clone_dir}'...",
func_name=func_name
)
# Prepare arguments
args: tuple = (self.git_commands, remote_url, target_clone_dir, final_profile_name)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_clone_remote_async,
args_tuple=args,
context_dict={
"context": "clone_remote",
"status_msg": f"Cloning '{repo_name_from_url}'...", # Use repo name
"clone_success_data": { # Pass data needed for profile creation
"profile_name": final_profile_name,
"cloned_path": target_clone_dir,
"remote_url": remote_url,
}
}
)
def refresh_remote_status(self):
"""
Starts the asynchronous check for ahead/behind status of the current
branch against its upstream counterpart.
"""
func_name: str = "refresh_remote_status"
log_handler.log_info(
f"--- Action Triggered: Refresh Remote Sync Status ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Sync Status")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Refresh Status skipped: Repo not ready.", func_name=func_name
)
# Update GUI to show 'not ready' state
self._update_gui_for_not_ready_state()
return
# --- Get Current Branch and Upstream ---
current_branch: Optional[str] = None
upstream_branch: Optional[str] = None
try:
# Get the name of the current local branch
current_branch = self.git_commands.get_current_branch_name(svn_path)
if current_branch:
# If on a branch, get its configured upstream
upstream_branch = self.git_commands.get_branch_upstream(
svn_path, current_branch
)
else:
# Handle detached HEAD state
log_handler.log_warning(
"Refresh Status: Cannot get status, currently in detached HEAD state.",
func_name=func_name
)
self._update_gui_for_detached_head(current_branch)
return # Cannot check sync status in detached HEAD
# Handle case where upstream is not configured
if not upstream_branch:
log_handler.log_info(
f"Refresh Status: No upstream configured for branch '{current_branch}'.",
func_name=func_name
)
self._update_gui_for_no_upstream(current_branch)
return # Cannot check sync status without upstream
# Enable refresh button if we have branch and upstream (might be disabled)
if hasattr(self.main_frame, "refresh_sync_status_button"):
self.main_frame.refresh_sync_status_button.config(state=tk.NORMAL)
except Exception as e:
# Handle errors getting branch/upstream info
log_handler.log_exception(
f"Error getting branch/upstream before status check: {e}",
func_name=func_name
)
self._update_gui_for_status_error() # Update GUI to show error state
return
# --- Start Async Worker ---
log_handler.log_info(
f"Checking ahead/behind status for '{current_branch}' vs '{upstream_branch}'...",
func_name=func_name
)
# Update GUI label to "Checking..."
if hasattr(self.main_frame, "update_ahead_behind_status"):
self.main_frame.update_ahead_behind_status(
current_branch=current_branch, status_text="Sync Status: Checking..."
)
# Prepare arguments for the worker
args: tuple = (self.git_commands, svn_path, current_branch, upstream_branch)
# Start async operation
self._start_async_operation(
worker_func=async_workers.run_get_ahead_behind_async, # Worker function
args_tuple=args,
context_dict={
"context": "get_ahead_behind",
"status_msg": f"Checking sync status for '{current_branch}'",
"local_branch": current_branch, # Pass context for result handler
"upstream_branch": upstream_branch,
}
)
def refresh_branch_list(self):
""" Starts async operation to refresh the local branch list in the GUI. """
func_name: str ="refresh_branch_list"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Branches")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh Branches skipped: Repo not ready.", func_name=func_name
)
self._update_gui_for_not_ready_state()
return
# Prepare arguments
args: tuple = (self.git_commands, svn_path)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_branches_async, # Worker for local branches
args_tuple=args,
context_dict={"context": "refresh_branches", "status_msg": "Refreshing branches"}
)
def refresh_commit_history(self):
""" Starts async operation to refresh the commit history display. """
func_name: str ="refresh_commit_history"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh History")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh History skipped: Repo not ready.", func_name=func_name
)
self._update_gui_for_not_ready_state()
return
# Determine branch filter from GUI
branch_filter: Optional[str] = None
log_scope: str = "All History"
if hasattr(self.main_frame, "history_branch_filter_var"):
filter_selection: str = self.main_frame.history_branch_filter_var.get()
# Use filter only if a specific branch/tag is selected
if filter_selection and filter_selection != "-- All History --":
branch_filter = filter_selection
log_scope = f"'{branch_filter}'"
# Prepare arguments
args: tuple = (self.git_commands, svn_path, branch_filter, log_scope)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_history_async,
args_tuple=args,
context_dict={
"context": "refresh_history",
"status_msg": f"Refreshing history for {log_scope}"
}
)
def refresh_changed_files_list(self):
""" Starts async operation to refresh the list of changed files. """
func_name: str ="refresh_changed_files_list"
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files")
# Check if repo is ready
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_debug(
"Refresh Changes skipped: Repo not ready.", func_name=func_name
)
# GUI update handled by _update_gui_for_not_ready_state if needed
return
# Prepare arguments
args: tuple = (self.git_commands, svn_path)
# Start the async operation
self._start_async_operation(
worker_func=async_workers.run_refresh_changes_async,
args_tuple=args,
context_dict={
"context": "refresh_changes",
"status_msg": "Refreshing changed files"
}
)
def refresh_remote_branches(self):
""" Starts the asynchronous refresh of the remote branches list. """
func_name: str = "refresh_remote_branches"
log_handler.log_info(
f"--- Action Triggered: Refresh Remote Branches List ---", func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Refresh Remote Branches")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Refresh Remote Branches skipped: Repo not ready.", func_name=func_name
)
self._update_gui_for_not_ready_state() # Update GUI lists
return
# Get remote name, using default if empty
remote_name: str = self.main_frame.remote_name_var.get().strip()
if not remote_name:
remote_name = DEFAULT_REMOTE_NAME
self.main_frame.remote_name_var.set(remote_name)
log_handler.log_info(
f"Starting refresh of remote branches list for '{remote_name}'...",
func_name=func_name
)
# Update GUI list to show "Loading..."
if hasattr(self.main_frame, "update_remote_branches_list"):
self.main_frame.update_remote_branches_list(["(Loading...)"])
# Prepare args and start async operation
args: tuple = (self.git_commands, svn_path, remote_name)
self._start_async_operation(
worker_func=async_workers.run_refresh_remote_branches_async,
args_tuple=args,
context_dict={
"context": "refresh_remote_branches",
"status_msg": f"Refreshing remote branches for '{remote_name}'"
}
)
def checkout_remote_branch_as_local(
self, remote_branch_full: str, local_branch_suggestion: str
):
"""
Handles checkout of a remote branch as a new local tracking branch.
Checks if local branch exists, confirms if needed, starts async worker.
"""
func_name: str = "checkout_remote_branch_as_local"
log_handler.log_info(
f"--- Action Triggered: Checkout Remote Branch '{remote_branch_full}' as Local '{local_branch_suggestion}' ---",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Checkout Remote Branch")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning(
"Checkout Remote Branch skipped: Repo not ready.", func_name=func_name
)
self.main_frame.show_error(
"Action Failed", "Repository path is not valid or not prepared."
)
# No status bar update here as it's usually from a menu click
return
try:
# Check if a local branch with the suggested name already exists
local_branches: list[str]
current: Optional[str]
local_branches, current = self.git_commands.list_branches(svn_path)
if local_branch_suggestion in local_branches:
# If it exists, ask user if they want to checkout the existing one instead
log_handler.log_warning(
f"Local branch '{local_branch_suggestion}' already exists.",
func_name=func_name
)
if self.main_frame.ask_yes_no(
"Branch Exists",
f"A local branch named '{local_branch_suggestion}' already exists.\n\n"
f"Do you want to check out the existing local branch instead?"
):
# User wants to checkout existing local branch
log_handler.log_info(
f"User chose to checkout existing local branch '{local_branch_suggestion}'.",
func_name=func_name
)
# Call the standard checkout function, passing the path override
self.checkout_branch(
branch_to_checkout=local_branch_suggestion,
repo_path_override=svn_path
)
else:
# User cancelled
log_handler.log_info(
"Checkout cancelled because local branch exists.",
func_name=func_name
)
self.main_frame.update_status_bar("Checkout cancelled.")
# Exit in either case (another async task started or cancelled)
return
# If local branch doesn't exist, proceed to create tracking branch
log_handler.log_info(
f"Starting checkout of '{remote_branch_full}' as new local branch '{local_branch_suggestion}'...",
func_name=func_name
)
# Prepare args and start async operation
args: tuple = (
self.action_handler,
svn_path,
local_branch_suggestion,
remote_branch_full,
)
self._start_async_operation(
worker_func=async_workers.run_checkout_tracking_branch_async,
args_tuple=args,
context_dict={
"context": "checkout_tracking_branch",
"status_msg": f"Checking out '{local_branch_suggestion}' tracking '{remote_branch_full}'"
}
)
except Exception as e:
# Handle errors during local branch check or starting the worker
log_handler.log_exception(
f"Error preparing for tracking branch checkout: {e}",
func_name=func_name
)
if hasattr(self, "main_frame"):
self.main_frame.show_error(
"Checkout Error", f"Could not start checkout operation:\n{e}"
)
self.main_frame.update_status_bar("Checkout failed: Internal error.")
# --- Simplified Queue Checking Method (Refactored) ---
def _check_completion_queue(self, results_queue: queue.Queue, context: dict):
"""
Checks result queue, updates status bar briefly, delegates processing.
"""
task_context: str = context.get('context', 'unknown')
func_name: str = "_check_completion_queue"
try:
# Get result without blocking
result_data: Dict[str, Any] = results_queue.get_nowait()
log_handler.log_info(
f"Result received for '{task_context}'. Status: {result_data.get('status')}",
func_name=func_name
)
# Determine if widgets should be re-enabled immediately
should_reenable_now: bool = self._should_reenable_widgets_now(
task_context, result_data.get('status')
)
if not should_reenable_now:
log_handler.log_debug(
f"Postponing widget re-enable for context: {task_context}",
func_name=func_name
)
# Re-enable widgets now if appropriate
if should_reenable_now:
self._reenable_widgets_if_ready()
# Update status bar with generic message (may be overridden by handler)
self._update_status_bar_from_result(task_context, result_data)
# Process the result using the dedicated handler
self._process_result_with_handler(result_data, context)
except queue.Empty:
# Queue empty, reschedule check
self._reschedule_queue_check(results_queue, context)
except Exception as e:
# Critical error during queue check or basic handling
self._handle_queue_check_error(e, task_context)
# --- Helper methods for _check_completion_queue ---
def _should_reenable_widgets_now(self, task_context: str, status: Optional[str]) -> bool:
""" Determines if GUI widgets should be re-enabled immediately. """
# Don't re-enable immediately if waiting for user interaction or follow-up task
if task_context == "check_connection" and status == 'auth_required': return False
if task_context == "interactive_auth" and status == 'success': return False
if task_context == 'clone_remote' and status == 'success': return False
if task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success': return False
if task_context in ['pull_remote', 'merge_local_branch'] and status == 'conflict': return False
if task_context == 'compare_branches' and status == 'success': return False
# Default: Re-enable immediately
return True
def _reenable_widgets_if_ready(self):
""" Safely re-enables action widgets if the main frame exists. """
if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
log_handler.log_debug(
"Re-enabling widgets now.", func_name="_reenable_widgets_if_ready"
)
self.main_frame.set_action_widgets_state(tk.NORMAL)
else:
log_handler.log_warning(
"Cannot re-enable widgets, MainFrame missing.",
func_name="_reenable_widgets_if_ready"
)
def _update_status_bar_from_result(self, task_context: str, result_data: dict):
""" Updates status bar based on result, unless specific conditions apply. """
status: Optional[str] = result_data.get('status')
message: str = result_data.get('message', "Operation finished.")
# Conditions where status bar update is skipped or handled differently
skip_update: bool = False
if (task_context == 'clone_remote' and status == 'success'): skip_update = True
if (task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success'): skip_update = True
if (task_context == 'compare_branches' and status == 'success'): skip_update = True
if status in ['conflict', 'rejected']: skip_update = True
# Update status bar if not skipped and main frame exists
if not skip_update and hasattr(self, "main_frame") and self.main_frame.winfo_exists():
status_color: Optional[str] = None
reset_duration: int = 5000 # Default reset time
# Determine color and duration based on status
if status == 'success': status_color = self.main_frame.STATUS_GREEN
elif status == 'warning': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 7000
elif status == 'auth_required': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 15000
elif status == 'conflict': status_color = self.main_frame.STATUS_RED; reset_duration = 15000
elif status == 'rejected': status_color = self.main_frame.STATUS_RED; reset_duration = 15000
elif status == 'error': status_color = self.main_frame.STATUS_RED; reset_duration = 10000
# Call the GUI update method
self.main_frame.update_status_bar(
message, bg_color=status_color, duration_ms=reset_duration
)
def _process_result_with_handler(self, result_data: dict, context: dict):
""" Instantiates and calls the AsyncResultHandler to process the result. """
task_context: str = context.get('context', 'unknown')
func_name: str = "_process_result_with_handler"
try:
# Create handler instance, passing the current app instance
result_handler = AsyncResultHandler(self)
# Delegate detailed processing
result_handler.process(result_data, context)
log_handler.log_debug(
f"Result processing delegated to handler for context '{task_context}'.",
func_name=func_name
)
except Exception as handler_e:
# Handle errors occurring *within* the result handler
log_handler.log_exception(
f"Error during result processing by handler for {task_context}: {handler_e}",
func_name=func_name
)
# Show error to user
if hasattr(self, "main_frame"):
self.main_frame.show_error(
"Processing Error", f"Failed to handle task result:\n{handler_e}"
)
# Ensure widgets are re-enabled if handler fails unexpectedly
if not self._should_reenable_widgets_now(task_context, result_data.get('status')) and \
hasattr(self.main_frame, "winfo_exists") and self.main_frame.winfo_exists():
log_handler.log_warning(
"Re-enabling widgets after handler error.", func_name=func_name
)
self.main_frame.set_action_widgets_state(tk.NORMAL)
def _reschedule_queue_check(self, results_queue: queue.Queue, context: dict):
""" Reschedules the check for the completion queue if the app is running. """
if hasattr(self, "master") and self.master.winfo_exists():
self.master.after(
self.ASYNC_QUEUE_CHECK_INTERVAL_MS,
self._check_completion_queue,
results_queue,
context
)
def _handle_queue_check_error(self, error: Exception, task_context: str):
""" Handles critical errors during the queue check process itself. """
func_name: str = "_handle_queue_check_error"
log_handler.log_exception(
f"Critical error checking completion queue for {task_context}: {error}",
func_name=func_name
)
try: # Attempt GUI recovery
if hasattr(self, "main_frame") and self.main_frame.winfo_exists():
# Re-enable widgets
self.main_frame.set_action_widgets_state(tk.NORMAL)
# Show error in status bar
self.main_frame.update_status_bar(
"Error processing async result.",
bg_color=self.main_frame.STATUS_RED,
duration_ms=10000
)
# Reset other relevant GUI states (e.g., sync status)
self._update_gui_for_status_error()
except Exception as recovery_e:
# Log error during recovery attempt
log_handler.log_error(
f"Failed to recover GUI after queue processing error: {recovery_e}",
func_name=func_name
)
def _handle_gitignore_save(self):
"""
Callback executed after .gitignore is saved successfully by the editor.
Starts an asynchronous task to check for and untrack files if necessary.
"""
func_name: str = "_handle_gitignore_save"
log_handler.log_info(
"Callback: .gitignore saved. Starting async untrack check.",
func_name=func_name
)
# Ensure main frame exists
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
return
# Validate repo path and readiness
svn_path: Optional[str] = self._get_and_validate_svn_path("Untrack Check after Gitignore Save")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_error(
"Cannot start untrack check: Invalid/Not ready path.",
func_name=func_name
)
self.main_frame.update_status_bar(
"Error: Untrack check failed (invalid path)."
)
return
# Prepare args and start the untrack worker
args: tuple = (self.action_handler, svn_path)
self._start_async_operation(
worker_func=async_workers.run_untrack_async,
args_tuple=args,
context_dict={
"context": "_handle_gitignore_save", # Context identifies origin
"status_msg": "Checking files to untrack",
"committed_flag_possible": True # Untracking involves a commit
}
)
def _open_commit_file_diff(
self,
commit_hash: str,
file_status: str,
file_path: str,
old_file_path: Optional[str] = None
):
"""
Opens the DiffViewerWindow to show the changes for a specific file
within a given commit compared to its parent.
Args:
commit_hash (str): The full hash of the commit being viewed.
file_status (str): The status character (A, M, D, R, T...).
file_path (str): The relative path of the file in the commit.
old_file_path (Optional[str]): The old path (for Renamed files).
"""
func_name: str = "_open_commit_file_diff"
log_handler.log_info(
f"Requesting diff for file '{file_path}' in commit '{commit_hash[:7]}'",
func_name=func_name
)
# Validazione repo path corrente
svn_path: Optional[str] = self._get_and_validate_svn_path("Open Commit File Diff")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_error(
"Cannot open diff: Repository path invalid or not ready.", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error("Error", "Cannot open diff: Repository not available.")
return
# Determina i riferimenti per il diff viewer
ref1: str = f"{commit_hash}^" # Genitore del commit
ref2: str = commit_hash # Il commit stesso
# --- Gestione Casi Speciali ---
# 1. File Aggiunto (A) o Copiato (C): Confronta con "stato vuoto" (come fare?)
# Git diff <parent> <commit> -- <file> mostra l'intero file come aggiunto.
# Possiamo passare i ref standard, DiffViewerWindow lo mostrerà correttamente.
# 2. File Cancellato (D): Confronta il file nel parent con "stato vuoto".
# DiffViewerWindow dovrebbe mostrare il file come cancellato.
# Per Deleted files, il path da usare è quello *prima* della cancellazione.
# 3. File Rinominato (R): Dobbiamo usare old_file_path per ref1 e file_path per ref2?
# No, `git diff commit^ commit -- file` gestisce la rinomina. Usiamo il *nuovo* path.
# 4. Commit Iniziale (Root Commit): Non ha genitore (`commit^` fallisce).
# Dobbiamo rilevarlo. Possiamo provare `git rev-parse commit^`. Se fallisce,
# è un root commit. In tal caso, mostriamo il file solo in ref2 (vs "empty").
# Per semplicità iniziale, potremmo anche solo mostrare un errore se `git show`
# per `commit^:path` fallisce.
path_to_diff: str = file_path
if file_status == 'D' and old_file_path:
# Per file cancellati, in realtà vogliamo vedere il contenuto *prima*
# ma il diff tra parent e commit con il path nuovo non funziona.
# È più corretto mostrare il file come esisteva nel parent.
# Potremmo aprire DiffViewer in modo speciale solo con ref1?
# O semplicemente non permettere il diff per file cancellati in questa vista?
# Per ora, mostriamo un messaggio e non apriamo il diff per 'D'.
log_handler.log_info(
f"Diff view skipped for deleted file '{file_path}' in commit '{commit_hash[:7]}'.",
func_name=func_name
)
if hasattr(self.main_frame, "show_info"):
self.main_frame.show_info(
"Diff Not Applicable",
f"Cannot show diff for file deleted in this commit:\n{file_path}"
)
return
# --- Apri DiffViewerWindow ---
log_handler.log_debug(
f"Opening DiffViewerWindow: Ref1='{ref1}', Ref2='{ref2}', Path='{path_to_diff}'",
func_name=func_name
)
try:
# Istanzia e mostra la finestra di diff modale
DiffViewerWindow(
master=self.master, # Usa la finestra root come parent
git_commands=self.git_commands,
repo_path=svn_path,
relative_file_path=path_to_diff, # Usa il path corretto
ref1=ref1, # Commit genitore
ref2=ref2 # Commit selezionato
)
log_handler.log_debug("Commit file diff viewer closed.", func_name=func_name)
# Non serve aggiornare status bar qui, la finestra CommitDetail è ancora aperta
except Exception as e_diff:
log_handler.log_exception(
f"Error opening commit file diff viewer: {e_diff}", func_name=func_name
)
if hasattr(self.main_frame, "show_error"):
self.main_frame.show_error(
"Diff Viewer Error", f"Could not display file changes:\n{e_diff}"
)
def update_gitea_wiki(self):
"""Starts the asynchronous task to update the Gitea wiki."""
func_name = "update_gitea_wiki"
log_handler.log_info(f"--- Action Triggered: Update Gitea Wiki ---", func_name=func_name)
if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists(): return
svn_path = self._get_and_validate_svn_path("Update Gitea Wiki")
if not svn_path or not self._is_repo_ready(svn_path):
log_handler.log_warning("Update Wiki skipped: Repo not ready.", func_name=func_name)
self.main_frame.show_error("Action Failed", "Repository path is invalid or not prepared.")
self.main_frame.update_status_bar("Update Wiki failed: Repo not ready.")
return
# Recupera l'URL remoto dal profilo corrente
remote_url = self.main_frame.remote_url_var.get().strip()
if not remote_url:
log_handler.log_warning("Update Wiki skipped: Remote URL not configured.", func_name=func_name)
self.main_frame.show_error("Action Failed", "Remote URL is not configured in the current profile.")
self.main_frame.update_status_bar("Update Wiki failed: Remote URL missing.")
return
# Opzionale: recupera nomi file da config (o usa default)
# wiki_en_target = self.config_manager.get_profile_option(profile, "wiki_en_filename", fallback=None)
# wiki_it_target = self.config_manager.get_profile_option(profile, "wiki_it_filename", fallback=None)
# Chiedi conferma all'utente
if not self.main_frame.ask_yes_no("Confirm Wiki Update",
"Update Gitea Wiki pages from local 'doc/' folder?\n"
"(This will clone the wiki, commit, and push changes.)"):
self.main_frame.update_status_bar("Wiki update cancelled.")
return
# Prepara argomenti per il worker
args = (
self.wiki_updater,
svn_path,
remote_url,
# Passa altri argomenti se hai reso i nomi file configurabili
# wiki_en_target,
# wiki_it_target,
)
# Avvia l'operazione asincrona
self._start_async_operation(
worker_func=async_workers.run_update_wiki_async, # Devi creare questo worker
args_tuple=args,
context_dict={
"context": "update_wiki",
"status_msg": "Updating Gitea Wiki...",
}
)
# --- Application Entry Point ---
# Questa parte verrà spostata in __main__.py
# if __name__ == "__main__":
# # Il codice per avviare l'app ora è in gitsync_tool/__main__.py
# pass
# --- END OF FILE gitsync_tool/app.py ---