SXXXXXXX_GitUtility/GitUtility.py
2025-04-28 11:30:35 +02:00

3877 lines
177 KiB
Python

# --- FILE: GitUtility.py ---
import os
import datetime
import tkinter as tk
from tkinter import messagebox
from tkinter import filedialog
# Import logging and utility modules
import logging # Mantenuto per i livelli
import re
import threading
import queue
import traceback
import sys
from typing import Callable
from typing import List
from typing import Dict
from typing import Any
from typing import Tuple
from typing import Optional # PEP8 compliance for Optional type hints
# Import application modules
try:
# Configuration management
from config_manager import ConfigManager
from config_manager import DEFAULT_PROFILE
from config_manager import DEFAULT_BACKUP_DIR
from config_manager import DEFAULT_REMOTE_NAME
# Action handlers and backend logic
from action_handler import ActionHandler
from backup_handler import BackupHandler
from git_commands import GitCommands
from git_commands import GitCommandError
from remote_actions import RemoteActionHandler
# Logging system
import log_handler
from logger_config import setup_file_logging # For file logger setup
# GUI components
from gui import MainFrame
from gui import GitignoreEditorWindow
from gui import CreateTagDialog
from gui import CreateBranchDialog
from gui import CloneFromRemoteDialog
from diff_viewer import DiffViewerWindow
from diff_summary_viewer import DiffSummaryWindow
from commit_detail_window import CommitDetailWindow
# Asynchronous operations
import async_workers
from async_result_handler import AsyncResultHandler # For processing results
except ImportError as e:
# Critical error handling if imports fail
critical_msg: str = f"Critical Error: Failed to import required application modules: {e}"
print(f"FATAL IMPORT ERROR: {critical_msg}", file=sys.stderr)
# Attempt to show a graphical error message as a fallback
try:
root_fallback = tk.Tk()
root_fallback.withdraw() # Hide the empty root window
messagebox.showerror(
"Startup Error",
f"Failed to load components:\n{e}\n\nApplication cannot start.",
)
root_fallback.destroy()
except Exception:
# Ignore errors in the graphical fallback itself
pass
# Exit the application forcefully if core components are missing
sys.exit(1)
class GitSvnSyncApp:
"""
Main application controller class for the Git Sync Tool.
Orchestrates GUI and backend actions using asynchronous operations
and a centralized logging queue. Initializes and connects components.
"""
# Constants for polling intervals (in milliseconds)
LOG_QUEUE_CHECK_INTERVAL_MS: int = 100
ASYNC_QUEUE_CHECK_INTERVAL_MS: int = 100
def __init__(self, master: tk.Tk):
    """
    Build the application controller on top of the given root window.

    The constructor runs four stages in order: backend components,
    the MainFrame GUI, log-queue processing, and the initial profile load.
    If the backend or the GUI stage raises, a fatal-error message is shown,
    ``on_closing`` is scheduled and construction stops early.

    Args:
        master (tk.Tk): The main Tkinter root window.
    """
    stage: str = "__init__"
    self.master: tk.Tk = master
    master.title("Git Sync Tool (Bundle & Remote Manager)")
    # Route the window-manager close request through our own shutdown handler
    master.protocol("WM_DELETE_WINDOW", self.on_closing)
    print("Initializing GitSvnSyncApp...")
    log_handler.log_debug("GitSvnSyncApp initialization started.", func_name=stage)

    # --- Stage 1: backend components ---
    try:
        # Each backend object is constructed with None as its only argument
        self.config_manager: ConfigManager = ConfigManager(None)
        self.git_commands: GitCommands = GitCommands(None)
        self.backup_handler: BackupHandler = BackupHandler(None)
        # The action handlers share the GitCommands instance created above
        self.action_handler: ActionHandler = ActionHandler(
            self.git_commands, self.backup_handler
        )
        self.remote_action_handler: RemoteActionHandler = RemoteActionHandler(
            self.git_commands
        )
        # Controller-side state
        self.remote_auth_status: str = "unknown"
        self.current_local_branch: Optional[str] = None
        print("Core components initialized.")
        log_handler.log_debug("Core components initialized successfully.", func_name=stage)
    except Exception as backend_err:
        print(f"FATAL: Failed to initialize core components: {backend_err}", file=sys.stderr)
        log_handler.log_critical(
            f"Failed to initialize core components: {backend_err}", func_name=stage
        )
        self.show_fatal_error(
            f"Initialization Error:\n{backend_err}\n\nApplication cannot start."
        )
        # Give Tk a moment, then run the regular shutdown path
        self.master.after(10, self.on_closing)
        return

    # --- Stage 2: graphical user interface ---
    try:
        print("Creating MainFrame GUI...")
        log_handler.log_debug("Creating MainFrame GUI.", func_name=stage)
        # Keyword arguments for MainFrame: GUI actions mapped to controller
        # methods, plus the configuration objects the frame needs.
        frame_kwargs = {
            "load_profile_settings_cb": self.load_profile_settings,
            "save_profile_cb": self.save_profile_settings,
            "add_profile_cb": self.add_profile,
            "remove_profile_cb": self.remove_profile,
            "browse_folder_cb": self.browse_folder,
            "update_svn_status_cb": self.update_svn_status_indicator,
            "prepare_svn_for_git_cb": self.prepare_svn_for_git,
            "create_git_bundle_cb": self.create_git_bundle,
            "fetch_from_git_bundle_cb": self.fetch_from_git_bundle,
            "manual_backup_cb": self.manual_backup,
            "open_gitignore_editor_cb": self.open_gitignore_editor,
            "commit_changes_cb": self.commit_changes,
            "refresh_changed_files_cb": self.refresh_changed_files_list,
            "open_diff_viewer_cb": self.open_diff_viewer,
            "add_selected_file_cb": self.add_selected_file,
            "refresh_tags_cb": self.refresh_tag_list,
            "create_tag_cb": self.create_tag,
            "checkout_tag_cb": self.checkout_tag,
            "refresh_branches_cb": self.refresh_branch_list,  # local branches
            "create_branch_cb": self.create_branch,
            "checkout_branch_cb": self.checkout_branch,  # local branches
            "refresh_history_cb": self.refresh_commit_history,
            "apply_remote_config_cb": self.apply_remote_config,
            "check_connection_auth_cb": self.check_connection_auth,
            "fetch_remote_cb": self.fetch_remote,
            "pull_remote_cb": self.pull_remote,
            "push_remote_cb": self.push_remote,
            "push_tags_remote_cb": self.push_tags_remote,
            "refresh_remote_status_cb": self.refresh_remote_status,
            "clone_remote_repo_cb": self.clone_remote_repo,
            "refresh_remote_branches_cb": self.refresh_remote_branches,  # remote branches
            "checkout_remote_branch_cb": self.checkout_remote_branch_as_local,
            "delete_local_branch_cb": self.delete_local_branch,
            "merge_local_branch_cb": self.merge_local_branch,
            "compare_branch_with_current_cb": self.compare_branch_with_current,
            "config_manager_instance": self.config_manager,
            "profile_sections_list": self.config_manager.get_profile_sections(),
            "view_commit_details_cb": self.view_commit_details,
        }
        self.main_frame: MainFrame = MainFrame(master=master, **frame_kwargs)
        print("MainFrame GUI created.")
        log_handler.log_debug("MainFrame GUI created successfully.", func_name=stage)
    except Exception as gui_err:
        print(f"FATAL: Failed to initialize MainFrame GUI: {gui_err}", file=sys.stderr)
        log_handler.log_exception("Failed to initialize MainFrame GUI.", func_name=stage)
        self.show_fatal_error(
            f"GUI Initialization Error:\n{gui_err}\n\nApplication cannot start."
        )
        self.master.after(10, self.on_closing)
        return

    # --- Stage 3: file logging and GUI log polling ---
    self._setup_logging_processing()
    log_handler.log_info("Git Sync Tool application starting.", func_name=stage)

    # --- Stage 4: load whichever profile the GUI has selected ---
    self._perform_initial_load()
    log_handler.log_info("Git Sync Tool initialization complete.", func_name=stage)
# --- Static Method Helper ---
@staticmethod
def _extract_path_from_status_line(file_status_line: str) -> Optional[str]:
    """
    Extract a clean relative path from one short-format git status line.

    Expected shapes are ``XY path`` (e.g. ``?? new.txt``) and, for renames
    and copies, ``XY old -> new``; in the latter case the ``new`` path is
    returned. The `` -> `` separator is only honoured when the status code
    contains ``R`` or ``C`` and the separator sits outside double quotes, so
    an ordinary file whose name happens to contain an arrow is returned
    intact.

    Surrounding double quotes are removed. C-style escape sequences (octal
    bytes, escaped quotes/backslashes, ``t``/``n`` style escapes) are decoded
    only when the path was quoted, because an unquoted backslash is a
    literal character and must not be reinterpreted.

    Args:
        file_status_line (str): The raw status line. Leading/trailing NUL
            characters and whitespace are ignored.

    Returns:
        Optional[str]: The cleaned path, or None when the line is empty or
        too short, no path can be located, or an unexpected error occurs
        (the error is logged, never raised).
    """
    func_name: str = "_extract_path_from_status_line"
    rename_separator: str = " -> "
    try:
        # Clean NUL characters and leading/trailing whitespace
        line: str = file_status_line.strip('\x00').strip()
        # Shortest meaningful line is status + space + one character
        if not line or len(line) < 3:
            log_handler.log_warning(
                f"Invalid/short status line received: '{file_status_line}'",
                func_name=func_name
            )
            return None

        # Split the line into its status code and the path portion.
        # 'T' (type change) is accepted alongside the other status letters.
        match = re.match(r"^([ MARCUDT?!]{1,2})\s+(.*)", line)
        if match:
            status_code: str = match.group(1)
            path_part: str = match.group(2)
        else:
            log_handler.log_warning(
                f"Could not match expected status line format: '{line}'",
                func_name=func_name
            )
            # Fallback: first token is the status, the remainder is the path
            first_space_index: int = line.find(' ')
            if first_space_index == -1:
                return None
            status_code = line[:first_space_index]
            path_part = line[first_space_index:].strip()

        # Renames/copies: keep only the destination path. The scan skips
        # quoted regions so an arrow inside a quoted name is not mistaken
        # for the separator; the last unquoted separator wins.
        if "R" in status_code or "C" in status_code:
            split_at: int = -1
            in_quotes: bool = False
            index: int = 0
            while index < len(path_part):
                char = path_part[index]
                if in_quotes and char == "\\":
                    # Skip the escaped character (e.g. an escaped quote)
                    index += 2
                    continue
                if char == '"':
                    in_quotes = not in_quotes
                elif not in_quotes and path_part.startswith(rename_separator, index):
                    split_at = index
                index += 1
            if split_at != -1:
                path_part = path_part[split_at + len(rename_separator):].strip()

        if not path_part:
            return None

        # Remove surrounding quotes, remembering whether they were present
        was_quoted: bool = (
            len(path_part) >= 2
            and path_part.startswith('"')
            and path_part.endswith('"')
        )
        relative_path: str = path_part[1:-1] if was_quoted else path_part

        # Decode escape sequences only for quoted paths
        if was_quoted and '\\' in relative_path:
            try:
                # unicode_escape yields one character per escaped byte;
                # latin-1 turns those back into the raw bytes, which are
                # then interpreted as UTF-8.
                decoded_path: str = (
                    relative_path.encode("utf-8")
                    .decode("unicode_escape")
                    .encode("latin-1")
                    .decode("utf-8")
                )
            except UnicodeError as decode_err:
                # Keep the undecoded path if the escapes are malformed
                log_handler.log_warning(
                    f"Could not decode potential escape sequences in path '{relative_path}': {decode_err}",
                    func_name=func_name
                )
            else:
                if decoded_path != relative_path:
                    log_handler.log_debug(
                        f"Path '{relative_path}' decoded to '{decoded_path}'",
                        func_name=func_name
                    )
                    relative_path = decoded_path

        # Final check if path is empty after processing
        if not relative_path:
            return None
        log_handler.log_debug(
            f"Cleaned path from status line: '{relative_path}'",
            func_name=func_name
        )
        return relative_path
    except Exception as e:
        # Any unexpected problem (e.g. a non-string argument) is logged
        log_handler.log_exception(
            f"Error cleaning path from status line '{file_status_line}': {e}",
            func_name=func_name
        )
        return None
# --- Logging Setup and Processing ---
def _setup_logging_processing(self):
    """
    Enable file logging and schedule the first poll of the log queue.

    File logging is configured at DEBUG level. Polling is only scheduled
    when ``self.main_frame`` exists and exposes a ``log_text`` attribute;
    otherwise the problem is reported on stderr and through log_handler.
    Any exception raised along the way is reported the same way and swallowed.
    """
    func_name: str = "_setup_logging_processing"
    try:
        # Step 1: DEBUG and above go to the log file
        setup_file_logging(level=logging.DEBUG)

        # Step 2: the GUI must offer a log widget before we start polling
        gui_has_log_widget = hasattr(self, "main_frame") and hasattr(
            self.main_frame, "log_text"
        )
        if not gui_has_log_widget:
            print(
                "ERROR: Cannot start log queue processing - GUI log widget not found.",
                file=sys.stderr
            )
            log_handler.log_error(
                "Cannot start log queue processing - GUI log widget not found.",
                func_name=func_name
            )
            return

        log_handler.log_info(
            "Starting log queue processing for GUI.",
            func_name=func_name
        )
        # First poll; _process_log_queue re-schedules itself afterwards
        self.master.after(self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue)
    except Exception as setup_err:
        print(f"ERROR during logging setup: {setup_err}", file=sys.stderr)
        log_handler.log_exception(
            "Failed to setup logging processing.", func_name=func_name
        )
def _process_log_queue(self):
    """
    Drain a bounded batch of entries from the shared log queue.

    Each entry is forwarded to the root logger and appended to the GUI log
    widget, tagged with its level name. At most 50 entries are handled per
    call; the method then re-schedules itself on the root window as long as
    that window exists. If the log widget is missing or destroyed, a warning
    is logged and polling stops.
    """
    func_name: str = "_process_log_queue"
    text_area = getattr(self.main_frame, "log_text", None)
    # Without a live widget there is nothing to update: stop polling
    if not text_area or not text_area.winfo_exists():
        log_handler.log_warning(
            "Log widget not found, stopping queue processing.", func_name=func_name
        )
        return

    def _append_to_widget(text: str, tag: str) -> None:
        """ Append one line to the log widget, restoring its prior state. """
        try:
            previous_state = text_area.cget("state")
            text_area.config(state=tk.NORMAL)
            text_area.insert(tk.END, text + "\n", (tag,))
            text_area.see(tk.END)  # keep the newest line visible
            text_area.config(state=previous_state)
        except tk.TclError as gui_err:
            print(
                f"TclError updating log widget: {gui_err} - Message: {text}",
                file=sys.stderr
            )
        except Exception as gui_err:
            print(
                f"Error updating log widget: {gui_err} - Message: {text}",
                file=sys.stderr
            )

    pending = log_handler.log_queue
    batch_limit: int = 50  # cap per cycle so the GUI stays responsive
    handled: int = 0

    while not pending.empty():
        if handled >= batch_limit:
            log_handler.log_debug(
                f"Processed {batch_limit} log entries, pausing.",
                func_name=func_name
            )
            break
        try:
            entry: dict = pending.get_nowait()
            level: int = entry.get("level", logging.INFO)
            message: str = entry.get("message", "<empty log message>")
            level_name: str = log_handler.get_log_level_name(level)
            # Root logger first (the file handler hangs off it) ...
            logging.getLogger().log(level, message)
            handled += 1
            # ... then the GUI, for DEBUG and above
            if level >= logging.DEBUG:
                _append_to_widget(message, level_name)
        except queue.Empty:
            # Another consumer emptied the queue between empty() and get
            break
        except Exception as proc_err:
            print(f"Error processing log queue item: {proc_err}", file=sys.stderr)
            try:
                logging.getLogger().error(f"Error processing log queue item: {proc_err}")
            except Exception:
                pass  # best effort only

    # Poll again later while the root window is still around
    if self.master.winfo_exists():
        self.master.after(self.LOG_QUEUE_CHECK_INTERVAL_MS, self._process_log_queue)
# --- Initial Application Load ---
def _perform_initial_load(self):
    """
    Load the profile currently selected in the GUI at application start.

    When no profile is selected, the GUI fields are cleared/disabled and
    the status bar asks the user to add a profile. Does nothing (beyond
    logging an error) if the main frame is not available.
    """
    func_name: str = "_perform_initial_load"
    log_handler.log_debug("Performing initial profile load.", func_name=func_name)

    frame_ready = hasattr(self, "main_frame") and self.main_frame.winfo_exists()
    if not frame_ready:
        log_handler.log_error(
            "Cannot perform initial load: MainFrame not ready.", func_name=func_name
        )
        return

    # Whatever the profile dropdown variable currently holds
    selected_profile: str = self.main_frame.profile_var.get()
    if not selected_profile:
        log_handler.log_warning(
            "No initial profile set (no profiles found?).", func_name=func_name
        )
        self._clear_and_disable_fields()
        self.main_frame.update_status_bar("No profiles found. Please add a profile.")
        return

    log_handler.log_debug(
        f"Loading initial profile: '{selected_profile}'", func_name=func_name
    )
    # Same code path as a user-initiated profile change
    self.load_profile_settings(selected_profile)
# --- Application Closing Handler ---
def on_closing(self):
    """ React to a window close request: update the status bar, destroy the root window. """
    func_name: str = "on_closing"
    log_handler.log_info("Application closing initiated.", func_name=func_name)

    # Best-effort status update; the GUI may be only partially built
    gui_alive = hasattr(self, "main_frame") and self.main_frame.winfo_exists()
    if gui_alive:
        try:
            self.main_frame.update_status_bar("Exiting...")
        except Exception:
            pass  # a failed status update must not block shutdown

    root = self.master
    if root and root.winfo_exists():
        root.destroy()
    log_handler.log_info("Application closed.", func_name=func_name)
# --- Profile Management Callbacks ---
def load_profile_settings(self, profile_name: str):
    """
    Load the settings of ``profile_name`` into the GUI and refresh the views.

    Steps:
      1. Validate that the main frame exists and the profile is known to
         the ConfigManager; otherwise clear/disable the GUI and report it.
      2. Read every expected option (with its default) for the profile.
      3. Write paths, bundle names, backup, commit and remote settings
         into the corresponding widgets/variables.
      4. Update the repository status indicator. If the repository is
         ready, trigger the tag/branch/history/changed-files refreshes and,
         when a remote URL is configured, the connection check and remote
         status refresh. Otherwise put the GUI in its "not ready" state.

    Any exception raised while applying settings is logged, shown in an
    error dialog and reflected in the status bar; it is not re-raised.

    Args:
        profile_name (str): Name of the profile section to load.
    """
    func_name: str = "load_profile_settings"
    log_handler.log_info(
        f"Loading settings for profile: '{profile_name}'", func_name=func_name
    )
    # Ensure main frame is available
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        log_handler.log_error(
            "Cannot load profile: Main frame not available.", func_name=func_name
        )
        return
    # Update status bar to indicate loading
    self.main_frame.update_status_bar(
        f"Processing: Loading profile '{profile_name}'..."
    )
    # Validate the profile name
    if not profile_name or profile_name not in self.config_manager.get_profile_sections():
        log_handler.log_warning(
            f"Profile '{profile_name}' invalid/not found.", func_name=func_name
        )
        # Clear fields and disable actions if profile is invalid
        self._clear_and_disable_fields()
        # Only complain in a dialog when a specific name was requested
        if profile_name:
            self.main_frame.show_error(
                "Profile Load Error", f"Profile '{profile_name}' not found."
            )
        status_msg: str = f"Error: Profile '{profile_name}' not found." if profile_name else "No profile selected."
        self.main_frame.update_status_bar(status_msg)
        return

    # Collect every expected option, falling back to its default
    cm: ConfigManager = self.config_manager
    keys_with_defaults: dict = cm._get_expected_keys_with_defaults()
    settings: dict = {
        key: cm.get_profile_option(profile_name, key, fallback=default_value)
        for key, default_value in keys_with_defaults.items()
    }

    mf: MainFrame = self.main_frame
    repo_path_for_refresh: str = ""
    # Bound before the try block so the remote check below never hits an
    # unbound local when the remote widgets are missing; in that case the
    # remote is treated as not configured.
    remote_url_loaded: str = ""
    try:
        # Load paths and bundle names
        mf.svn_path_entry.delete(0, tk.END)
        svn_path_value: str = settings.get("svn_working_copy_path", "")
        mf.svn_path_entry.insert(0, svn_path_value)
        repo_path_for_refresh = svn_path_value  # Store path for later checks
        mf.usb_path_entry.delete(0, tk.END)
        mf.usb_path_entry.insert(0, settings.get("usb_drive_path", ""))
        mf.bundle_name_entry.delete(0, tk.END)
        mf.bundle_name_entry.insert(0, settings.get("bundle_name", ""))
        mf.bundle_updated_name_entry.delete(0, tk.END)
        mf.bundle_updated_name_entry.insert(
            0, settings.get("bundle_name_updated", "")
        )
        # Load backup settings (booleans are stored as text)
        mf.autobackup_var.set(
            str(settings.get("autobackup", "False")).lower() == "true"
        )
        mf.backup_dir_var.set(
            settings.get("backup_dir", DEFAULT_BACKUP_DIR)
        )
        mf.backup_exclude_extensions_var.set(
            settings.get("backup_exclude_extensions", "")
        )
        mf.backup_exclude_dirs_var.set(
            settings.get("backup_exclude_dirs", "")
        )
        mf.toggle_backup_dir()  # Update state of backup dir entry
        # Load commit settings
        mf.autocommit_var.set(
            str(settings.get("autocommit", "False")).lower() == "true"
        )
        mf.clear_commit_message()  # Clear existing message
        if mf.commit_message_text.winfo_exists():
            # Temporarily enable the widget so the insert is accepted
            current_state = mf.commit_message_text.cget("state")
            mf.commit_message_text.config(state=tk.NORMAL)
            mf.commit_message_text.insert("1.0", settings.get("commit_message", ""))
            mf.commit_message_text.config(state=current_state)  # Restore original state
        # Load remote repository settings
        if hasattr(mf, "remote_url_var") and hasattr(mf, "remote_name_var"):
            remote_url_loaded = settings.get("remote_url", "")
            mf.remote_url_var.set(remote_url_loaded)
            mf.remote_name_var.set(
                settings.get("remote_name", DEFAULT_REMOTE_NAME)
            )
        else:
            # Log if remote widgets are unexpectedly missing
            log_handler.log_warning(
                "Remote URL/Name widgets not found in GUI during load.",
                func_name=func_name
            )
        log_handler.log_info(
            f"Applied settings from '{profile_name}' to GUI fields.",
            func_name=func_name
        )

        # --- Update Repo Status and Trigger Refreshes ---
        self.update_svn_status_indicator(repo_path_for_refresh)
        is_ready: bool = self._is_repo_ready(repo_path_for_refresh)
        if is_ready:
            log_handler.log_info(
                "Repo ready, triggering async refreshes.", func_name=func_name
            )
            self.refresh_tag_list()
            self.refresh_branch_list()  # Refreshes local branches
            self.refresh_commit_history()
            self.refresh_changed_files_list()
            if remote_url_loaded:
                # A remote URL is configured: check connection/auth and sync state
                log_handler.log_debug("Remote URL found, initiating connection check.", func_name=func_name)
                self.check_connection_auth()
                self.refresh_remote_status()
            else:
                # No remote URL: do not attempt any connection
                log_handler.log_info("Remote URL is empty. Skipping connection check and remote status update.", func_name=func_name)
                # Report the auth/sync state as unknown / not configured
                self._update_gui_auth_status("unknown")
                if hasattr(self.main_frame, "update_ahead_behind_status"):
                    self.main_frame.update_ahead_behind_status(
                        status_text="Sync Status: (Remote not configured)"
                    )
                # Make sure the sync-refresh button is disabled
                if hasattr(self.main_frame, "refresh_sync_status_button"):
                    self.main_frame.refresh_sync_status_button.config(state=tk.DISABLED)
                mf.update_status_bar(
                    f"Profile '{profile_name}' loaded (Remote not configured)."
                )
        else:
            # If not ready, clear dynamic GUI lists
            log_handler.log_info(
                "Repo not ready, clearing dynamic lists.", func_name=func_name
            )
            self._update_gui_for_not_ready_state()
            mf.update_status_bar(
                f"Profile '{profile_name}' loaded (Repo not ready)."
            )
    except Exception as e:
        # Handle errors during settings application
        log_handler.log_exception(
            f"Error applying settings for '{profile_name}': {e}",
            func_name=func_name
        )
        # Update sync status label to indicate error
        if hasattr(self.main_frame, "update_ahead_behind_status"):
            self.main_frame.update_ahead_behind_status(status_text="Sync Status: Error")
        # Show error popup and update status bar
        mf.show_error("Profile Load Error", f"Failed to apply settings:\n{e}")
        mf.update_status_bar(f"Error loading profile '{profile_name}'.")
def save_profile_settings(self) -> bool:
    """
    Save the current GUI field values to the selected profile.

    The main frame is checked before anything is read from it, so calling
    this method without a usable GUI returns False and does not raise.

    Returns:
        bool: True when every option was stored and the configuration file
        was written; False when the GUI is unavailable, no profile is
        selected, or saving raised an exception (which is logged and shown
        in an error dialog).
    """
    func_name: str = "save_profile_settings"
    # Guard first: everything below reads from the main frame
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        log_handler.log_error(
            "Cannot save profile: Main frame not available.", func_name=func_name
        )
        return False

    mf: MainFrame = self.main_frame
    cm: ConfigManager = self.config_manager

    # Validate profile selection
    profile_name: str = mf.profile_var.get()
    if not profile_name:
        log_handler.log_warning(
            "Save failed: No profile selected.", func_name=func_name
        )
        mf.update_status_bar("Save failed: No profile selected.")
        return False

    log_handler.log_info(
        f"Saving settings for profile: '{profile_name}'", func_name=func_name
    )
    status_final: str = "Ready."  # Default status message
    success: bool = False
    try:
        # Gather all settings from GUI widgets
        settings_to_save: dict = {
            "svn_working_copy_path": mf.svn_path_entry.get(),
            "usb_drive_path": mf.usb_path_entry.get(),
            "bundle_name": mf.bundle_name_entry.get(),
            "bundle_name_updated": mf.bundle_updated_name_entry.get(),
            "autocommit": str(mf.autocommit_var.get()),  # Convert bool to string
            "commit_message": mf.get_commit_message(),
            "autobackup": str(mf.autobackup_var.get()),  # Convert bool to string
            "backup_dir": mf.backup_dir_var.get(),
            "backup_exclude_extensions": mf.backup_exclude_extensions_var.get(),
            "backup_exclude_dirs": mf.backup_exclude_dirs_var.get(),
            "remote_url": mf.remote_url_var.get(),
            # Fall back to the default remote name when the field is blank
            "remote_name": mf.remote_name_var.get().strip() or DEFAULT_REMOTE_NAME,
        }
        # NOTE(review): this debug entry includes remote_url verbatim; if
        # URLs can embed credentials they end up in the log - confirm.
        log_handler.log_debug(
            f"Settings to save: {settings_to_save}", func_name=func_name
        )
        # Store each option, then write the configuration file
        for key, value in settings_to_save.items():
            cm.set_profile_option(profile_name, key, value)
        cm.save_config()
        log_handler.log_info(
            f"Settings saved successfully for '{profile_name}'.",
            func_name=func_name
        )
        status_final = f"Profile '{profile_name}' saved."
        success = True
    except Exception as e:
        # Handle errors during saving process
        log_handler.log_exception(
            f"Error saving profile '{profile_name}': {e}", func_name=func_name
        )
        status_final = f"Error saving profile '{profile_name}'."
        mf.show_error("Save Error", f"Failed:\n{e}")
        success = False
    finally:
        # Update the status bar regardless of success/failure
        mf.update_status_bar(status_final)
    return success
def add_profile(self):
    """
    Ask the user for a profile name and create that profile.

    The name is stripped and rejected when empty or already present.
    A new section is then filled with the expected default options
    (with profile-specific bundle names and commit message), the
    configuration is saved, and the new profile is selected in the GUI.
    Failures are logged, shown in an error dialog and put on the status bar.
    """
    func_name: str = "add_profile"
    log_handler.log_debug("'Add Profile' button clicked.", func_name=func_name)
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return

    gui = self.main_frame
    gui.update_status_bar("Adding new profile...")
    entered: Optional[str] = gui.ask_new_profile_name()

    # None means the dialog was cancelled
    if entered is None:
        log_handler.log_info("Add profile cancelled.", func_name=func_name)
        gui.update_status_bar("Add profile cancelled.")
        return

    name = entered.strip()
    if not name:
        log_handler.log_warning("Add failed: Name empty.", func_name=func_name)
        gui.show_error("Input Error", "Profile name cannot be empty.")
        gui.update_status_bar("Add failed: Empty name.")
        return

    if name in self.config_manager.get_profile_sections():
        log_handler.log_warning(
            f"Add failed: '{name}' exists.", func_name=func_name
        )
        gui.show_error("Error", f"Profile '{name}' already exists.")
        gui.update_status_bar(f"Add failed: '{name}' exists.")
        return

    log_handler.log_info(
        f"Attempting to add new profile: '{name}'", func_name=func_name
    )
    try:
        cfg = self.config_manager
        # Start from the expected options, then tailor them to this profile
        option_values: dict = cfg._get_expected_keys_with_defaults()
        option_values.update({
            "bundle_name": f"{name}_repo.bundle",
            "bundle_name_updated": f"{name}_update.bundle",
            "svn_working_copy_path": "",  # paths start out empty
            "usb_drive_path": "",
            "remote_url": "",  # no remote yet
            "commit_message": f"Initial commit for profile {name}",
        })

        cfg.add_section(name)
        for option, value in option_values.items():
            cfg.set_profile_option(name, option, value)
        cfg.save_config()
        log_handler.log_info(
            f"Profile '{name}' added successfully.", func_name=func_name
        )

        # Refresh the dropdown and select the newcomer
        gui.update_profile_dropdown(cfg.get_profile_sections())
        gui.profile_var.set(name)
    except Exception as add_err:
        log_handler.log_exception(
            f"Error adding profile '{name}': {add_err}", func_name=func_name
        )
        failure_status: str = f"Error adding profile '{name}'."
        gui.show_error("Add Error", f"Failed:\n{add_err}")
        gui.update_status_bar(failure_status)
def remove_profile(self):
    """
    Remove the currently selected profile once the user confirms.

    Refuses when nothing is selected or the selection is the default
    profile. On confirmation the section is removed through the
    ConfigManager, the configuration is saved and the profile dropdown is
    refreshed. Every outcome is reported on the status bar.
    """
    func_name: str = "remove_profile"
    log_handler.log_debug("'Remove Profile' button clicked.", func_name=func_name)
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return

    gui = self.main_frame
    target: str = gui.profile_var.get()

    if not target:
        log_handler.log_warning(
            "Remove failed: No profile selected.", func_name=func_name
        )
        gui.show_error("Error", "No profile selected.")
        gui.update_status_bar("Remove failed: No profile.")
        return

    # The default profile is protected
    if target == DEFAULT_PROFILE:
        log_handler.log_warning(
            "Attempt remove default denied.", func_name=func_name
        )
        gui.show_error(
            "Denied", f"Cannot remove default profile ('{DEFAULT_PROFILE}')."
        )
        gui.update_status_bar("Cannot remove default.")
        return

    confirmed = gui.ask_yes_no(
        title="Confirm Remove",
        message=f"Remove profile '{target}'?\nThis cannot be undone."
    )
    if not confirmed:
        log_handler.log_info("Profile removal cancelled.", func_name=func_name)
        gui.update_status_bar("Removal cancelled.")
        return

    log_handler.log_info(
        f"Attempting remove profile: '{target}'", func_name=func_name
    )
    gui.update_status_bar(
        f"Processing: Removing profile '{target}'..."
    )
    try:
        if self.config_manager.remove_profile_section(target):
            # Persist the removal, then refresh the dropdown contents
            self.config_manager.save_config()
            log_handler.log_info(
                f"Profile '{target}' removed.", func_name=func_name
            )
            outcome: str = f"Profile '{target}' removed."
            gui.update_profile_dropdown(self.config_manager.get_profile_sections())
        else:
            # ConfigManager reported that nothing was removed
            log_handler.log_error(
                f"Failed remove profile '{target}' (ConfigManager returned False).",
                func_name=func_name
            )
            outcome = f"Error removing profile '{target}'."
            gui.show_error(
                "Error", f"Could not remove '{target}'. ConfigManager denied."
            )
        gui.update_status_bar(outcome)
    except Exception as remove_err:
        log_handler.log_exception(
            f"Error removing profile '{target}': {remove_err}", func_name=func_name
        )
        outcome = f"Error removing profile '{target}'."
        gui.show_error("Error", f"Failed:\n{remove_err}")
        gui.update_status_bar(outcome)
# --- GUI Interaction & Helper Methods ---
def browse_folder(self, entry_widget: tk.Entry):
    """
    Let the user pick a directory and write the choice into ``entry_widget``.

    The dialog opens in the entry's current path when that is an existing
    directory, otherwise in that path's parent when the parent exists,
    otherwise in the user's home directory. When the dialog returns no
    selection the entry is left untouched.

    Args:
        entry_widget (tk.Entry): Entry whose text seeds the dialog and
            which receives the selected directory.
    """
    func_name: str = "browse_folder"
    typed_path: str = entry_widget.get()
    typed_parent: str = os.path.dirname(typed_path) if typed_path else ""
    # Pick the most specific existing location as the dialog's start folder
    if typed_path and os.path.isdir(typed_path):
        start_dir: str = typed_path
    elif typed_path and os.path.exists(typed_parent):
        start_dir = typed_parent
    else:
        start_dir = os.path.expanduser("~")
    log_handler.log_debug(
        f"Opening folder browser. Initial: {start_dir}", func_name=func_name
    )
    # Parent the dialog to the main window
    chosen_dir = filedialog.askdirectory(
        initialdir=start_dir,
        title="Select Directory",
        parent=self.master,
    )
    if not chosen_dir:
        # Nothing selected (dialog dismissed)
        log_handler.log_debug("Folder browse cancelled.", func_name=func_name)
        return
    log_handler.log_debug(
        f"Directory selected: {chosen_dir}", func_name=func_name
    )
    # Replace the entry's content with the selection
    entry_widget.delete(0, tk.END)
    entry_widget.insert(0, chosen_dir)
    # A change of the working-directory entry also refreshes the repo indicator
    frame = self.main_frame
    if hasattr(frame, "svn_path_entry") and entry_widget == frame.svn_path_entry:
        self.update_svn_status_indicator(chosen_dir)
def update_svn_status_indicator(self, svn_path: str):
    """
    Refresh the repository indicator and widget states for ``svn_path``.

    The path counts as a valid directory when it is non-empty and exists as
    a directory, and as a ready repository when it additionally contains a
    ``.git`` entry. The indicator is updated via
    ``MainFrame.update_svn_indicator`` and every action widget present on
    the main frame is configured with the state matching those two facts.

    Args:
        svn_path (str): Working directory path to evaluate; may be empty.
    """
    func_name: str = "update_svn_status_indicator"
    dir_ok: bool = bool(svn_path and os.path.isdir(svn_path))
    repo_ok: bool = dir_ok and os.path.exists(os.path.join(svn_path, ".git"))
    log_handler.log_debug(
        f"Updating repo status indicator. Path='{svn_path}', ValidDir={dir_ok}, Ready={repo_ok}",
        func_name=func_name
    )
    # Nothing to update without a live main frame
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return
    mf: MainFrame = self.main_frame
    mf.update_svn_indicator(repo_ok)

    # --- Candidate states ---
    repo_state: str = tk.NORMAL if repo_ok else tk.DISABLED
    dir_state: str = tk.NORMAL if dir_ok else tk.DISABLED
    # 'Prepare' only makes sense for a directory that is not yet a repo
    prepare_state: str = tk.NORMAL if (dir_ok and not repo_ok) else tk.DISABLED
    bundle_fetch_state: str = self._calculate_fetch_button_state(mf, svn_path, repo_ok)
    # The history filter combobox uses "readonly" instead of NORMAL when enabled
    history_combo_state: str = "readonly" if repo_ok else tk.DISABLED

    # (attribute name on the main frame, state to apply), in application order
    widget_plan = (
        # Repository tab
        ("prepare_svn_button", prepare_state),
        ("create_bundle_button", repo_state),
        ("fetch_bundle_button", bundle_fetch_state),
        ("edit_gitignore_button", repo_state),
        # Backup tab (only needs a valid directory)
        ("manual_backup_button", dir_state),
        # Commit/Changes tab
        ("autocommit_checkbox", repo_state),
        ("commit_message_text", repo_state),
        ("refresh_changes_button", repo_state),
        ("commit_button", repo_state),
        # Tags tab
        ("refresh_tags_button", repo_state),
        ("create_tag_button", repo_state),
        ("checkout_tag_button", repo_state),
        ("tag_listbox", repo_state),
        # Branches (local ops) tab
        ("refresh_branches_button", repo_state),
        ("create_branch_button", repo_state),
        ("checkout_branch_button", repo_state),
        ("branch_listbox", repo_state),
        # History tab
        ("refresh_history_button", repo_state),
        ("history_branch_filter_combo", history_combo_state),
        ("history_text", repo_state),
        # Remote tab
        ("apply_remote_config_button", repo_state),
        ("check_auth_button", repo_state),
        ("fetch_button", repo_state),
        ("pull_button", repo_state),
        ("push_button", repo_state),
        ("push_tags_button", repo_state),
        ("refresh_sync_status_button", repo_state),
        ("refresh_remote_branches_button", repo_state),
        ("remote_branches_listbox", repo_state),
        ("local_branches_listbox_remote_tab", repo_state),
        ("refresh_local_branches_button_remote_tab", repo_state),
    )

    # One try block for the whole pass: the first failure stops the pass and is logged
    try:
        for widget_name, new_state in widget_plan:
            if hasattr(mf, widget_name):
                getattr(mf, widget_name).config(state=new_state)
        # The changed-files list is only reset here when the repo is not ready;
        # otherwise its content is left alone.
        if hasattr(mf, "changed_files_listbox") and not repo_ok:
            log_handler.log_debug(
                "Repo not ready, clearing changes list via status update.",
                func_name=func_name
            )
            mf.update_changed_files_list(["(Repository not ready)"])
    except Exception as e:
        log_handler.log_error(
            f"Error updating widget states based on repo status: {e}",
            func_name=func_name
        )
def _calculate_fetch_button_state(self, main_frame: 'MainFrame', svn_path: str, is_repo_ready: bool) -> str:
    """
    Determine the state (NORMAL/DISABLED) for the 'Fetch from Bundle' button.

    The button is enabled when the repository is ready (fetch/merge case),
    or when ``svn_path`` can serve as a clone destination AND the bundle file
    named in the GUI exists inside the bundle target directory (clone case).

    A path is considered usable as a clone destination when it is an empty
    directory, or when nothing exists at that path yet and its parent is an
    existing directory (or the path has no parent component). A path that
    exists but is not a directory (for example a regular file) is NOT usable.

    Args:
        main_frame (MainFrame): Frame providing ``usb_path_entry`` and
            ``bundle_updated_name_entry``.
        svn_path (str): Working directory path to evaluate; may be empty.
        is_repo_ready (bool): Whether ``svn_path`` is already a Git repository.

    Returns:
        str: ``tk.NORMAL`` or ``tk.DISABLED``. Any unexpected error is logged
        and results in ``tk.DISABLED``.
    """
    func_name: str = "_calculate_fetch_button_state"
    try:
        # --- Can svn_path be used as a clone destination? ---
        can_use_svn_dir_for_clone: bool = False
        if svn_path:
            if os.path.isdir(svn_path):
                # An existing directory is usable only when it is empty
                try:
                    can_use_svn_dir_for_clone = not os.listdir(svn_path)
                except OSError:
                    # Unreadable directory (permissions etc.): treat as not usable
                    pass
            elif not os.path.exists(svn_path):
                # Nothing exists at the path yet: usable if it could be created,
                # i.e. the parent exists or the path is a bare name.
                parent_dir: str = os.path.dirname(svn_path)
                can_use_svn_dir_for_clone = (not parent_dir) or os.path.isdir(parent_dir)
            # else: the path exists but is not a directory -> not usable

        # --- Does the requested bundle file exist in the target directory? ---
        usb_path_str: str = main_frame.usb_path_entry.get().strip()
        bundle_fetch_name: str = main_frame.bundle_updated_name_entry.get().strip()
        bundle_file_exists: bool = bool(
            usb_path_str
            and bundle_fetch_name
            and os.path.isdir(usb_path_str)
            and os.path.isfile(os.path.join(usb_path_str, bundle_fetch_name))
        )

        # Enable the button if either condition is met
        if is_repo_ready or (can_use_svn_dir_for_clone and bundle_file_exists):
            return tk.NORMAL
        return tk.DISABLED
    except Exception as e:
        # Log errors during state calculation and default to disabled
        log_handler.log_error(
            f"Error checking fetch button state: {e}", func_name=func_name
        )
        return tk.DISABLED
def _is_repo_ready(self, repo_path: str) -> bool:
    """
    Report whether ``repo_path`` is an existing directory containing ``.git``.

    Args:
        repo_path (str): Path to check; an empty value yields ``False``.

    Returns:
        bool: ``True`` only when the path is non-empty, is a directory and a
        ``.git`` entry exists directly inside it.
    """
    if not repo_path:
        return False
    if not os.path.isdir(repo_path):
        return False
    git_marker: str = os.path.join(repo_path, ".git")
    return os.path.exists(git_marker)
def _parse_exclusions(self) -> tuple[set[str], set[str]]:
    """
    Build the backup exclusion sets from the comma-separated GUI fields.

    Extensions are lowercased and normalized to start with a single dot.
    Directory names are lowercased and stripped of surrounding whitespace
    and path separators; ``.`` and ``..`` are ignored. ``.git`` and ``.svn``
    are always part of the directory set.

    Returns:
        tuple[set[str], set[str]]: ``(excluded_extensions, excluded_dirs)``.
        When the main frame is unavailable only the built-in defaults are
        returned.
    """
    skip_exts: set[str] = set()
    skip_dirs: set[str] = {".git", ".svn"}
    # Without a live main frame there is nothing to read from
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return skip_exts, skip_dirs
    frame: MainFrame = self.main_frame

    # --- Extensions ---
    raw_exts: str = frame.backup_exclude_extensions_var.get()
    if raw_exts:
        ext_tokens = (piece.strip().lower() for piece in raw_exts.split(","))
        # Drop empty tokens and force exactly one leading dot
        skip_exts.update("." + token.lstrip(".") for token in ext_tokens if token)

    # --- Directory names ---
    raw_dirs: str = frame.backup_exclude_dirs_var.get()
    if raw_dirs:
        separators: str = os.path.sep + "/"
        for piece in raw_dirs.split(","):
            dir_name: str = piece.strip().lower().strip(separators)
            if dir_name and dir_name not in {".", ".."}:
                skip_dirs.add(dir_name)

    log_handler.log_debug(
        f"Parsed Exclusions - Exts: {skip_exts}, Dirs: {skip_dirs}",
        func_name="_parse_exclusions",
    )
    return skip_exts, skip_dirs
def _get_and_validate_svn_path(self, operation_name: str = "Operation") -> Optional[str]:
    """
    Read the Working Directory entry and validate it as an existing directory.

    On an empty or invalid path the problem is logged, shown to the user in
    an error dialog and reflected in the status bar.

    Args:
        operation_name (str): Label of the calling operation, used as a
            prefix in log and status messages.

    Returns:
        Optional[str]: The absolute directory path, or ``None`` when the
        entry widget is missing, the field is empty, or the path is not a
        directory.
    """
    func_name: str = "_get_and_validate_svn_path"
    frame = getattr(self, "main_frame", None)
    if not hasattr(frame, "svn_path_entry"):
        log_handler.log_error(
            f"{operation_name} failed: SVN path entry widget missing.",
            func_name=func_name
        )
        return None

    entered: str = frame.svn_path_entry.get().strip()
    if not entered:
        log_handler.log_warning(
            f"{operation_name} failed: Working Directory path is empty.",
            func_name=func_name
        )
        frame.show_error("Input Error", "Working Directory path cannot be empty.")
        frame.update_status_bar(f"{operation_name} failed: Path empty.")
        return None

    resolved: str = os.path.abspath(entered)
    if os.path.isdir(resolved):
        log_handler.log_debug(
            f"{operation_name}: Using validated Working Directory path: {resolved}",
            func_name=func_name
        )
        return resolved

    # The path resolves to something that is not an existing directory
    log_handler.log_warning(
        f"{operation_name} failed: Path is not a valid directory: {resolved}",
        func_name=func_name
    )
    frame.show_error(
        "Path Error",
        f"The specified path is not a valid directory:\n{resolved}"
    )
    frame.update_status_bar(f"{operation_name} failed: Not a directory.")
    return None
def _get_and_validate_usb_path(self, operation_name: str = "Operation") -> Optional[str]:
    """
    Read the Bundle Target entry and validate it as an existing directory.

    On an empty or invalid path the problem is logged, shown to the user in
    an error dialog and reflected in the status bar.

    Args:
        operation_name (str): Label of the calling operation, used as a
            prefix in log and status messages.

    Returns:
        Optional[str]: The absolute directory path, or ``None`` when the
        entry widget is missing, the field is empty, or the path is not a
        directory.
    """
    func_name: str = "_get_and_validate_usb_path"
    frame = getattr(self, "main_frame", None)
    if not hasattr(frame, "usb_path_entry"):
        log_handler.log_error(
            f"{operation_name} failed: Bundle Target path entry widget missing.",
            func_name=func_name
        )
        return None

    entered: str = frame.usb_path_entry.get().strip()
    if not entered:
        log_handler.log_warning(
            f"{operation_name} failed: Bundle Target path is empty.",
            func_name=func_name
        )
        frame.show_error("Input Error", "Bundle Target path cannot be empty.")
        frame.update_status_bar(f"{operation_name} failed: Path empty.")
        return None

    resolved: str = os.path.abspath(entered)
    if os.path.isdir(resolved):
        log_handler.log_debug(
            f"{operation_name}: Using validated Bundle Target path: {resolved}",
            func_name=func_name
        )
        return resolved

    # The path resolves to something that is not an existing directory
    log_handler.log_warning(
        f"{operation_name} failed: Path is not a valid directory: {resolved}",
        func_name=func_name
    )
    frame.show_error(
        "Path Error",
        f"The specified path is not a valid directory:\n{resolved}"
    )
    frame.update_status_bar(f"{operation_name} failed: Not a directory.")
    return None
def _clear_and_disable_fields(self):
    """
    Reset the GUI to its "nothing selected" state.

    Empties the path/bundle entries, the commit message and the
    backup/remote variables, resets the dynamic lists, lets
    ``update_svn_status_indicator("")`` update the action widgets, disables
    the remove-profile and save-settings buttons and sets the status bar
    message. Does nothing when the main frame is unavailable.
    """
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return
    mf: MainFrame = self.main_frame
    log_handler.log_debug(
        "Clearing and disabling fields.", func_name="_clear_and_disable_fields"
    )

    # --- Empty the text entries ---
    entry_names = (
        "svn_path_entry",
        "usb_path_entry",
        "bundle_name_entry",
        "bundle_updated_name_entry",
    )
    for entry_name in entry_names:
        if hasattr(mf, entry_name):
            getattr(mf, entry_name).delete(0, tk.END)
    if hasattr(mf, "clear_commit_message"):
        mf.clear_commit_message()

    # --- Reset the bound variables to their blank values ---
    blank_values = (
        ("backup_dir_var", ""),
        ("backup_exclude_extensions_var", ""),
        ("backup_exclude_dirs_var", ""),
        ("remote_url_var", ""),
        ("remote_name_var", ""),
        ("autobackup_var", False),
        ("autocommit_var", False),
    )
    for var_name, blank in blank_values:
        if hasattr(mf, var_name):
            getattr(mf, var_name).set(blank)
    # Let widgets that depend on those variables refresh themselves
    if hasattr(mf, "toggle_backup_dir"):
        mf.toggle_backup_dir()

    # --- Reset dynamic lists, then update widget states for an empty path ---
    self._update_gui_for_not_ready_state()
    self.update_svn_status_indicator("")

    # Profile buttons handled here: remove and save
    for button_name in ("remove_profile_button", "save_settings_button"):
        if hasattr(mf, button_name):
            getattr(mf, button_name).config(state=tk.DISABLED)

    mf.update_status_bar("No profile selected or repository not ready.")
def show_fatal_error(self, message: str):
    """
    Report a fatal error to the user and then close the application.

    The message is logged at critical level and shown in an error dialog
    (parented to the main window when it still exists). If the dialog itself
    fails, the message is printed to stderr. ``on_closing`` is always called
    afterwards.

    Args:
        message (str): Text describing the fatal error.
    """
    log_handler.log_critical(
        f"FATAL ERROR: {message}", func_name="show_fatal_error"
    )
    try:
        # Use the main window as dialog parent only when it is still alive
        dialog_parent: Optional[tk.Tk] = None
        has_live_master = (
            hasattr(self, "master") and self.master and self.master.winfo_exists()
        )
        if has_live_master:
            dialog_parent = self.master
        messagebox.showerror("Fatal Error", message, parent=dialog_parent)
    except Exception as e:
        # GUI reporting failed: fall back to stderr
        print(f"FATAL ERROR (GUI message failed: {e}): {message}", file=sys.stderr)
    finally:
        # Shut down regardless of how the reporting went
        self.on_closing()
def show_comparison_summary(self, ref1: str, ref2: str, repo_path: str, changed_files: List[str]):
    """
    Open a ``DiffSummaryWindow`` showing the files changed between two refs.

    Called by the AsyncResultHandler. Widget re-enabling is scheduled through
    ``_reenable_widgets_after_modal`` on every exit path.

    Args:
        ref1 (str): First reference of the comparison.
        ref2 (str): Second reference of the comparison.
        repo_path (str): Repository path passed on to the window.
        changed_files (List[str]): Changed-file entries passed to the window
            as ``changed_files_status``; must be a ``list``.
    """
    func_name: str = "show_comparison_summary"
    log_handler.log_debug(
        f"Attempting to show comparison summary: {ref1} vs {ref2}",
        func_name=func_name
    )

    # Validate that all required data is present
    inputs_complete: bool = all(
        (ref1, ref2, repo_path, isinstance(changed_files, list))
    )
    if not inputs_complete:
        log_handler.log_error(
            "Missing data required to show comparison summary.", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Display Error", "Internal error: Missing data for comparison."
            )
        # Widgets must be re-enabled on this error path as well
        self._reenable_widgets_after_modal()
        return

    try:
        log_handler.log_debug(
            f"Opening DiffSummaryWindow with {len(changed_files)} files.",
            func_name=func_name
        )
        window_options: dict = dict(
            master=self.master,  # The root window is the parent
            git_commands=self.git_commands,
            repo_path=repo_path,
            ref1=ref1,
            ref2=ref2,
            changed_files_status=changed_files,
        )
        # NOTE(review): presumably modal, i.e. this call returns only after
        # the window is closed - confirm against DiffSummaryWindow.
        DiffSummaryWindow(**window_options)
        log_handler.log_info("Diff Summary window closed by user.", func_name=func_name)
        # Restore the status bar now that the window is gone
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Ready.")
    except Exception as e_summary:
        # Errors while creating/displaying the window
        log_handler.log_exception(
            f"Error opening diff summary window: {e_summary}", func_name=func_name
        )
        can_report: bool = (
            hasattr(self.main_frame, "show_error")
            and hasattr(self.main_frame, "update_status_bar")
        )
        if can_report:
            self.main_frame.show_error(
                "Display Error", f"Could not display comparison results:\n{e_summary}"
            )
            self.main_frame.update_status_bar("Error displaying comparison.")
    finally:
        # Re-enable widgets once the window (or the error dialog) is closed
        self._reenable_widgets_after_modal()
# --- Helper Methods for Updating GUI in Specific States ---
def _update_gui_for_not_ready_state(self):
    """
    Put placeholder content into every dynamic display for a non-ready repo.

    Each updater is invoked only if the main frame provides it. Does nothing
    when the main frame is unavailable.
    """
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return
    mf: MainFrame = self.main_frame
    log_handler.log_debug(
        "Updating GUI for 'Repo Not Ready' state.",
        func_name="_update_gui_for_not_ready_state"
    )
    # (updater name, positional args, keyword args), applied in this order
    placeholder_calls = (
        ("update_tag_list", ([("(Repo not ready)", "")],), {}),
        ("update_branch_list", ([], None), {}),
        ("update_history_display", (["(Repo not ready)"],), {}),
        ("update_history_branch_filter", ([],), {}),
        ("update_changed_files_list", (["(Repo not ready)"],), {}),
        ("update_remote_branches_list", (["(Repo not ready)"],), {}),
        # Sync status label
        (
            "update_ahead_behind_status",
            (),
            {"status_text": "Sync Status: (Repo not ready)"},
        ),
    )
    for updater_name, positional, named in placeholder_calls:
        if hasattr(mf, updater_name):
            getattr(mf, updater_name)(*positional, **named)
def _update_gui_for_detached_head(self, current_branch_name: Optional[str]):
    """
    Reflect a detached HEAD in the sync-status area of the GUI.

    Sets the sync status label to the detached-HEAD text and disables the
    sync-status refresh button. Does nothing when the main frame is
    unavailable.

    Args:
        current_branch_name (Optional[str]): Forwarded as ``current_branch``
            to the status label updater.
    """
    frame_missing: bool = (
        not hasattr(self, "main_frame") or not self.main_frame.winfo_exists()
    )
    if frame_missing:
        return
    frame: MainFrame = self.main_frame
    log_handler.log_debug(
        "Updating GUI for 'Detached HEAD' state.",
        func_name="_update_gui_for_detached_head"
    )
    # Sync status label
    if hasattr(frame, "update_ahead_behind_status"):
        frame.update_ahead_behind_status(
            current_branch=current_branch_name,
            status_text="Sync Status: (Detached HEAD)"
        )
    # The refresh button is switched off while detached
    if hasattr(frame, "refresh_sync_status_button"):
        frame.refresh_sync_status_button.config(state=tk.DISABLED)
def _update_gui_for_no_upstream(self, current_branch_name: Optional[str]):
    """
    Reflect a missing upstream in the sync-status area of the GUI.

    Sets the sync status label to the "Upstream not set" text and disables
    the sync-status refresh button. Does nothing when the main frame is
    unavailable.

    Args:
        current_branch_name (Optional[str]): Forwarded as ``current_branch``
            to the status label updater.
    """
    frame_missing: bool = (
        not hasattr(self, "main_frame") or not self.main_frame.winfo_exists()
    )
    if frame_missing:
        return
    frame: MainFrame = self.main_frame
    log_handler.log_debug(
        "Updating GUI for 'No Upstream' state.",
        func_name="_update_gui_for_no_upstream"
    )
    # Sync status label
    if hasattr(frame, "update_ahead_behind_status"):
        frame.update_ahead_behind_status(
            current_branch=current_branch_name,
            status_text="Sync Status: Upstream not set"
        )
    # The refresh button is switched off while no upstream is configured
    if hasattr(frame, "refresh_sync_status_button"):
        frame.refresh_sync_status_button.config(state=tk.DISABLED)
def _reenable_widgets_after_modal(self):
    """
    Schedule ``_reenable_widgets_if_ready`` on the master window after 50 ms.

    Used after modal windows close or errors are handled. If the master
    window is missing or no longer exists, a warning is logged and nothing
    is scheduled.
    """
    func_name: str = "_reenable_widgets_after_modal"
    # The scheduling goes through the master window, so it must still exist
    master_alive = hasattr(self, "master") and self.master.winfo_exists()
    if not master_alive:
        log_handler.log_warning(
            "Cannot schedule widget re-enable: Master window destroyed.",
            func_name=func_name
        )
        return
    # Defer the re-enable via the Tk event loop
    self.master.after(50, self._reenable_widgets_if_ready)
    log_handler.log_debug("Scheduled widget re-enable.", func_name=func_name)
def _update_gui_for_status_error(self):
    """
    Reflect a failure to obtain status information in the sync-status area.

    Sets the sync status label to the error text and disables the
    sync-status refresh button. Does nothing when the main frame is
    unavailable.
    """
    frame_missing: bool = (
        not hasattr(self, "main_frame") or not self.main_frame.winfo_exists()
    )
    if frame_missing:
        return
    frame: MainFrame = self.main_frame
    log_handler.log_debug(
        "Updating GUI for 'Status Error' state.",
        func_name="_update_gui_for_status_error"
    )
    # Sync status label
    if hasattr(frame, "update_ahead_behind_status"):
        frame.update_ahead_behind_status(
            status_text="Sync Status: Error getting info"
        )
    # The refresh button is switched off after an error
    if hasattr(frame, "refresh_sync_status_button"):
        frame.refresh_sync_status_button.config(state=tk.DISABLED)
# --- Helper to update authentication status (internal state + GUI) ---
def _update_gui_auth_status(self, status: str):
    """
    Record the remote authentication status and mirror it in the GUI.

    ``self.remote_auth_status`` is always updated. When the main frame
    exists, its auth indicator is refreshed; for any status other than
    ``"ok"`` the sync status label is additionally set to a readable form of
    the status (underscores replaced by spaces, title-cased).

    Args:
        status (str): Status key, e.g. ``"ok"``.
    """
    # Internal state is updated even when no GUI is available
    self.remote_auth_status = status
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    frame = self.main_frame
    frame._update_auth_status_indicator(status)
    if status == "ok":
        return
    # Any non-ok status is also surfaced in the sync status label
    if hasattr(frame, "update_ahead_behind_status"):
        readable_status: str = status.replace("_", " ").title()
        frame.update_ahead_behind_status(
            status_text=f"Sync Status: ({readable_status})"
        )
def _start_async_operation(
    self,
    worker_func: Callable,  # The worker function from async_workers.py
    args_tuple: tuple,      # Arguments for the worker function (excluding queue)
    context_dict: dict      # Context information for result handling
):
    """
    Generic helper to start an async operation in a separate thread.

    Disables the GUI action widgets, shows a "Processing" status, starts
    ``worker_func`` in a daemon thread with a dedicated result queue appended
    to its arguments, and schedules ``_check_completion_queue`` to poll that
    queue. If the thread cannot be prepared or started, the error is shown
    to the user and the action widgets are re-enabled immediately.

    Args:
        worker_func (Callable): The target function to run in the thread.
            Any callable is accepted, including ones without a ``__name__``
            attribute (e.g. ``functools.partial`` objects).
        args_tuple (tuple): Arguments for ``worker_func`` (the result queue
            is appended automatically as the last argument).
        context_dict (dict): Context information (like 'context',
            'status_msg') for the task and its result processing.
    """
    func_name: str = "_start_async_operation"
    # Ensure main frame is available
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        log_handler.log_error(
            "Cannot start async op: Main frame missing.",
            func_name=func_name
        )
        return
    # Extract context details for logging and status updates
    context_name: str = context_dict.get("context", "unknown_op")
    # Status message displayed to the user while processing
    status_msg: str = context_dict.get("status_msg", context_name)
    # Not every callable has __name__ (partials, callable instances); resolve it
    # defensively so that logging can never fail once the GUI is disabled.
    worker_name: str = getattr(worker_func, "__name__", repr(worker_func))
    log_handler.log_info(
        f"--- Action Triggered: {context_name} (Async Start) ---",
        func_name=context_name
    )
    # --- Prepare GUI for Async Operation ---
    # Disable main action widgets to prevent concurrent operations
    self.main_frame.set_action_widgets_state(tk.DISABLED)
    # Update status bar to indicate processing with a yellow background
    self.main_frame.update_status_bar(
        message=f"Processing: {status_msg}...",
        bg_color=self.main_frame.STATUS_YELLOW
    )
    # --- Setup Communication Queue ---
    # One queue per operation; maxsize=1 because a single result is expected
    results_queue: queue.Queue = queue.Queue(maxsize=1)
    # --- Prepare and Start Worker Thread ---
    # Everything that can fail after the widgets were disabled happens inside
    # this try block, so a failure always re-enables the GUI.
    try:
        # Combine the provided arguments with the results queue for the worker
        full_args: tuple = tuple(args_tuple) + (results_queue,)
        log_handler.log_debug(
            f"Creating worker thread for {context_name}. Worker func: {worker_name}",
            func_name=func_name
        )
        worker_thread = threading.Thread(
            target=worker_func,
            args=full_args,
            daemon=True  # Daemon thread: does not keep the process alive on exit
        )
        log_handler.log_debug(
            f"Starting worker thread for {context_name}.",
            func_name=func_name
        )
        worker_thread.start()
    except Exception as thread_e:
        # Handle errors during thread preparation/creation/start
        log_handler.log_exception(
            f"Failed to start worker thread for {context_name}: {thread_e}",
            func_name=func_name
        )
        # Show error to user and re-enable GUI immediately
        self.main_frame.show_error(
            "Threading Error", f"Could not start background task for {context_name}."
        )
        self.main_frame.update_status_bar(
            f"Error starting task: {context_name}",
            bg_color=self.main_frame.STATUS_RED,
            duration_ms=10000
        )
        self.main_frame.set_action_widgets_state(tk.NORMAL)
        return  # Stop if thread cannot be started
    # --- Schedule Result Check ---
    # Poll the queue from the Tk event loop after a short delay
    log_handler.log_debug(
        f"Scheduling completion check for {context_name}.",
        func_name=func_name
    )
    self.master.after(
        self.ASYNC_QUEUE_CHECK_INTERVAL_MS,
        self._check_completion_queue,  # Method to call for checking
        results_queue,                 # Queue to check
        context_dict                   # Pass original context for handling
    )
# --- Specific Action Launchers (Call _start_async_operation) ---
# --- Refresh Actions ---
def refresh_tag_list(self):
    """
    Start the asynchronous refresh of the tag list.

    When the working directory is invalid or not a Git repository, the GUI
    is switched to its "not ready" placeholders instead and no task is
    started.
    """
    func_name: str = "refresh_tag_list"
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Refresh Tags")
    # Only a validated path that is also a repository qualifies
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_debug(
            "Refresh Tags skipped: Repo not ready.", func_name=func_name
        )
        self._update_gui_for_not_ready_state()
        return
    task_context: dict = {
        "context": "refresh_tags",
        "status_msg": "Refreshing tags",
    }
    # Hand the work to the generic async launcher
    self._start_async_operation(
        worker_func=async_workers.run_refresh_tags_async,
        args_tuple=(self.git_commands, repo_dir),
        context_dict=task_context
    )
def refresh_remote_status(self):
    """
    Start the asynchronous ahead/behind check for the current branch.

    The current branch and its upstream are resolved first. A non-ready
    repository, a detached HEAD, a missing upstream or an error while
    resolving them each update the GUI accordingly and end the call without
    starting a task.
    """
    func_name: str = "refresh_remote_status"
    log_handler.log_info(
        "--- Action Triggered: Refresh Remote Sync Status ---", func_name=func_name
    )
    # Nothing to do without a live main frame
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return

    # --- Validate repository ---
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Refresh Sync Status")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Refresh Status skipped: Repo not ready.", func_name=func_name
        )
        self._update_gui_for_not_ready_state()
        return

    # --- Resolve current branch and its upstream ---
    local_branch: Optional[str] = None
    tracked_upstream: Optional[str] = None
    try:
        local_branch = self.git_commands.get_current_branch_name(repo_dir)
        if not local_branch:
            # No branch name: detached HEAD, no sync status possible
            log_handler.log_warning(
                "Refresh Status: Cannot get status, currently in detached HEAD state.",
                func_name=func_name
            )
            self._update_gui_for_detached_head(local_branch)
            return
        tracked_upstream = self.git_commands.get_branch_upstream(
            repo_dir, local_branch
        )
        if not tracked_upstream:
            # No upstream configured: no sync status possible
            log_handler.log_info(
                f"Refresh Status: No upstream configured for branch '{local_branch}'.",
                func_name=func_name
            )
            self._update_gui_for_no_upstream(local_branch)
            return
        # Branch and upstream are known: make sure the refresh button is usable
        if hasattr(self.main_frame, "refresh_sync_status_button"):
            self.main_frame.refresh_sync_status_button.config(state=tk.NORMAL)
    except Exception as e:
        # Anything failing above is reported as a status error
        log_handler.log_exception(
            f"Error getting branch/upstream before status check: {e}",
            func_name=func_name
        )
        self._update_gui_for_status_error()
        return

    # --- Launch the worker ---
    log_handler.log_info(
        f"Checking ahead/behind status for '{local_branch}' vs '{tracked_upstream}'...",
        func_name=func_name
    )
    # Show an interim "Checking..." label
    if hasattr(self.main_frame, "update_ahead_behind_status"):
        self.main_frame.update_ahead_behind_status(
            current_branch=local_branch, status_text="Sync Status: Checking..."
        )
    task_context: dict = {
        "context": "get_ahead_behind",
        "status_msg": f"Checking sync status for '{local_branch}'",
        # Passed along for the result handler
        "local_branch": local_branch,
        "upstream_branch": tracked_upstream,
    }
    self._start_async_operation(
        worker_func=async_workers.run_get_ahead_behind_async,
        args_tuple=(self.git_commands, repo_dir, local_branch, tracked_upstream),
        context_dict=task_context
    )
def refresh_branch_list(self):
    """
    Start the asynchronous refresh of the local branch list.

    When the working directory is invalid or not a Git repository, the GUI
    is switched to its "not ready" placeholders instead and no task is
    started.
    """
    func_name: str = "refresh_branch_list"
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Refresh Branches")
    # Only a validated path that is also a repository qualifies
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_debug(
            "Refresh Branches skipped: Repo not ready.", func_name=func_name
        )
        self._update_gui_for_not_ready_state()
        return
    task_context: dict = {
        "context": "refresh_branches",
        "status_msg": "Refreshing branches",
    }
    # Hand the work to the generic async launcher
    self._start_async_operation(
        worker_func=async_workers.run_refresh_branches_async,
        args_tuple=(self.git_commands, repo_dir),
        context_dict=task_context
    )
def refresh_commit_history(self):
    """
    Launches the background worker that reloads the commit history view.

    The history is limited to the branch/tag chosen in the GUI filter, unless
    the filter is empty or set to the "-- All History --" entry, in which case
    the whole history is requested. If the repository is not ready, the GUI is
    switched to its "not ready" state and no worker is started.
    """
    func_name: str = "refresh_commit_history"
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Refresh History")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_debug(
            "Refresh History skipped: Repo not ready.", func_name=func_name
        )
        self._update_gui_for_not_ready_state()
        return

    # Read the filter widget (if the GUI provides one)
    selection = (
        self.main_frame.history_branch_filter_var.get()
        if hasattr(self.main_frame, "history_branch_filter_var")
        else ""
    )
    # Only a concrete branch/tag selection narrows the history
    has_specific_ref: bool = bool(selection) and selection != "-- All History --"
    branch_filter: Optional[str] = selection if has_specific_ref else None
    log_scope: str = f"'{branch_filter}'" if has_specific_ref else "All History"

    worker_args: tuple = (self.git_commands, repo_dir, branch_filter, log_scope)
    operation_context: dict = {
        "context": "refresh_history",
        "status_msg": f"Refreshing history for {log_scope}",
    }
    self._start_async_operation(
        worker_func=async_workers.run_refresh_history_async,
        args_tuple=worker_args,
        context_dict=operation_context,
    )
def refresh_changed_files_list(self):
    """
    Launches the background worker that reloads the list of changed files.

    The call is a no-op (apart from a debug log entry) when the repository
    path is invalid or the repository is not prepared; this method does not
    touch the GUI in that case.
    """
    func_name: str = "refresh_changed_files_list"
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Refresh Changed Files")
    if repo_dir and self._is_repo_ready(repo_dir):
        worker_args: tuple = (self.git_commands, repo_dir)
        operation_context: dict = {
            "context": "refresh_changes",
            "status_msg": "Refreshing changed files",
        }
        self._start_async_operation(
            worker_func=async_workers.run_refresh_changes_async,
            args_tuple=worker_args,
            context_dict=operation_context,
        )
        return
    # Not ready: only record the skip, the GUI state is managed elsewhere
    log_handler.log_debug(
        "Refresh Changes skipped: Repo not ready.", func_name=func_name
    )
# --- Local Repo / Bundle / Backup Actions ---
def prepare_svn_for_git(self):
    """
    Launches the background worker that prepares the repository
    (init, gitignore).

    Three outcomes are possible:
      * invalid path      -> status bar reports the failure, nothing started;
      * already prepared  -> user is informed and the status indicator is
                             refreshed, nothing started;
      * otherwise         -> the prepare worker is started asynchronously.
    """
    func_name: str = "prepare_svn_for_git"
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Prepare Repository")

    if not repo_dir:
        # The validation helper has already reported the problem to the user
        self.main_frame.update_status_bar("Prepare failed: Invalid path.")
    elif self._is_repo_ready(repo_dir):
        # Avoid redoing work on a repository that is already set up
        log_handler.log_info(
            "Prepare skipped: Repository already prepared.", func_name=func_name
        )
        self.main_frame.show_info("Info", "Repository is already prepared.")
        # Keep the GUI indicator in sync with the actual repository state
        self.update_svn_status_indicator(repo_dir)
    else:
        worker_args: tuple = (self.action_handler, repo_dir)
        operation_context: dict = {
            "context": "prepare_repo",
            "status_msg": "Preparing repository",
        }
        self._start_async_operation(
            worker_func=async_workers.run_prepare_async,
            args_tuple=worker_args,
            context_dict=operation_context,
        )
def create_git_bundle(self):
    """
    Starts the async operation that creates a Git bundle file.

    Inputs are read from the GUI (profile, repository path, bundle target
    directory, bundle file name, backup/commit options and exclusions).
    The operation is not started when:
      * a required input is missing (the user is told which one, unless the
        path validators have already reported a path problem);
      * the repository is not prepared;
      * saving the profile fails and the user declines to continue.

    The bundle file name gets a ".bundle" extension appended when missing.
    """
    func_name: str = "create_git_bundle"
    # Gather and validate inputs from GUI
    profile: str = self.main_frame.profile_var.get()
    svn_path: Optional[str] = self._get_and_validate_svn_path("Create Bundle")
    usb_path: Optional[str] = self._get_and_validate_usb_path("Create Bundle")
    bundle_name: str = self.main_frame.bundle_name_entry.get().strip()

    # Check if all required inputs are present
    if not profile or not svn_path or not usb_path or not bundle_name:
        log_handler.log_warning(
            "Create Bundle cancelled: Missing inputs.", func_name=func_name
        )
        # Path problems are reported by the validation methods themselves.
        # A missing profile or bundle name used to abort silently, so report
        # it here; skip the dialog when a path error was already shown.
        if svn_path and usb_path:
            missing_input: str = "profile" if not profile else "bundle file name"
            self.main_frame.show_error(
                "Input Error",
                f"Cannot create bundle: no {missing_input} specified."
            )
        self.main_frame.update_status_bar("Create Bundle cancelled: Missing inputs.")
        return

    # Check if repository is ready
    if not self._is_repo_ready(svn_path):
        log_handler.log_warning(
            "Create Bundle failed: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error("Action Failed", "Repository is not prepared.")
        self.main_frame.update_status_bar("Create Bundle failed: Repo not ready.")
        return

    # Ensure bundle name has the correct extension
    if not bundle_name.lower().endswith(".bundle"):
        bundle_name += ".bundle"
    bundle_full_path: str = os.path.join(usb_path, bundle_name)

    # Save profile settings before starting the operation; if that fails,
    # let the user decide whether to go on anyway.
    if not self.save_profile_settings():
        if not self.main_frame.ask_yes_no(
            "Warning",
            "Could not save profile settings.\nContinue creating bundle anyway?"
        ):
            self.main_frame.update_status_bar(
                "Create Bundle cancelled (profile save failed)."
            )
            return

    # Prepare parameters for the worker
    excluded_extensions, excluded_dirs = self._parse_exclusions()
    backup_enabled: bool = self.main_frame.autobackup_var.get()
    backup_dir: str = self.main_frame.backup_dir_var.get()
    commit_enabled: bool = self.main_frame.autocommit_var.get()
    commit_msg: str = self.main_frame.get_commit_message()

    # Positional arguments, in the order passed to the worker
    args: tuple = (
        self.action_handler,
        svn_path,
        bundle_full_path,
        profile,
        backup_enabled,
        backup_dir,
        commit_enabled,
        commit_msg,
        excluded_extensions,
        excluded_dirs,
    )
    # Start the async operation
    self._start_async_operation(
        worker_func=async_workers.run_create_bundle_async,
        args_tuple=args,
        context_dict={
            "context": "create_bundle",
            "status_msg": f"Creating bundle '{bundle_name}'",
            "committed_flag_possible": True,  # Context hint for result handler
        }
    )
def fetch_from_git_bundle(self):
    """
    Starts the async operation that fetches/clones from a Git bundle file.

    The repository path is taken verbatim from the GUI entry (it is not run
    through the path validator, because it may be a not-yet-existing
    directory when cloning). The operation is not started when:
      * a required input is missing (the user is told which one, unless the
        bundle directory validator has already reported a path problem);
      * the bundle file does not exist;
      * saving the profile fails and the user declines to continue.
    """
    func_name: str = "fetch_from_git_bundle"
    # Gather and validate inputs
    profile: str = self.main_frame.profile_var.get()
    # svn_path_str can be a non-existent dir if cloning
    svn_path_str: str = self.main_frame.svn_path_entry.get().strip()
    usb_path: Optional[str] = self._get_and_validate_usb_path("Fetch Bundle")
    bundle_name: str = self.main_frame.bundle_updated_name_entry.get().strip()

    # Check for missing inputs
    if not profile or not svn_path_str or not usb_path or not bundle_name:
        log_handler.log_warning(
            "Fetch Bundle cancelled: Missing inputs.", func_name=func_name
        )
        # The bundle directory validator reports its own errors. The other
        # inputs used to abort silently, so name the missing one here.
        if usb_path:
            if not profile:
                missing_input: str = "profile"
            elif not svn_path_str:
                missing_input = "repository path"
            else:
                missing_input = "bundle file name"
            self.main_frame.show_error(
                "Input Error",
                f"Cannot fetch from bundle: no {missing_input} specified."
            )
        self.main_frame.update_status_bar("Fetch cancelled: Missing inputs.")
        return

    # Construct full bundle path and check if it exists BEFORE starting async op
    bundle_full_path: str = os.path.join(usb_path, bundle_name)
    if not os.path.isfile(bundle_full_path):
        log_handler.log_error(
            f"Fetch Bundle failed: Bundle file not found at '{bundle_full_path}'",
            func_name=func_name
        )
        self.main_frame.show_error(
            "File Not Found", f"Bundle file not found:\n{bundle_full_path}"
        )
        self.main_frame.update_status_bar("Fetch failed: Bundle not found.")
        return

    # Save profile settings before action; on failure the user decides
    if not self.save_profile_settings():
        if not self.main_frame.ask_yes_no(
            "Warning",
            "Could not save profile settings.\nContinue fetching from bundle anyway?"
        ):
            self.main_frame.update_status_bar(
                "Fetch cancelled (profile save failed)."
            )
            return

    # Prepare parameters for worker
    excluded_extensions, excluded_dirs = self._parse_exclusions()
    backup_enabled: bool = self.main_frame.autobackup_var.get()
    backup_dir: str = self.main_frame.backup_dir_var.get()

    # Positional arguments, in the order passed to the worker
    args: tuple = (
        self.action_handler,
        svn_path_str,
        bundle_full_path,
        profile,
        backup_enabled,
        backup_dir,
        excluded_extensions,
        excluded_dirs,
    )
    # Start async operation
    self._start_async_operation(
        worker_func=async_workers.run_fetch_bundle_async,
        args_tuple=args,
        context_dict={
            "context": "fetch_bundle",
            "status_msg": f"Fetching from '{bundle_name}'",
            "repo_path": svn_path_str,  # Pass path for potential conflict message
        }
    )
def show_commit_details(self, commit_details: Dict[str, Any]):
    """
    Opens the CommitDetailWindow to display details of a specific commit.
    Called by the AsyncResultHandler after fetching commit data.

    The payload is validated BEFORE it is used in any way, so that a
    malformed result (e.g. None instead of a dict) is reported to the user
    instead of raising before the widgets are re-enabled.

    Args:
        commit_details (Dict[str, Any]): A dictionary containing commit metadata
                                         (hash, author, date, subject, body)
                                         and a list of changed files. Must
                                         contain a non-empty 'hash_full' entry.
    """
    func_name: str = "show_commit_details"

    # Basic validation of the received data (done first: the payload comes
    # from an async worker and must not be trusted to be a dict).
    commit_hash: Any = (
        commit_details.get('hash_full') if isinstance(commit_details, dict) else None
    )
    if not commit_hash:
        log_handler.log_error(
            "Invalid or incomplete commit details received.", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Display Error", "Internal error: Received invalid commit data."
            )
        # Re-enable the widgets, since no window will be opened
        self._reenable_widgets_after_modal()
        return

    log_handler.log_debug(
        f"Attempting to show commit details for hash: {commit_hash}",
        func_name=func_name
    )
    try:
        # Create and show the CommitDetailWindow
        log_handler.log_debug(
            f"Opening CommitDetailWindow for commit {str(commit_hash)[:7]}...",
            func_name=func_name
        )
        CommitDetailWindow(
            master=self.master,          # Parent is the root window
            commit_data=commit_details,  # Pass the data dictionary through
            # Callback used by the window to open the diff of a single file
            open_diff_callback=self._open_commit_file_diff
        )
        # The window is expected to be modal: execution resumes here once
        # the user has closed it.
        log_handler.log_info("Commit Detail window closed by user.", func_name=func_name)
        # Restore the status bar after the window has been closed
        if hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.update_status_bar("Ready.")
    except Exception as e_detail:
        # Handle errors raised while creating/showing the window
        log_handler.log_exception(
            f"Error opening commit detail window: {e_detail}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error") and hasattr(self.main_frame, "update_status_bar"):
            self.main_frame.show_error(
                "Display Error", f"Could not display commit details:\n{e_detail}"
            )
            self.main_frame.update_status_bar("Error displaying commit details.")
    finally:
        # Always re-enable the widgets once the window (or the error
        # message) has been dismissed.
        self._reenable_widgets_after_modal()
def _open_commit_file_diff(
    self,
    commit_hash: str,
    file_status: str,
    file_path: str,
    old_file_path: Optional[str] = None
):
    """
    Opens the DiffViewerWindow to show the changes for a specific file
    within a given commit compared to its parent.

    Deleted files ('D') are not diffed in this view: an informational
    message is shown instead.

    Args:
        commit_hash (str): The full hash of the commit being viewed.
        file_status (str): The status character (A, M, D, R, T...).
        file_path (str): The relative path of the file in the commit.
        old_file_path (Optional[str]): The old path (for Renamed files).
            Accepted as part of the callback signature; the diff itself is
            requested with the new path.
    """
    func_name: str = "_open_commit_file_diff"
    log_handler.log_info(
        f"Requesting diff for file '{file_path}' in commit '{commit_hash[:7]}'",
        func_name=func_name
    )
    # Validate the current repository path
    svn_path: Optional[str] = self._get_and_validate_svn_path("Open Commit File Diff")
    if not svn_path or not self._is_repo_ready(svn_path):
        log_handler.log_error(
            "Cannot open diff: Repository path invalid or not ready.", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error("Error", "Cannot open diff: Repository not available.")
        return

    # References handed to the diff viewer
    ref1: str = f"{commit_hash}^"  # Parent of the commit
    ref2: str = commit_hash        # The commit itself

    # --- Special cases ---
    # 1. Added (A) / Copied (C): the standard parent-vs-commit refs are
    #    passed; the whole file is expected to show up as added.
    # 2. Deleted (D): skipped below with an informational message.
    # 3. Renamed (R): the *new* path is used with the standard refs.
    # 4. Root commit: it has no parent, so `commit^` cannot be resolved.
    #    NOTE(review): this case is not detected here; any exception raised
    #    by the viewer is reported by the generic handler below.
    path_to_diff: str = file_path

    # The guard used to additionally require `old_file_path`, which is only
    # supplied for renames, so deletions never reached this branch.
    if file_status == 'D':
        log_handler.log_info(
            f"Diff view skipped for deleted file '{file_path}' in commit '{commit_hash[:7]}'.",
            func_name=func_name
        )
        if hasattr(self.main_frame, "show_info"):
            self.main_frame.show_info(
                "Diff Not Applicable",
                f"Cannot show diff for file deleted in this commit:\n{file_path}"
            )
        return

    # --- Open DiffViewerWindow ---
    log_handler.log_debug(
        f"Opening DiffViewerWindow: Ref1='{ref1}', Ref2='{ref2}', Path='{path_to_diff}'",
        func_name=func_name
    )
    try:
        # Instantiate and show the diff window
        DiffViewerWindow(
            master=self.master,               # Root window is the parent
            git_commands=self.git_commands,
            repo_path=svn_path,
            relative_file_path=path_to_diff,
            ref1=ref1,                        # Parent commit
            ref2=ref2                         # Selected commit
        )
        log_handler.log_debug("Commit file diff viewer closed.", func_name=func_name)
        # No status bar update here: the CommitDetail window is still open
    except Exception as e_diff:
        log_handler.log_exception(
            f"Error opening commit file diff viewer: {e_diff}", func_name=func_name
        )
        if hasattr(self.main_frame, "show_error"):
            self.main_frame.show_error(
                "Diff Viewer Error", f"Could not display file changes:\n{e_diff}"
            )
def manual_backup(self):
    """
    Launches the background worker that creates a manual backup ZIP.

    Aborts (with a message to the user where applicable) when the profile or
    repository path is missing, the backup directory is empty, the backup
    path exists but is not a directory, or the profile cannot be saved and
    the user chooses not to continue.
    """
    func_name: str = "manual_backup"
    # Collect the inputs from the GUI
    profile_name: str = self.main_frame.profile_var.get()
    repo_dir: Optional[str] = self._get_and_validate_svn_path(
        f"Manual Backup ({profile_name})"
    )
    raw_backup_dir: str = self.main_frame.backup_dir_var.get().strip()

    if not (profile_name and repo_dir):
        # The validation helper has already informed the user
        return

    if not raw_backup_dir:
        log_handler.log_warning(
            "Manual backup failed: Backup directory is empty.", func_name=func_name
        )
        self.main_frame.show_error(
            "Input Error", "Backup directory cannot be empty for manual backup."
        )
        self.main_frame.update_status_bar("Manual backup failed: Backup dir empty.")
        return

    # A missing directory is acceptable here (its creation is left to the
    # backup step); an existing non-directory path is not.
    target_dir: str = os.path.abspath(raw_backup_dir)
    path_is_not_a_dir: bool = os.path.exists(target_dir) and not os.path.isdir(target_dir)
    if path_is_not_a_dir:
        log_handler.log_error(
            f"Manual backup failed: Backup path exists but is not a directory: {target_dir}",
            func_name=func_name
        )
        self.main_frame.show_error(
            "Path Error", f"Backup path exists but is not a directory:\n{target_dir}"
        )
        self.main_frame.update_status_bar(
            "Manual backup failed: Invalid backup path."
        )
        return

    # Persist the profile first; on failure the user may still go ahead
    if not self.save_profile_settings() and not self.main_frame.ask_yes_no(
        "Warning", "Could not save profile settings.\nContinue backup anyway?"
    ):
        self.main_frame.update_status_bar(
            "Backup cancelled (profile save failed)."
        )
        return

    skip_extensions, skip_dirs = self._parse_exclusions()
    worker_args: tuple = (
        self.backup_handler,
        repo_dir,
        target_dir,
        profile_name,
        skip_extensions,
        skip_dirs,
    )
    operation_context: dict = {
        "context": "manual_backup",
        "status_msg": "Creating manual backup",
    }
    self._start_async_operation(
        worker_func=async_workers.run_manual_backup_async,
        args_tuple=worker_args,
        context_dict=operation_context,
    )
# --- Git Actions (Commit, Tag, Branch, etc.) ---
def commit_changes(self):
    """
    Launches the background worker that commits using the message typed
    in the GUI.

    The commit is refused (error dialog + status bar message) when the
    repository is not ready or the commit message is empty.
    """
    func_name: str = "commit_changes"

    # Repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Commit")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Commit failed: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error("Action Failed", "Repository is not ready.")
        self.main_frame.update_status_bar("Commit failed: Repo not ready.")
        return

    # A commit message is mandatory
    message: str = self.main_frame.get_commit_message()
    if not message:
        log_handler.log_warning(
            "Commit failed: Commit message is empty.", func_name=func_name
        )
        self.main_frame.show_error("Input Error", "Commit message cannot be empty.")
        self.main_frame.update_status_bar("Commit failed: Empty message.")
        return

    operation_context: dict = {
        "context": "commit",
        "status_msg": "Committing changes",
        "committed_flag_possible": True,  # Hint for result handler
    }
    self._start_async_operation(
        worker_func=async_workers.run_commit_async,
        args_tuple=(self.action_handler, repo_dir, message),
        context_dict=operation_context,
    )
def open_gitignore_editor(self):
    """
    Opens the .gitignore editor window for the current repository.

    This is a synchronous GUI action: the editor is opened directly from
    this method and `_handle_gitignore_save` is registered as the callback
    invoked after a successful save.
    """
    func_name: str = "open_gitignore_editor"
    log_handler.log_info(
        "--- Action Triggered: Edit .gitignore ---", func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    frame = self.main_frame
    frame.update_status_bar("Processing: Opening .gitignore editor...")

    # Repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Edit .gitignore")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Cannot edit .gitignore: Repo path invalid/not ready.",
            func_name=func_name
        )
        frame.show_error(
            "Action Failed", "Select a valid and prepared repository first."
        )
        frame.update_status_bar("Edit failed: Repo not ready.")
        return

    ignore_file: str = os.path.join(repo_dir, ".gitignore")
    log_handler.log_debug(
        f"Target .gitignore path: {ignore_file}", func_name=func_name
    )
    try:
        log_handler.log_debug("Opening GitignoreEditorWindow...", func_name=func_name)
        GitignoreEditorWindow(
            master=self.master,
            gitignore_path=ignore_file,
            logger_ignored=None,  # Logger no longer passed
            on_save_success_callback=self._handle_gitignore_save
        )
        # Reached once the editor window has been closed
        log_handler.log_debug("Gitignore editor window closed.", func_name=func_name)
        # Leave the status bar alone if the save callback started an
        # async operation (its status text starts with "Processing")
        current_status = self.main_frame.status_bar_var.get()
        if not current_status.startswith("Processing"):
            self.main_frame.update_status_bar("Ready.")
    except Exception as e:
        # Any failure while opening/running the editor is reported to the user
        log_handler.log_exception(
            f"Error opening or running .gitignore editor: {e}", func_name=func_name
        )
        self.main_frame.show_error("Editor Error", f"Could not open editor:\n{e}")
        self.main_frame.update_status_bar("Error opening .gitignore editor.")
def open_diff_viewer(self, file_status_line: str):
    """
    Opens the Diff Viewer window for an entry of the 'changed files' list,
    comparing the Working Directory against HEAD.

    The file path is obtained from `_extract_path_from_status_line`. Entries
    whose status is 'D', '??' or '!!' are not diffed: an informational
    message is shown instead. This is a synchronous GUI action.

    Args:
        file_status_line (str): The raw line selected in the changed-files list.
    """
    func_name: str = "open_diff_viewer"
    log_handler.log_debug(
        f"Received file_status_line: {file_status_line!r}", func_name=func_name
    )
    log_handler.log_info(
        "--- Action Triggered: Open Diff Viewer for Changed File ---",
        func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    # Repository path must be valid
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Open Diff Viewer")
    if not repo_dir:
        # The validation helper has already informed the user
        self.main_frame.update_status_bar(
            "Error: Cannot open diff (invalid repo path)."
        )
        return

    # Delegate path parsing to the shared helper
    rel_path: Optional[str] = self._extract_path_from_status_line(file_status_line)
    log_handler.log_debug(
        f"Extracted relative_path via helper: {rel_path!r}",
        func_name=func_name
    )
    if not rel_path:
        log_handler.log_error(
            f"Could not extract valid path from status line: {file_status_line}",
            func_name=func_name
        )
        self.main_frame.show_error(
            "Path Error",
            f"Could not parse file path from selected line:\n{file_status_line}"
        )
        self.main_frame.update_status_bar("Error: Invalid selection for diff.")
        return

    # --- Status code: first two characters of the cleaned line, trimmed ---
    cleaned_line: str = file_status_line.strip('\x00').strip()
    status_code: str = cleaned_line[:2].strip()

    if status_code == 'D':
        # A deleted file is not diffed against the working directory
        deleted_msg: str = (
            "Cannot show Working Dir vs HEAD diff for a deleted file:\n"
            f"{rel_path}"
        )
        log_handler.log_info(deleted_msg, func_name=func_name)
        self.main_frame.show_info("Diff Not Applicable", deleted_msg)
        self.main_frame.update_status_bar(
            "Ready (Diff not applicable for deleted file)."
        )
        return

    if status_code in ('??', '!!'):
        # Untracked / ignored entries are not diffed against HEAD
        not_applicable_msg: str = (
            f"Cannot show diff for file with status '{status_code}':\n"
            f"{rel_path}\n\n"
            "(Untracked or Ignored files cannot be diffed against HEAD)."
        )
        log_handler.log_info(
            f"Diff not applicable for status '{status_code}'.", func_name=func_name
        )
        self.main_frame.show_info("Diff Not Applicable", not_applicable_msg)
        self.main_frame.update_status_bar("Ready (Diff not applicable).")
        return

    # --- Open DiffViewerWindow ---
    log_handler.log_debug(
        f"Opening DiffViewerWindow for '{rel_path}' (Working Dir vs HEAD)",
        func_name=func_name
    )
    closing_status: str = "Ready."
    try:
        DiffViewerWindow(
            master=self.master,
            git_commands=self.git_commands,
            repo_path=repo_dir,
            relative_file_path=rel_path,
            ref1='WORKING_DIR',  # Working directory...
            ref2='HEAD'          # ...versus the HEAD commit
        )
        # Reached once the viewer window has been closed
        log_handler.log_debug("Diff viewer window closed.", func_name=func_name)
    except Exception as e:
        log_handler.log_exception(
            f"Error opening or running diff viewer: {e}", func_name=func_name
        )
        closing_status = "Error: Failed to open diff viewer."
        self.main_frame.show_error(
            "Diff Viewer Error", f"Could not display diff:\n{e}"
        )
    finally:
        # Refresh the status bar unless its text indicates that an
        # operation is in progress (text starting with "Processing")
        if hasattr(self, "main_frame"):
            if not self.main_frame.status_bar_var.get().startswith("Processing"):
                self.main_frame.update_status_bar(closing_status)
def _handle_gitignore_save(self):
    """
    Callback registered with the .gitignore editor, invoked after a
    successful save.

    Starts the asynchronous untrack worker for the current repository. If
    the main frame no longer exists the call is ignored; if the repository
    path is invalid or not ready, only the status bar is updated.
    """
    func_name: str = "_handle_gitignore_save"
    log_handler.log_info(
        "Callback: .gitignore saved. Starting async untrack check.",
        func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    repo_dir: Optional[str] = self._get_and_validate_svn_path(
        "Untrack Check after Gitignore Save"
    )
    if repo_dir and self._is_repo_ready(repo_dir):
        operation_context: dict = {
            "context": "_handle_gitignore_save",  # Context identifies origin
            "status_msg": "Checking files to untrack",
            "committed_flag_possible": True,  # Untracking involves a commit
        }
        self._start_async_operation(
            worker_func=async_workers.run_untrack_async,
            args_tuple=(self.action_handler, repo_dir),
            context_dict=operation_context,
        )
        return

    # Repository unusable: report it and do nothing else
    log_handler.log_error(
        "Cannot start untrack check: Invalid/Not ready path.",
        func_name=func_name
    )
    self.main_frame.update_status_bar(
        "Error: Untrack check failed (invalid path)."
    )
def add_selected_file(self, file_status_line: str):
    """
    Launches the background worker that adds a selected untracked file
    ('??' status) to the staging area.

    Lines with any other status are rejected with an error dialog. The path
    is taken from the text after the '??' marker, with one pair of
    surrounding double quotes removed when present.

    Args:
        file_status_line (str): The raw line selected in the changed-files list.
    """
    func_name: str = "add_selected_file"
    log_handler.log_info(
        f"--- Action Triggered: Add File '{file_status_line}' (Async) ---",
        func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    # Repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Add File")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Add file failed: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error("Action Failed", "Repository is not ready.")
        self.main_frame.update_status_bar("Add failed: Repo not ready.")
        return

    # --- Parse the path out of the status line ---
    rel_path: Optional[str] = None
    try:
        line: str = file_status_line.strip("\x00").strip()
        if not line.startswith("??"):
            # Only untracked entries can be added through this action
            log_handler.log_error(
                f"Cannot add non-untracked file: {line}", func_name=func_name
            )
            self.main_frame.show_error(
                "Invalid Action",
                f"Cannot 'Add' file with status '{line[:2]}'.\nUse commit for modified/staged files."
            )
            self.main_frame.update_status_bar("Add failed: Not an untracked file.")
            return

        # Everything after the '??' marker is the (possibly quoted) path
        candidate: str = line[2:].lstrip()
        is_quoted: bool = (
            len(candidate) >= 2
            and candidate.startswith('"')
            and candidate.endswith('"')
        )
        rel_path = candidate[1:-1] if is_quoted else candidate

        if not rel_path:
            raise ValueError("Extracted relative path is empty.")
    except Exception as e:
        # Any parsing problem ends up here and is reported uniformly
        log_handler.log_error(
            f"Error parsing path for add from line '{file_status_line}': {e}",
            func_name=func_name
        )
        self.main_frame.show_error(
            "Parsing Error", f"Cannot parse file path from:\n{file_status_line}"
        )
        self.main_frame.update_status_bar("Add failed: Parse error.")
        return

    # --- Start the add worker ---
    worker_args: tuple = (self.git_commands, repo_dir, rel_path)
    display_name: str = os.path.basename(rel_path)  # Short name for the status bar
    operation_context: dict = {
        "context": "add_file",
        "status_msg": f"Adding '{display_name}'",
    }
    self._start_async_operation(
        worker_func=async_workers.run_add_file_async,
        args_tuple=worker_args,
        context_dict=operation_context,
    )
def create_tag(self):
    """
    Drives tag creation: proposes a tag name, asks the user for name and
    message through CreateTagDialog, then starts the async tag worker.

    Nothing is started when the repository is not ready or when the dialog
    yields no result (user cancelled).
    """
    func_name: str = "create_tag"
    log_handler.log_info(
        "--- Action Triggered: Create Tag ---", func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    # Repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Create Tag")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Create Tag failed: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error("Action Failed", "Repository is not ready.")
        self.main_frame.update_status_bar("Create Tag failed: Repo not ready.")
        return

    # Compute a suggestion, then let the user confirm/edit it
    self.main_frame.update_status_bar("Processing: Generating tag suggestion...")
    proposed_name: str = self._generate_next_tag_suggestion(repo_dir)
    self.main_frame.update_status_bar("Ready for tag input.")
    tag_dialog = CreateTagDialog(self.master, suggested_tag_name=proposed_name)
    user_input = tag_dialog.result  # (name, message) or None

    if not user_input:
        # Dialog was dismissed without input
        log_handler.log_info("Tag creation cancelled.", func_name=func_name)
        self.main_frame.update_status_bar("Cancelled.")
        return

    tag_name, tag_message = user_input
    log_handler.log_info(f"User provided tag: '{tag_name}'", func_name=func_name)
    worker_args: tuple = (self.action_handler, repo_dir, tag_name, tag_message)
    operation_context: dict = {
        "context": "create_tag",
        "status_msg": f"Creating tag '{tag_name}'",
        "committed_flag_possible": True,  # Annotated tag creates commit object
    }
    self._start_async_operation(
        worker_func=async_workers.run_create_tag_async,
        args_tuple=worker_args,
        context_dict=operation_context,
    )
def checkout_tag(self):
    """
    Drives tag checkout: takes the tag selected in the GUI, asks the user to
    confirm (warning about the 'detached HEAD' state) and starts the async
    checkout worker.

    Nothing is started when the repository is not ready, no tag is selected,
    or the user declines the confirmation.
    """
    func_name: str = "checkout_tag"
    log_handler.log_info(
        "--- Action Triggered: Checkout Tag ---", func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    # Repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Checkout Tag")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Checkout Tag failed: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error("Action Failed", "Repository is not ready.")
        self.main_frame.update_status_bar("Checkout Tag failed: Repo not ready.")
        return

    # A tag must be selected in the list
    tag_name: Optional[str] = self.main_frame.get_selected_tag()
    if not tag_name:
        self.main_frame.show_error(
            "Selection Error", "No tag selected from the list."
        )
        self.main_frame.update_status_bar("Checkout failed: No tag selected.")
        return

    # Checking out a tag detaches HEAD, so ask before proceeding
    prompt: str = (
        f"Checkout tag '{tag_name}'?\n\n"
        "Warning: This will put your repository in a 'detached HEAD' state. "
        "You can look around, make experimental changes and commit them, "
        "but they won't belong to any branch. "
        "Use 'Checkout Branch' to return to a branch."
    )
    user_confirmed = self.main_frame.ask_yes_no("Confirm Checkout Tag", prompt)
    if not user_confirmed:
        log_handler.log_info("Tag checkout cancelled by user.", func_name=func_name)
        self.main_frame.update_status_bar("Cancelled.")
        return

    operation_context: dict = {
        "context": "checkout_tag",
        "status_msg": f"Checking out tag '{tag_name}'",
    }
    self._start_async_operation(
        worker_func=async_workers.run_checkout_tag_async,
        args_tuple=(self.action_handler, repo_dir, tag_name),
        context_dict=operation_context,
    )
def create_branch(self):
    """
    Drives branch creation: asks the user for a branch name through
    CreateBranchDialog, then starts the async branch-creation worker.

    Nothing is started when the repository is not ready or when the dialog
    yields no name (user cancelled).
    """
    func_name: str = "create_branch"
    log_handler.log_info(
        "--- Action Triggered: Create Branch ---", func_name=func_name
    )
    # Bail out quietly if the main frame is gone
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    # Repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Create Branch")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Create Branch failed: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error("Action Failed", "Repository is not ready.")
        self.main_frame.update_status_bar("Create Branch failed: Repo not ready.")
        return

    # Ask the user for the new branch name
    self.main_frame.update_status_bar("Ready for branch name input.")
    name_dialog = CreateBranchDialog(self.master)
    new_name: Optional[str] = name_dialog.result  # None when cancelled

    if not new_name:
        # Dialog was dismissed without a name
        log_handler.log_info("Branch creation cancelled.", func_name=func_name)
        self.main_frame.update_status_bar("Cancelled.")
        return

    log_handler.log_info(
        f"User provided branch name: '{new_name}'", func_name=func_name
    )
    operation_context: dict = {
        "context": "create_branch",
        "status_msg": f"Creating branch '{new_name}'",
        "new_branch_name": new_name,  # Pass name for potential checkout later
    }
    self._start_async_operation(
        worker_func=async_workers.run_create_branch_async,
        args_tuple=(self.action_handler, repo_dir, new_name),
        context_dict=operation_context,
    )
def checkout_branch(
    self,
    branch_to_checkout: Optional[str] = None,
    repo_path_override: Optional[str] = None
):
    """
    Switch to an existing local branch using an asynchronous worker.

    Args:
        branch_to_checkout: Branch to switch to. When omitted or empty,
            the branch selected in the GUI is used and the user is asked
            to confirm before switching.
        repo_path_override: Repository path used instead of the value
            returned by ``_get_and_validate_svn_path`` when truthy.
    """
    func_name: str = "checkout_branch"
    shown_target: str = branch_to_checkout or "Selected Branch"
    log_handler.log_info(
        f"--- Action Triggered: Checkout Branch (Target: {shown_target}) ---",
        func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # An explicit override wins over the path configured in the GUI
    repo_dir: Optional[str] = (
        repo_path_override
        or self._get_and_validate_svn_path("Checkout Branch")
    )
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Checkout Branch failed: Repo not ready.", func_name=func_name
        )
        gui.show_error("Action Failed", "Repository is not ready.")
        gui.update_status_bar("Checkout Branch failed: Repo not ready.")
        return

    # Without an explicit name the call came from the GUI: take the list
    # selection and remember that the user still has to confirm
    picked_from_gui: bool = not branch_to_checkout
    destination: Optional[str] = (
        gui.get_selected_branch() if picked_from_gui else branch_to_checkout
    )

    if not destination:
        gui.show_error("Selection Error", "No branch selected from the list.")
        gui.update_status_bar("Checkout failed: No branch selected.")
        return

    # Programmatic calls (explicit branch name) skip the confirmation
    if picked_from_gui and not gui.ask_yes_no(
        "Confirm Checkout Branch", f"Switch to branch '{destination}'?"
    ):
        log_handler.log_info(
            "Branch checkout cancelled by user.", func_name=func_name
        )
        gui.update_status_bar("Cancelled.")
        return

    self._start_async_operation(
        worker_func=async_workers.run_checkout_branch_async,
        args_tuple=(self.action_handler, repo_dir, destination),
        context_dict={
            "context": "checkout_branch",
            "status_msg": f"Checking out branch '{destination}'",
        },
    )
def delete_local_branch(self, branch_name: str, force: bool):
    """
    Delete a local branch after the user confirms, via an async worker.

    Args:
        branch_name: Name of the local branch to delete.
        force: When true the confirmation carries an extra warning and
            the flag is forwarded to the worker and to the result context.
    """
    func_name: str = "delete_local_branch"
    verb: str = "Force delete" if force else "Delete"
    log_handler.log_info(
        f"--- Action Triggered: {verb} Local Branch '{branch_name}' ---",
        func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path(f"{verb} Branch")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            f"{verb} Branch skipped: Repo not ready.",
            func_name=func_name
        )
        gui.update_status_bar(f"{verb} failed: Repo not ready.")
        return

    # Build the confirmation prompt; a forced delete gets a stronger one
    question: str = (
        f"Are you sure you want to delete the local branch '{branch_name}'?"
    )
    if force:
        title: str = "Confirm Force Delete Branch"
        question += (
            "\n\nWARNING: Force delete will discard any unmerged changes "
            "on this branch!"
        )
    else:
        title = "Confirm Delete Branch"

    if not gui.ask_yes_no(title, question):
        log_handler.log_info(
            "Local branch deletion cancelled by user.", func_name=func_name
        )
        gui.update_status_bar("Delete branch cancelled.")
        return

    log_handler.log_info(
        f"Starting {verb.lower()} for local branch '{branch_name}'...",
        func_name=func_name
    )
    # Branch name and force flag are repeated in the context for the
    # result handler
    task_context: dict = {
        "context": "delete_local_branch",
        "status_msg": f"{verb} branch '{branch_name}'",
        "branch_name": branch_name,
        "force": force,
    }
    self._start_async_operation(
        worker_func=async_workers.run_delete_local_branch_async,
        args_tuple=(self.action_handler, repo_dir, branch_name, force),
        context_dict=task_context,
    )
def merge_local_branch(self, branch_to_merge: str):
    """
    Merge a local branch into the currently checked-out branch.

    The merge is refused (with an error dialog) when the current branch
    cannot be determined or equals ``branch_to_merge``. After the user
    confirms, ``run_merge_local_branch_async`` is started with
    fast-forward merges allowed (``no_ff`` is always ``False`` here).

    Args:
        branch_to_merge: Name of the local branch to merge from.
    """
    func_name: str = "merge_local_branch"
    log_handler.log_info(
        f"--- Action Triggered: Merge Local Branch '{branch_to_merge}' ---",
        func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path(
        f"Merge Branch '{branch_to_merge}'"
    )
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Merge Branch skipped: Repo not ready.", func_name=func_name
        )
        gui.update_status_bar("Merge failed: Repo not ready.")
        return

    # Pre-check: we need a named current branch that differs from the source
    active_branch: Optional[str] = None
    precheck_problem = None
    try:
        active_branch = self.git_commands.get_current_branch_name(repo_dir)
    except (GitCommandError, ValueError) as lookup_err:
        precheck_problem = lookup_err
    else:
        if not active_branch:
            precheck_problem = (
                "Could not determine the current branch (Detached HEAD?)."
            )
        elif active_branch == branch_to_merge:
            precheck_problem = "Cannot merge a branch into itself."

    if precheck_problem is not None:
        log_handler.log_error(
            f"Merge aborted during pre-check: {precheck_problem}",
            func_name=func_name
        )
        gui.show_error("Merge Error", f"Cannot start merge:\n{precheck_problem}")
        gui.update_status_bar("Merge failed: Pre-check error.")
        return

    # Ask before touching the working tree
    question: str = (
        f"Merge branch '{branch_to_merge}' into current branch '{active_branch}'?"
    )
    if not gui.ask_yes_no("Confirm Merge", question):
        log_handler.log_info(
            "Local branch merge cancelled by user.", func_name=func_name
        )
        gui.update_status_bar("Merge cancelled.")
        return

    log_handler.log_info(
        f"Starting merge of '{branch_to_merge}' into '{active_branch}'...",
        func_name=func_name
    )

    # Fast-forward merges are allowed: no merge commit is forced
    force_merge_commit: bool = False
    # git_commands is handed over for the worker's own checks
    worker_args: tuple = (
        self.action_handler,
        self.git_commands,
        repo_dir,
        branch_to_merge,
        force_merge_commit,
    )
    self._start_async_operation(
        worker_func=async_workers.run_merge_local_branch_async,
        args_tuple=worker_args,
        context_dict={
            "context": "merge_local_branch",
            "status_msg": f"Merging '{branch_to_merge}' into '{active_branch}'",
            "branch_merged_into": active_branch,
            "branch_merged_from": branch_to_merge,
            # The result handler needs the path for conflict messages
            "repo_path": repo_dir,
        },
    )
def compare_branch_with_current(self, other_branch_ref: str):
    """
    Compare a branch reference with the currently checked-out branch.

    The comparison runs in ``run_compare_branches_async`` with the
    current branch as the first ref and ``other_branch_ref`` as the
    second. It is not started when HEAD has no branch name or when both
    refs are the same.

    Args:
        other_branch_ref: Branch reference (local or remote) to compare
            against the current branch.
    """
    func_name: str = "compare_branch_with_current"
    log_handler.log_info(
        f"--- Action Triggered: Compare Branch '{other_branch_ref}' with Current ---",
        func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path(
        f"Compare Branch '{other_branch_ref}'"
    )
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Compare Branch skipped: Repo not ready.", func_name=func_name
        )
        gui.update_status_bar("Compare failed: Repo not ready.")
        return

    # Resolve the branch we are standing on; it is the left side of the diff
    base_branch: Optional[str] = None
    try:
        base_branch = self.git_commands.get_current_branch_name(repo_dir)
        if not base_branch:
            raise ValueError("Cannot compare: Currently in detached HEAD state.")
    except (GitCommandError, ValueError) as precheck_err:
        log_handler.log_error(
            f"Compare aborted during pre-check: {precheck_err}",
            func_name=func_name
        )
        gui.show_error("Compare Error", f"Cannot start compare:\n{precheck_err}")
        return

    # A branch compared with itself has nothing to show
    if base_branch == other_branch_ref:
        log_handler.log_warning(
            "Compare Branch skipped: Cannot compare a branch with itself.",
            func_name=func_name
        )
        gui.show_info("Compare Info", "Cannot compare a branch with itself.")
        return

    log_handler.log_info(
        f"Starting comparison between '{base_branch}' and '{other_branch_ref}'...",
        func_name=func_name
    )
    # Refs and repository path are repeated in the context for the
    # summary window
    task_context: dict = {
        "context": "compare_branches",
        "status_msg": f"Comparing '{base_branch}' vs '{other_branch_ref}'",
        "ref1": base_branch,
        "ref2": other_branch_ref,
        "repo_path": repo_dir,
    }
    self._start_async_operation(
        worker_func=async_workers.run_compare_branches_async,
        args_tuple=(self.git_commands, repo_dir, base_branch, other_branch_ref),
        context_dict=task_context,
    )
def view_commit_details(self, history_line_or_hash: str):
    """
    Load the details of one commit asynchronously.

    Callback for a click in the history view. The argument is stripped
    and must consist of at least seven hexadecimal characters; anything
    else is rejected with an error dialog.

    Args:
        history_line_or_hash: The commit hash as delivered by the history
            view (the parameter name is historical; only a hash is
            accepted).
    """
    func_name: str = "view_commit_details"
    short_hash = history_line_or_hash.strip()
    log_handler.log_info(
        f"--- Action Triggered: View Commit Details for hash: '{short_hash}' ---",
        func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("View Commit Details")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "View Commit Details skipped: Repo not ready.", func_name=func_name
        )
        gui.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        return

    # Hash validation: hexadecimal only, seven characters or more
    looks_like_hash = bool(short_hash) and re.match(
        r"^[0-9a-fA-F]{7,}$", short_hash
    )
    if not looks_like_hash:
        log_handler.log_error(
            f"Invalid commit hash received: '{short_hash}'", func_name=func_name
        )
        gui.show_error("Input Error", f"Invalid commit hash format:\n{short_hash}")
        return

    log_handler.log_info(
        f"Fetching details for commit '{short_hash}'...", func_name=func_name
    )
    # The hash is also stored in the context for the result handler
    task_context: dict = {
        "context": "get_commit_details",
        "status_msg": f"Loading details for commit {short_hash}",
        "commit_hash": short_hash,
    }
    self._start_async_operation(
        worker_func=async_workers.run_get_commit_details_async,
        args_tuple=(self.git_commands, repo_dir, short_hash),
        context_dict=task_context,
    )
# --- Remote Action Launchers ---
def apply_remote_config(self):
    """
    Callback for the 'Apply Config' button.

    Reads remote URL and name from the GUI (falling back to
    ``DEFAULT_REMOTE_NAME`` for an empty name), saves the profile, and
    starts ``run_apply_remote_config_async``. If saving the profile
    fails, the user decides whether to continue anyway.
    """
    func_name: str = "apply_remote_config"
    log_handler.log_info(
        "--- Action Triggered: Apply Remote Config ---", func_name=func_name
    )

    # Without a live GUI there is nothing to read the settings from
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        log_handler.log_error(
            "Cannot apply config: Main frame missing.", func_name=func_name
        )
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Apply Remote Config")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Apply config skipped: Repo not ready.", func_name=func_name
        )
        gui.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        gui.update_status_bar("Apply config failed: Repo not ready.")
        return

    # Collect the user input
    url: str = gui.remote_url_var.get().strip()
    remote: str = gui.remote_name_var.get().strip()

    # A URL is mandatory
    if not url:
        log_handler.log_warning(
            "Apply config failed: Remote URL is empty.", func_name=func_name
        )
        gui.show_error("Input Error", "Remote URL cannot be empty.")
        gui.update_status_bar("Apply config failed: URL empty.")
        return

    # An empty name falls back to the default and is written back to the GUI
    if not remote:
        remote = DEFAULT_REMOTE_NAME
        log_handler.log_info(
            f"Remote name empty, using default: '{remote}'",
            func_name=func_name
        )
        gui.remote_name_var.set(remote)

    # Persist the profile first; a failed save only stops us if the user
    # declines to continue
    if not self.save_profile_settings() and not gui.ask_yes_no(
        "Warning",
        "Could not save profile settings.\nContinue applying remote config anyway?"
    ):
        gui.update_status_bar("Apply config cancelled (profile save failed).")
        return

    self._start_async_operation(
        worker_func=async_workers.run_apply_remote_config_async,
        args_tuple=(self.remote_action_handler, repo_dir, remote, url),
        context_dict={
            "context": "apply_remote_config",
            "status_msg": f"Applying config for remote '{remote}'",
        },
    )
def check_connection_auth(self):
    """
    Callback for the 'Check Connection & Auth' button.

    Sets the GUI auth indicator to ``"checking"`` and starts
    ``run_check_connection_async`` for the configured remote. When the
    repository is not ready the indicator is reset to ``"unknown"``.
    """
    func_name: str = "check_connection_auth"
    log_handler.log_info(
        "--- Action Triggered: Check Connection & Auth ---", func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Check Connection")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Check Connection skipped: Repo not ready.", func_name=func_name
        )
        gui.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        # No usable repository: whatever status was shown is now meaningless
        self._update_gui_auth_status("unknown")
        return

    # An empty remote name falls back to the default, written back to the GUI
    remote: str = gui.remote_name_var.get().strip()
    if not remote:
        remote = DEFAULT_REMOTE_NAME
        gui.remote_name_var.set(remote)

    log_handler.log_info(
        f"Checking connection/auth for remote '{remote}'...",
        func_name=func_name
    )
    # Show the in-progress state while the worker runs
    self._update_gui_auth_status("checking")

    # Remote name and path are repeated in the context for the result handler
    task_context: dict = {
        "context": "check_connection",
        "status_msg": f"Checking remote '{remote}'",
        "remote_name_checked": remote,
        "repo_path_checked": repo_dir,
    }
    self._start_async_operation(
        worker_func=async_workers.run_check_connection_async,
        args_tuple=(self.git_commands, repo_dir, remote),
        context_dict=task_context,
    )
def fetch_remote(self):
    """
    Start an asynchronous fetch from the configured remote.

    Unlike pull and push, the fetch is not gated on
    ``self.remote_auth_status``; it is attempted whenever the repository
    is ready.
    """
    func_name: str = "fetch_remote"
    log_handler.log_info(
        "--- Action Triggered: Fetch Remote ---", func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Fetch Remote")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Fetch Remote skipped: Repo not ready.", func_name=func_name
        )
        gui.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        gui.update_status_bar("Fetch failed: Repo not ready.")
        return

    # An empty remote name falls back to the default, written back to the GUI
    remote: str = gui.remote_name_var.get().strip()
    if not remote:
        remote = DEFAULT_REMOTE_NAME
        gui.remote_name_var.set(remote)

    # NOTE: an auth-status pre-check (as done for pull/push) is deliberately
    # not performed here; add one if fetching without a verified
    # connection proves problematic.
    log_handler.log_info(
        f"Starting fetch for remote '{remote}'...", func_name=func_name
    )
    self._start_async_operation(
        worker_func=async_workers.run_fetch_remote_async,
        args_tuple=(self.remote_action_handler, repo_dir, remote),
        context_dict={
            "context": "fetch_remote",
            "status_msg": f"Fetching from remote '{remote}'",
        },
    )
def pull_remote(self):
    """
    Start an asynchronous pull for the current branch.

    The pull is blocked with a warning unless ``self.remote_auth_status``
    is ``"ok"``. ``self.git_commands`` is passed to the worker, which
    (per the original design notes) determines the current branch itself.
    """
    func_name: str = "pull_remote"
    log_handler.log_info(
        "--- Action Triggered: Pull Remote ---", func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Pull Remote")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Pull Remote skipped: Repo not ready.", func_name=func_name
        )
        gui.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        gui.update_status_bar("Pull failed: Repo not ready.")
        return

    # An empty remote name falls back to the default, written back to the GUI
    remote: str = gui.remote_name_var.get().strip()
    if not remote:
        remote = DEFAULT_REMOTE_NAME
        gui.remote_name_var.set(remote)

    # Only pull over a connection that has been verified
    if self.remote_auth_status != "ok":
        reason_by_status: dict = {
            "required": (
                "Authentication is required. "
                "Use 'Check Connection / Auth' first."
            ),
            "failed": (
                "Authentication previously failed. "
                "Use 'Check Connection / Auth' to retry."
            ),
            "connection_failed": (
                "Connection previously failed. Check URL and network."
            ),
        }
        fallback_reason: str = (
            "Connection status is unknown or in error. "
            "Use 'Check Connection / Auth' first."
        )
        blocked_msg: str = f"Cannot Pull from '{remote}':\n" + reason_by_status.get(
            self.remote_auth_status, fallback_reason
        )
        log_handler.log_warning(
            f"Pull Remote skipped: Auth/Connection status is '{self.remote_auth_status}'.",
            func_name=func_name
        )
        gui.show_warning("Action Blocked", blocked_msg)
        gui.update_status_bar(f"Pull failed: {self.remote_auth_status}")
        return

    log_handler.log_info(
        f"Starting pull for remote '{remote}'...", func_name=func_name
    )
    # Path and remote name are repeated in the context for conflict messages
    task_context: dict = {
        "context": "pull_remote",
        "status_msg": f"Pulling from remote '{remote}'",
        "repo_path": repo_dir,
        "remote_name": remote,
    }
    self._start_async_operation(
        worker_func=async_workers.run_pull_remote_async,
        args_tuple=(self.remote_action_handler, self.git_commands, repo_dir, remote),
        context_dict=task_context,
    )
def push_remote(self):
    """
    Start an asynchronous push of the current branch.

    Pre-flight checks, in order:
      * main frame alive (silent return otherwise);
      * repository path valid and prepared;
      * ``self.remote_auth_status`` equal to ``"ok"``; otherwise the
        action is blocked and the user is told why, with the same
        wording used for pull;
      * uncommitted changes: the user is asked whether to push anyway.
        If the status check itself fails, the push is aborted and the
        status bar reports it.

    ``self.git_commands`` is passed to the worker, which (per the original
    design notes) determines the current branch itself.
    """
    func_name: str = "push_remote"
    log_handler.log_info(
        "--- Action Triggered: Push Branch to Remote ---", func_name=func_name
    )

    # Ensure main frame exists
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return

    # Validate repo path and readiness
    svn_path: Optional[str] = self._get_and_validate_svn_path("Push Branch")
    if not svn_path or not self._is_repo_ready(svn_path):
        log_handler.log_warning(
            "Push Branch skipped: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        self.main_frame.update_status_bar("Push failed: Repo not ready.")
        return

    # Get remote name, using default if empty
    remote_name: str = self.main_frame.remote_name_var.get().strip()
    if not remote_name:
        remote_name = DEFAULT_REMOTE_NAME
        self.main_frame.remote_name_var.set(remote_name)

    # Check authentication/connection status before attempting push.
    # The reasons are spelled out in full so the dialog tells the user
    # what to do next.
    auth_status = self.remote_auth_status
    if auth_status != "ok":
        reasons: Dict[str, str] = {
            "required": (
                "Authentication is required. "
                "Use 'Check Connection / Auth' first."
            ),
            "failed": (
                "Authentication previously failed. "
                "Use 'Check Connection / Auth' to retry."
            ),
            "connection_failed": (
                "Connection previously failed. Check URL and network."
            ),
        }
        default_reason: str = (
            "Connection status is unknown or in error. "
            "Use 'Check Connection / Auth' first."
        )
        auth_msg: str = (
            f"Cannot Push to '{remote_name}':\n"
            + reasons.get(auth_status, default_reason)
        )
        log_handler.log_warning(
            f"Push Remote skipped: Auth/Connection status is '{auth_status}'.",
            func_name=func_name
        )
        self.main_frame.show_warning("Action Blocked", auth_msg)
        self.main_frame.update_status_bar(f"Push failed: {auth_status}")
        return

    # Warn about uncommitted changes: they will not be part of the push
    try:
        has_changes = self.git_commands.git_status_has_changes(svn_path)
    except GitCommandError as status_err:
        log_handler.log_error(
            f"Push aborted: Failed to check repository status before push: {status_err}",
            func_name=func_name
        )
        self.main_frame.show_error(
            "Status Error", f"Could not check repo status:\n{status_err}"
        )
        # Keep the status bar in sync with the aborted action
        self.main_frame.update_status_bar("Push failed: Status check error.")
        return

    if has_changes and not self.main_frame.ask_yes_no(
        "Uncommitted Changes",
        "There are uncommitted changes in your working directory.\n"
        "Push anyway? (Only committed changes will be pushed)"
    ):
        self.main_frame.update_status_bar(
            "Push cancelled by user (uncommitted changes)."
        )
        return

    log_handler.log_info(
        f"Starting push for current branch to remote '{remote_name}'...",
        func_name=func_name
    )
    # Prepare args (GitCommands is passed for the worker's own use)
    args: tuple = (
        self.remote_action_handler, self.git_commands, svn_path, remote_name
    )
    self._start_async_operation(
        worker_func=async_workers.run_push_remote_async,
        args_tuple=args,
        context_dict={
            "context": "push_remote",
            "status_msg": f"Pushing current branch to remote '{remote_name}'",
            "remote_name": remote_name,  # Pass context for result messages
        }
    )
def push_tags_remote(self):
    """
    Start an asynchronous push of all local tags to the remote.

    Pre-flight checks, in order:
      * main frame alive (silent return otherwise);
      * repository path valid and prepared;
      * ``self.remote_auth_status`` equal to ``"ok"``; otherwise the
        action is blocked and the dialog states the reason;
      * explicit user confirmation.
    """
    func_name: str = "push_tags_remote"
    log_handler.log_info(
        "--- Action Triggered: Push Tags to Remote ---", func_name=func_name
    )

    # Ensure main frame exists
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        return

    # Validate repo path and readiness
    svn_path: Optional[str] = self._get_and_validate_svn_path("Push Tags")
    if not svn_path or not self._is_repo_ready(svn_path):
        log_handler.log_warning(
            "Push Tags skipped: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        self.main_frame.update_status_bar("Push tags failed: Repo not ready.")
        return

    # Get remote name, using default if empty
    remote_name: str = self.main_frame.remote_name_var.get().strip()
    if not remote_name:
        remote_name = DEFAULT_REMOTE_NAME
        self.main_frame.remote_name_var.set(remote_name)

    # Check authentication/connection status. The message previously
    # stopped after the "Cannot Push Tags to ..." header, so the dialog
    # gave no reason; append the reason for the current status.
    auth_status = self.remote_auth_status
    if auth_status != "ok":
        reasons: Dict[str, str] = {
            "required": (
                "Authentication is required. "
                "Use 'Check Connection / Auth' first."
            ),
            "failed": (
                "Authentication previously failed. "
                "Use 'Check Connection / Auth' to retry."
            ),
            "connection_failed": (
                "Connection previously failed. Check URL and network."
            ),
        }
        default_reason: str = (
            "Connection status is unknown or in error. "
            "Use 'Check Connection / Auth' first."
        )
        auth_msg: str = (
            f"Cannot Push Tags to '{remote_name}':\n"
            + reasons.get(auth_status, default_reason)
        )
        log_handler.log_warning(
            f"Push Tags skipped: Auth/Connection status is '{auth_status}'.",
            func_name=func_name
        )
        self.main_frame.show_warning("Action Blocked", auth_msg)
        self.main_frame.update_status_bar(f"Push tags failed: {auth_status}")
        return

    # Confirm with user before pushing all local tags
    if not self.main_frame.ask_yes_no(
        "Confirm Push Tags",
        f"Push all local tags to remote '{remote_name}'?\n"
        f"(Existing tags on the remote with the same name will "
        f"NOT be overwritten unless forced, which this action does not do)."
    ):
        self.main_frame.update_status_bar("Push tags cancelled by user.")
        return

    log_handler.log_info(
        f"Starting push tags to remote '{remote_name}'...", func_name=func_name
    )
    args: tuple = (self.remote_action_handler, svn_path, remote_name)
    self._start_async_operation(
        worker_func=async_workers.run_push_tags_async,
        args_tuple=args,
        context_dict={
            "context": "push_tags_remote",
            "status_msg": f"Pushing tags to remote '{remote_name}'",
            "remote_name": remote_name,  # Pass context for messages
        }
    )
def clone_remote_repo(self):
    """
    Handle 'Clone from Remote...': ask for parameters, validate, start worker.

    The dialog yields ``(url, parent_dir, profile_name_input)`` or a
    falsy value on cancel. From these the method derives:

      * the repository name, following git's own directory-name rule
        (``repo`` for ``/path/to/repo.git``, ``foo`` for
        ``host.xz:foo/.git``), tolerating trailing slashes and scp-style
        ``user@host:repo.git`` URLs;
      * the target directory ``<parent_dir>/<repo name>``, which must not
        exist yet;
      * the profile name: the user's input (which must be unused), or the
        repository name with a numeric suffix if it is already taken.

    Any validation problem is reported through an error dialog and the
    status bar, and no worker is started.
    """
    func_name: str = "clone_remote_repo"
    log_handler.log_info(
        "--- Action Triggered: Clone Remote Repository ---", func_name=func_name
    )

    # Ensure main frame exists
    if not hasattr(self, "main_frame") or not self.main_frame.winfo_exists():
        log_handler.log_error(
            "Cannot start clone: Main frame not available.", func_name=func_name
        )
        return

    # Show dialog to get clone parameters
    dialog = CloneFromRemoteDialog(self.master)
    # Result is falsy if cancelled, or (url, parent_dir, profile_name_input)
    dialog_result: Optional[Tuple[str, str, str]] = dialog.result
    if not dialog_result:
        log_handler.log_info(
            "Clone operation cancelled by user in dialog.", func_name=func_name
        )
        self.main_frame.update_status_bar("Clone cancelled.")
        return

    remote_url, local_parent_dir, profile_name_input = dialog_result

    # --- Derive target directory and profile name, validate paths ---
    final_profile_name: str = ""
    target_clone_dir: str = ""
    repo_name_from_url: str = ""
    try:
        # Work on a cleaned copy; the URL handed to git stays untouched.
        url_for_name: str = remote_url.strip().rstrip("/\\")
        # ".../repo/.git" names the enclosing directory, as git does
        if url_for_name.endswith(("/.git", "\\.git")):
            url_for_name = url_for_name[:-5].rstrip("/\\")
        # Split on path separators and on ':' so that scp-style URLs
        # (user@host:repo.git) yield "repo.git", not "user@host:repo.git"
        repo_name_from_url = re.split(r"[/\\:]", url_for_name)[-1]
        if repo_name_from_url.endswith(".git"):
            repo_name_from_url = repo_name_from_url[:-4]
        # "." / ".." would resolve to the parent directory tree itself
        if repo_name_from_url in ("", ".", ".."):
            raise ValueError("Could not derive repository name from URL.")

        # Construct and normalize the full target path for the clone
        target_clone_dir = os.path.abspath(
            os.path.join(local_parent_dir, repo_name_from_url)
        )

        # Query the existing profiles once instead of on every check
        existing_profiles = set(self.config_manager.get_profile_sections())
        if profile_name_input:
            final_profile_name = profile_name_input
            if final_profile_name in existing_profiles:
                raise ValueError(
                    f"Profile name '{final_profile_name}' already exists. "
                    f"Please choose a different name."
                )
        else:
            # Use repo name as base, add a counter until it is unique
            final_profile_name = repo_name_from_url
            counter: int = 1
            while final_profile_name in existing_profiles:
                final_profile_name = f"{repo_name_from_url}_{counter}"
                counter += 1

        log_handler.log_debug(
            f"Derived target clone directory: {target_clone_dir}",
            func_name=func_name
        )
        log_handler.log_debug(
            f"Determined profile name: {final_profile_name}",
            func_name=func_name
        )

        # --- CRITICAL CHECK: Target directory must NOT exist ---
        if os.path.exists(target_clone_dir):
            error_msg: str = (
                f"Clone failed: Target directory already exists:\n{target_clone_dir}\n"
                f"Please choose a different parent directory or ensure the target is clear."
            )
            log_handler.log_error(error_msg, func_name=func_name)
            self.main_frame.show_error("Clone Path Error", error_msg)
            self.main_frame.update_status_bar("Clone failed: Target directory exists.")
            return
    except ValueError as ve:
        # Errors deriving names or validating the profile name
        log_handler.log_error(
            f"Clone configuration error: {ve}", func_name=func_name
        )
        self.main_frame.show_error("Configuration Error", str(ve))
        self.main_frame.update_status_bar("Clone failed: Configuration error.")
        return
    except Exception as e:
        # Top-level GUI boundary: report anything unexpected instead of
        # letting it escape into the Tk callback machinery
        log_handler.log_exception(
            f"Unexpected error preparing for clone: {e}", func_name=func_name
        )
        self.main_frame.show_error(
            "Internal Error", f"An unexpected error occurred:\n{e}"
        )
        self.main_frame.update_status_bar("Clone failed: Internal error.")
        return

    # --- Start Asynchronous Worker ---
    log_handler.log_info(
        f"Starting clone for '{remote_url}' into '{target_clone_dir}'...",
        func_name=func_name
    )
    args: tuple = (
        self.git_commands, remote_url, target_clone_dir, final_profile_name
    )
    self._start_async_operation(
        worker_func=async_workers.run_clone_remote_async,
        args_tuple=args,
        context_dict={
            "context": "clone_remote",
            "status_msg": f"Cloning '{repo_name_from_url}'...",
            # Data the result handler needs to create the profile
            "clone_success_data": {
                "profile_name": final_profile_name,
                "cloned_path": target_clone_dir,
                "remote_url": remote_url,
            }
        }
    )
def refresh_remote_branches(self):
    """
    Reload the list of remote branches asynchronously.

    While the worker runs, the remote-branch list shows a single
    "(Loading...)" entry (if the main frame offers
    ``update_remote_branches_list``). When the repository is not ready
    the GUI is switched to its not-ready state instead.
    """
    func_name: str = "refresh_remote_branches"
    log_handler.log_info(
        "--- Action Triggered: Refresh Remote Branches List ---", func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return
    gui = self.main_frame

    # The repository must be valid and prepared
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Refresh Remote Branches")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Refresh Remote Branches skipped: Repo not ready.", func_name=func_name
        )
        # Let the GUI lists reflect that no repository is available
        self._update_gui_for_not_ready_state()
        return

    # An empty remote name falls back to the default, written back to the GUI
    remote: str = gui.remote_name_var.get().strip()
    if not remote:
        remote = DEFAULT_REMOTE_NAME
        gui.remote_name_var.set(remote)

    log_handler.log_info(
        f"Starting refresh of remote branches list for '{remote}'...",
        func_name=func_name
    )

    # Placeholder entry until the worker delivers the real list
    if hasattr(gui, "update_remote_branches_list"):
        gui.update_remote_branches_list(["(Loading...)"])

    self._start_async_operation(
        worker_func=async_workers.run_refresh_remote_branches_async,
        args_tuple=(self.git_commands, repo_dir, remote),
        context_dict={
            "context": "refresh_remote_branches",
            "status_msg": f"Refreshing remote branches for '{remote}'",
        },
    )
def checkout_remote_branch_as_local(
    self, remote_branch_full: str, local_branch_suggestion: str
):
    """
    Check out a remote branch as a new local tracking branch.

    If a local branch named ``local_branch_suggestion`` already exists,
    the user may check out that existing branch instead (delegated to
    ``checkout_branch``) or cancel. Otherwise
    ``run_checkout_tracking_branch_async`` is started.

    Args:
        remote_branch_full: Full remote branch reference to track.
        local_branch_suggestion: Name for the local branch.
    """
    func_name: str = "checkout_remote_branch_as_local"
    log_handler.log_info(
        f"--- Action Triggered: Checkout Remote Branch '{remote_branch_full}' as Local '{local_branch_suggestion}' ---",
        func_name=func_name
    )

    # Nothing can be shown or updated without a live GUI
    if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
        return

    # The repository must be valid and prepared.
    # No status bar update on failure: this is usually a menu action.
    repo_dir: Optional[str] = self._get_and_validate_svn_path("Checkout Remote Branch")
    if not (repo_dir and self._is_repo_ready(repo_dir)):
        log_handler.log_warning(
            "Checkout Remote Branch skipped: Repo not ready.", func_name=func_name
        )
        self.main_frame.show_error(
            "Action Failed", "Repository path is not valid or not prepared."
        )
        return

    try:
        # Only the branch names matter here; the current branch is ignored
        known_local_branches, _ = self.git_commands.list_branches(repo_dir)

        if local_branch_suggestion not in known_local_branches:
            # Normal case: create the tracking branch in a worker
            log_handler.log_info(
                f"Starting checkout of '{remote_branch_full}' as new local branch '{local_branch_suggestion}'...",
                func_name=func_name
            )
            self._start_async_operation(
                worker_func=async_workers.run_checkout_tracking_branch_async,
                args_tuple=(
                    self.action_handler,
                    repo_dir,
                    local_branch_suggestion,
                    remote_branch_full,
                ),
                context_dict={
                    "context": "checkout_tracking_branch",
                    "status_msg": f"Checking out '{local_branch_suggestion}' tracking '{remote_branch_full}'",
                },
            )
            return

        # Name clash: offer to switch to the existing local branch instead
        log_handler.log_warning(
            f"Local branch '{local_branch_suggestion}' already exists.",
            func_name=func_name
        )
        use_existing = self.main_frame.ask_yes_no(
            "Branch Exists",
            f"A local branch named '{local_branch_suggestion}' already exists.\n\n"
            f"Do you want to check out the existing local branch instead?"
        )
        if not use_existing:
            log_handler.log_info(
                "Checkout cancelled because local branch exists.",
                func_name=func_name
            )
            self.main_frame.update_status_bar("Checkout cancelled.")
            return

        log_handler.log_info(
            f"User chose to checkout existing local branch '{local_branch_suggestion}'.",
            func_name=func_name
        )
        # Reuse the regular checkout; the explicit name skips its
        # confirmation and the already validated path is passed along
        self.checkout_branch(
            branch_to_checkout=local_branch_suggestion,
            repo_path_override=repo_dir
        )
    except Exception as prep_err:
        # GUI boundary: covers the branch lookup as well as worker start-up
        log_handler.log_exception(
            f"Error preparing for tracking branch checkout: {prep_err}",
            func_name=func_name
        )
        if hasattr(self, "main_frame"):
            self.main_frame.show_error(
                "Checkout Error", f"Could not start checkout operation:\n{prep_err}"
            )
            self.main_frame.update_status_bar("Checkout failed: Internal error.")
# --- Simplified Queue Checking Method (Refactored) ---
def _check_completion_queue(self, results_queue: queue.Queue, context: dict):
"""
Checks result queue, updates status bar briefly, delegates processing.
"""
task_context: str = context.get('context', 'unknown')
func_name: str = "_check_completion_queue"
try:
# Get result without blocking
result_data: Dict[str, Any] = results_queue.get_nowait()
log_handler.log_info(
f"Result received for '{task_context}'. Status: {result_data.get('status')}",
func_name=func_name
)
# Determine if widgets should be re-enabled immediately
should_reenable_now: bool = self._should_reenable_widgets_now(
task_context, result_data.get('status')
)
if not should_reenable_now:
log_handler.log_debug(
f"Postponing widget re-enable for context: {task_context}",
func_name=func_name
)
# Re-enable widgets now if appropriate
if should_reenable_now:
self._reenable_widgets_if_ready()
# Update status bar with generic message (may be overridden by handler)
self._update_status_bar_from_result(task_context, result_data)
# Process the result using the dedicated handler
self._process_result_with_handler(result_data, context)
except queue.Empty:
# Queue empty, reschedule check
self._reschedule_queue_check(results_queue, context)
except Exception as e:
# Critical error during queue check or basic handling
self._handle_queue_check_error(e, task_context)
# --- Helper methods for _check_completion_queue ---
def _should_reenable_widgets_now(self, task_context: str, status: Optional[str]) -> bool:
""" Determines if GUI widgets should be re-enabled immediately. """
# Don't re-enable immediately if waiting for user interaction or follow-up task
if task_context == "check_connection" and status == 'auth_required': return False
if task_context == "interactive_auth" and status == 'success': return False
if task_context == 'clone_remote' and status == 'success': return False
if task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success': return False
if task_context in ['pull_remote', 'merge_local_branch'] and status == 'conflict': return False
if task_context == 'compare_branches' and status == 'success': return False
# Default: Re-enable immediately
return True
def _reenable_widgets_if_ready(self):
    """
    Switch the action widgets back to ``tk.NORMAL`` when the main frame is usable.

    If the ``main_frame`` attribute is absent, or its ``winfo_exists()``
    reports false, only a warning is logged and nothing else happens.
    """
    caller: str = "_reenable_widgets_if_ready"
    frame_usable = hasattr(self, "main_frame") and self.main_frame.winfo_exists()
    if not frame_usable:
        log_handler.log_warning(
            "Cannot re-enable widgets, MainFrame missing.",
            func_name=caller
        )
        return
    log_handler.log_debug("Re-enabling widgets now.", func_name=caller)
    self.main_frame.set_action_widgets_state(tk.NORMAL)
def _update_status_bar_from_result(self, task_context: str, result_data: dict):
""" Updates status bar based on result, unless specific conditions apply. """
status: Optional[str] = result_data.get('status')
message: str = result_data.get('message', "Operation finished.")
# Conditions where status bar update is skipped or handled differently
skip_update: bool = False
if (task_context == 'clone_remote' and status == 'success'): skip_update = True
if (task_context in ['checkout_tracking_branch', 'checkout_branch', 'checkout_tag'] and status == 'success'): skip_update = True
if (task_context == 'compare_branches' and status == 'success'): skip_update = True
if status in ['conflict', 'rejected']: skip_update = True
# Update status bar if not skipped and main frame exists
if not skip_update and hasattr(self, "main_frame") and self.main_frame.winfo_exists():
status_color: Optional[str] = None
reset_duration: int = 5000 # Default reset time
# Determine color and duration based on status
if status == 'success': status_color = self.main_frame.STATUS_GREEN
elif status == 'warning': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 7000
elif status == 'auth_required': status_color = self.main_frame.STATUS_YELLOW; reset_duration = 15000
elif status == 'conflict': status_color = self.main_frame.STATUS_RED; reset_duration = 15000
elif status == 'rejected': status_color = self.main_frame.STATUS_RED; reset_duration = 15000
elif status == 'error': status_color = self.main_frame.STATUS_RED; reset_duration = 10000
# Call the GUI update method
self.main_frame.update_status_bar(
message, bg_color=status_color, duration_ms=reset_duration
)
def _process_result_with_handler(self, result_data: dict, context: dict):
    """
    Delegate detailed result processing to a fresh ``AsyncResultHandler``.

    Any exception raised while creating or running the handler is logged and
    reported to the user. If the widgets were intentionally left disabled for
    this (context, status) pair, they are re-enabled so the GUI does not stay
    locked after a handler failure.

    Args:
        result_data: Result dictionary produced by the worker.
        context: Task context dictionary; ``'context'`` identifies the task.
    """
    task_context: str = context.get('context', 'unknown')
    func_name: str = "_process_result_with_handler"
    try:
        # Create handler instance, passing the current app instance
        result_handler = AsyncResultHandler(self)
        # Delegate detailed processing
        result_handler.process(result_data, context)
        log_handler.log_debug(
            f"Result processing delegated to handler for context '{task_context}'.",
            func_name=func_name
        )
    except Exception as handler_e:
        # Handle errors occurring *within* the result handler
        log_handler.log_exception(
            f"Error during result processing by handler for {task_context}: {handler_e}",
            func_name=func_name
        )
        # Without a main frame there is nothing to show or re-enable. Looking
        # the attribute up once avoids touching self.main_frame when it is
        # missing, which would raise a new AttributeError from this handler.
        frame = getattr(self, "main_frame", None)
        if frame is None:
            return
        # Show error to user
        frame.show_error(
            "Processing Error", f"Failed to handle task result:\n{handler_e}"
        )
        # Widgets were postponed for this result: unlock them now, since the
        # handler that should have done it has failed.
        widgets_left_disabled = not self._should_reenable_widgets_now(
            task_context, result_data.get('status')
        )
        if widgets_left_disabled and hasattr(frame, "winfo_exists") and frame.winfo_exists():
            log_handler.log_warning(
                "Re-enabling widgets after handler error.", func_name=func_name
            )
            frame.set_action_widgets_state(tk.NORMAL)
def _reschedule_queue_check(self, results_queue: queue.Queue, context: dict):
""" Reschedules the check for the completion queue if the app is running. """
if hasattr(self, "master") and self.master.winfo_exists():
self.master.after(
self.ASYNC_QUEUE_CHECK_INTERVAL_MS,
self._check_completion_queue,
results_queue,
context
)
def _handle_queue_check_error(self, error: Exception, task_context: str):
    """
    Log a failure of the queue-check step and try to bring the GUI back.

    Recovery (only when the main frame exists) consists of re-enabling the
    action widgets, showing a red error message in the status bar and calling
    ``_update_gui_for_status_error``. A failure during recovery is logged and
    not propagated.

    Args:
        error: Exception raised while checking or dispatching the result.
        task_context: Identifier of the task being processed.
    """
    func_name: str = "_handle_queue_check_error"
    log_handler.log_exception(
        f"Critical error checking completion queue for {task_context}: {error}",
        func_name=func_name
    )
    try:
        # No live main frame: nothing to recover
        if not (hasattr(self, "main_frame") and self.main_frame.winfo_exists()):
            return
        # Unlock the action widgets
        self.main_frame.set_action_widgets_state(tk.NORMAL)
        # Report the failure in the status bar
        self.main_frame.update_status_bar(
            "Error processing async result.",
            bg_color=self.main_frame.STATUS_RED,
            duration_ms=10000
        )
        # Reset other relevant GUI states (e.g., sync status)
        self._update_gui_for_status_error()
    except Exception as recovery_e:
        # The recovery attempt itself failed: log only
        log_handler.log_error(
            f"Failed to recover GUI after queue processing error: {recovery_e}",
            func_name=func_name
        )
# --- Tag suggestion helper (internal method) ---
def _generate_next_tag_suggestion(self, svn_path: str) -> str:
    """
    Suggest the next tag name after the first ``v.X.X.X.X`` tag found.

    The tags returned by ``self.git_commands.list_tags(svn_path)`` are scanned
    in the order given (presumably newest first -- confirm against
    ``list_tags``), and the first name matching ``v.<n>.<n>.<n>.<n>`` is used
    as the base. Its last component is incremented; a component exceeding 99
    wraps to 0 and carries into the one to its left. The first component has
    no upper bound.

    Args:
        svn_path: Repository path passed through to ``list_tags``.

    Returns:
        The suggested tag name, or ``"v.0.0.0.1"`` when there are no tags,
        none match the pattern, or an error occurs.
    """
    func_name: str = "_generate_next_tag_suggestion"
    log_handler.log_debug("Generating next tag suggestion...", func_name=func_name)
    default_suggestion: str = "v.0.0.0.1"
    # Regex to match tags like v.1.2.3.4
    tag_pattern = re.compile(r"^v\.(\d+)\.(\d+)\.(\d+)\.(\d+)$")
    try:
        tags_data = self.git_commands.list_tags(svn_path)
        if not tags_data:
            log_handler.log_debug(
                "No existing tags found. Suggesting default.", func_name=func_name
            )
            return default_suggestion

        # Keep the match object of the first matching tag so the version
        # numbers can be read from it directly (no second regex pass needed).
        latest_match = next(
            (
                found
                for found in (tag_pattern.match(tag_name) for tag_name, _ in tags_data)
                if found
            ),
            None,
        )
        if latest_match is None:
            log_handler.log_debug(
                "No tags matched the pattern v.X.X.X.X. Suggesting default.",
                func_name=func_name
            )
            return default_suggestion
        log_handler.log_debug(
            f"Found latest tag matching pattern: {latest_match.string}",
            func_name=func_name
        )

        # Increment from the least significant component, carrying leftwards.
        parts = [int(group) for group in latest_match.groups()]
        limit: int = 99  # Assuming max 99 for each part
        for idx in range(len(parts) - 1, -1, -1):
            parts[idx] += 1
            # Stop when no carry is needed; the first component never wraps.
            if parts[idx] <= limit or idx == 0:
                break
            parts[idx] = 0

        next_tag: str = "v." + ".".join(str(part) for part in parts)
        log_handler.log_debug(
            f"Generated suggestion: {next_tag}", func_name=func_name
        )
        return next_tag
    except Exception as e:
        # Suggestion is best-effort: log and fall back to the default
        log_handler.log_exception(
            f"Error generating tag suggestion: {e}", func_name=func_name
        )
        return default_suggestion
# --- Application Entry Point ---
def main():
    """
    Create the Tk root window, build the application and run the event loop.

    The event loop is started only when the application object exposes an
    existing ``main_frame``; otherwise a critical message is printed and the
    root window is destroyed. Unexpected exceptions are printed with their
    traceback and, when possible, shown in an error dialog. An exit message is
    printed in every case.
    """
    root: Optional[tk.Tk] = None
    app: Optional[GitSvnSyncApp] = None
    try:
        # Root window with its minimum size
        print("Creating Tkinter root window...")
        root = tk.Tk()
        root.minsize(850, 750)
        print("Tkinter root window created.")

        # Application controller
        print("Initializing GitSvnSyncApp...")
        app = GitSvnSyncApp(root)
        print("GitSvnSyncApp initialization attempt complete.")

        # The GUI counts as ready only when the main frame was built and still exists
        gui_ready = (
            hasattr(app, "main_frame")
            and app.main_frame
            and app.main_frame.winfo_exists()
        )
        if not gui_ready:
            # If GUI init failed, an error was likely shown already
            print(
                "CRITICAL: App init failed before mainloop could start. Exiting.",
                file=sys.stderr
            )
            # Best-effort removal of the root window if it is still around
            if root and root.winfo_exists():
                try:
                    root.destroy()
                except Exception:
                    pass  # Ignore errors during final cleanup
        else:
            print("Starting Tkinter main event loop.")
            root.mainloop()
            print("Tkinter main event loop finished.")
    except Exception as e:
        # Unexpected failure during startup or while running
        print(f"FATAL error during application execution: {e}", file=sys.stderr)
        traceback.print_exc()
        # Try to tell the user through a dialog as well
        try:
            if root and root.winfo_exists():
                parent_window = root
            else:
                parent_window = None
            messagebox.showerror(
                "Fatal Application Error",
                f"Application failed unexpectedly:\n{e}",
                parent=parent_window
            )
        except Exception as msg_e:
            # Even the dialog failed: fall back to stderr
            print(f"FATAL (GUI error message failed: {msg_e}):\n{e}", file=sys.stderr)
    finally:
        print("Application exiting.")
# Standard Python entry point check: call main() only when this file is
# executed as a script, not when it is imported as a module.
if __name__ == "__main__":
    main()
# --- END OF FILE GitUtility.py ---