SXXXXXXX_ProjectUtility/projectutility/gui/main_window.py
2025-12-01 08:29:48 +01:00

2836 lines
120 KiB
Python

# projectutility/gui/main_window.py
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
import logging
import queue
import threading
import os
import sys
import json
import dataclasses # Although ToolInfo/Parameter are imported, might need for other potential uses
from typing import Dict, Any, Optional, List, Tuple, Callable
from functools import partial # Useful for callbacks with arguments
# --- Robust Imports & Feature Flags ---
# Determine Project Root and ensure it's discoverable
# Resolve the repository layout from this file's own location. Both private
# names stay None if resolution fails before they are assigned.
_project_root = None
_app_source_root = None
try:
    # Walk upwards one directory at a time:
    #   .../projectutility/gui  ->  .../projectutility  ->  repository root
    _gui_dir = os.path.split(os.path.abspath(__file__))[0]
    _app_source_root = os.path.split(_gui_dir)[0]
    _project_root = os.path.split(_app_source_root)[0]

    # Absolute `projectutility.*` imports need the repository root on sys.path.
    # Running via `python -m` usually provides it already; this is a safety net.
    if _project_root not in sys.path:
        sys.path.insert(0, _project_root)
        logging.getLogger(__name__).debug(
            f"Added repository root to sys.path: {_project_root}"
        )

    # Public path constants, defined early so later code can rely on them.
    APP_ROOT_DIR = _project_root  # repository root
    CONFIG_DIR = os.path.join(_app_source_root, "config")  # lives inside the package
    STATE_FILENAME = os.path.join(CONFIG_DIR, "tool_state.json")  # last-used parameters

    logging.getLogger(__name__).debug(
        f"MainWindow: Repo Root (APP_ROOT_DIR): {APP_ROOT_DIR}"
    )
    logging.getLogger(__name__).debug(
        f"MainWindow: Config Dir (CONFIG_DIR): {CONFIG_DIR}"
    )
    logging.getLogger(__name__).debug(f"MainWindow: State File: {STATE_FILENAME}")
except Exception as e:
    logging.getLogger(__name__).critical(
        f"CRITICAL ERROR: Failed to determine project structure paths in MainWindow: {e}",
        exc_info=True,
    )
    # Fall back to paths under the current working directory. State and
    # configuration persistence will most likely not work from here.
    APP_ROOT_DIR = os.getcwd()
    CONFIG_DIR = os.path.join(APP_ROOT_DIR, "config")
    STATE_FILENAME = os.path.join(CONFIG_DIR, "tool_state.json")
    # If these paths ever become critical, raise instead of continuing:
    # raise RuntimeError("Could not determine project structure for MainWindow.") from e
# --- Import Core Modules (Models, Discovery, Registry) ---
# These provide the data structures and logic for managing tools.
# Feature flags describing whether the real core modules could be imported.
CORE_MODULES_LOADED = False
TOOL_DISCOVERY_ENABLED = False
try:
    # Absolute imports from the projectutility package
    from projectutility.core.models import ToolInfo, ToolParameter
    from projectutility.core.tool_discovery import discover_tools
    from projectutility.core.registry_models import ToolRegistryEntry
    from projectutility.core import registry_manager  # Import the module itself

    CORE_MODULES_LOADED = True
    TOOL_DISCOVERY_ENABLED = True  # Discovery depends on all core modules
    logging.getLogger(__name__).info(
        "Successfully imported core modules (models, discovery, registry)."
    )
except Exception as e:
    # ImportError and unexpected failures share one fallback path so that the
    # stand-in names below are always defined, whatever went wrong.
    if isinstance(e, ImportError):
        logging.getLogger(__name__).critical(
            f"Failed to import core modules: {e}. Tool discovery and management will be unavailable.",
            exc_info=True,
        )
    else:
        logging.getLogger(__name__).critical(
            f"Unexpected error importing core modules: {e}", exc_info=True
        )
    # A failure part-way through the imports must not leave the flags set.
    CORE_MODULES_LOADED = False
    TOOL_DISCOVERY_ENABLED = False

    # Minimal stand-ins that keep the rest of this module importable.
    from collections import namedtuple

    ToolParameter = namedtuple(
        "ToolParameter",
        ["name", "label", "type", "required", "default", "description", "options"],
    )
    # `defaults` applies to the last three fields: parameters, version, has_gui.
    ToolInfo = namedtuple(
        "ToolInfo",
        [
            "id",
            "display_name",
            "description",
            "command",
            "working_dir",
            "parameters",
            "version",
            "has_gui",
        ],
        defaults=[[], None, False],
    )

    @dataclasses.dataclass
    class ToolRegistryEntry:
        pass

    def discover_tools():
        """Stand-in for tool discovery: reports no tools."""
        return {}

    class registry_manager:
        """Stand-in for the registry_manager module: empty, read-only registry."""

        @staticmethod
        def load_registry():
            return []

        @staticmethod
        def get_tool_config(tid, reg):
            return None

        @staticmethod
        def save_registry(reg):
            return False
# Git support is optional. The defaults below are placeholders that stay in
# effect unless the real constants can be imported from git_manager.
GIT_ENABLED = False
GIT_STATUS_ERROR = "Error"
GIT_STATUS_GITPYTHON_MISSING = "GitPython Missing"
GIT_STATUS_CHECKING = "Checking..."
GIT_STATUS_UP_TO_DATE = "Up-to-date (N/A)"
GIT_STATUS_BEHIND = "Behind (N/A)"
# These three are imported from git_manager when Git is available, so they need
# a placeholder too; otherwise any reference raises NameError without Git.
GIT_STATUS_AHEAD = "Ahead (N/A)"
GIT_STATUS_DIVERGED = "Diverged (N/A)"
GIT_STATUS_NOT_CLONED = "Not Cloned (N/A)"
GIT_STATUS_UPDATING = "Updating... (N/A)"
try:
    # Absolute import
    from projectutility.core import git_manager

    GIT_ENABLED = git_manager.GITPYTHON_AVAILABLE
    if GIT_ENABLED:
        # Replace the placeholders with the real constants from git_manager.
        from projectutility.core.git_manager import (
            GIT_STATUS_UP_TO_DATE, GIT_STATUS_BEHIND, GIT_STATUS_AHEAD,
            GIT_STATUS_DIVERGED, GIT_STATUS_NOT_CLONED, GIT_STATUS_ERROR,
            GIT_STATUS_CHECKING, GIT_STATUS_UPDATING, GIT_STATUS_GITPYTHON_MISSING
        )
        logging.getLogger(__name__).info("Successfully imported git_manager (GitPython available).")
    else:
        # The constants keep the placeholder values defined before the try.
        logging.getLogger(__name__).warning("Imported git_manager, but GitPython library not found. Git features disabled.")
except Exception as e:
    # Reached when importing git_manager (or its constants) fails for any reason.
    if isinstance(e, ImportError):
        logging.getLogger(__name__).error(
            f"Failed to import core.git_manager: {e}. Git features disabled.",
            exc_info=True
        )
    else:
        logging.getLogger(__name__).error(
            f"Unexpected error importing git_manager: {e}. Git features disabled.",
            exc_info=True
        )
    # Ensure the flag is False; the constants remain placeholders.
    GIT_ENABLED = False

    class git_manager:
        """Stand-in for the git_manager module; every call reports GitPython as missing."""

        GITPYTHON_AVAILABLE = False

        @staticmethod
        def get_local_repo_path(*a, **kw):
            return ""

        @staticmethod
        def ensure_repository_cloned(*a, **kw):
            return None, GIT_STATUS_GITPYTHON_MISSING

        @staticmethod
        def update_repository(*a, **kw):
            return False, GIT_STATUS_GITPYTHON_MISSING

        @staticmethod
        def get_repository_status(*a, **kw):
            return {
                "status": GIT_STATUS_GITPYTHON_MISSING,
                "message": "GitPython missing",
            }
# --- Import Process Worker ---
# Executes external tool processes.
# ProcessWorker runs external tool processes; without it tool execution is off.
PROCESS_WORKER_ENABLED = False
try:
    # Relative import from the same 'gui' package
    from .process_worker import ProcessWorker

    PROCESS_WORKER_ENABLED = True
    logging.getLogger(__name__).info("Successfully imported ProcessWorker.")
except Exception as e:
    # Any failure ends here so the stand-in class below is always defined.
    if isinstance(e, ImportError):
        logging.getLogger(__name__).critical(
            f"Failed to import gui.process_worker: {e}. Tool execution will be disabled.",
            exc_info=True,
        )
    else:
        logging.getLogger(__name__).critical(
            f"Unexpected error importing ProcessWorker: {e}", exc_info=True
        )
    PROCESS_WORKER_ENABLED = False

    class ProcessWorker:
        """Inert stand-in used when the real ProcessWorker cannot be imported."""

        def __init__(self, *a, **kw):
            self._process = None

        def run(self):
            pass

        def terminate(self):
            pass

        class ToolInfo:  # Nested dummy needed if accessed directly
            display_name = "Dummy"
            has_gui = False
            parameters = []

        tool_info = ToolInfo()
# --- Import Config Window ---
# Provides the configuration dialog for tools.
# ConfigWindow is the per-tool configuration dialog; it is optional.
CONFIG_WINDOW_ENABLED = False
try:
    # Relative import from the same 'gui' package
    from .config_window import ConfigWindow

    CONFIG_WINDOW_ENABLED = True
    logging.getLogger(__name__).info("Successfully imported ConfigWindow.")
except Exception as e:
    # Any failure ends here so the stand-in class below is always defined.
    if isinstance(e, ImportError):
        logging.getLogger(__name__).error(
            f"Failed to import gui.config_window: {e}. Tool configuration dialog unavailable.",
            exc_info=True,
        )
    else:
        logging.getLogger(__name__).error(
            f"Unexpected error importing ConfigWindow: {e}", exc_info=True
        )
    CONFIG_WINDOW_ENABLED = False

    class ConfigWindow:
        """Stand-in that only logs an error; callers decide how to inform the user."""

        def __init__(self, *args, **kwargs):
            # Use the module logger: the root-level logging.error() helper
            # implicitly calls basicConfig() when no handler is installed.
            logging.getLogger(__name__).error(
                "ConfigWindow class is unavailable due to import error!"
            )
# --- Global Constants for MainWindow ---
# Upper bound on the number of lines retained in the output text widget.
MAX_OUTPUT_LINES = 1000 # Max lines to keep in the output text widget
# Message-type tags for items placed on the GUI queue (used internally in MainWindow).
# The first group is for process-worker messages, the second for Git background tasks.
MSG_TYPE_WORKER_STDOUT = "stdout"
MSG_TYPE_WORKER_STDERR = "stderr"
MSG_TYPE_WORKER_STATUS = "status" # Internal status from worker (not process output)
MSG_TYPE_WORKER_FINISHED = "finished"
MSG_TYPE_GIT_STATUS = "git_status" # Status messages from git background tasks
MSG_TYPE_GIT_PROGRESS = "git_progress" # Progress messages (e.g., during clone/fetch)
MSG_TYPE_GIT_ERROR = "git_error" # Specific error messages from git tasks
MSG_TYPE_GIT_SINGLE_RESULT = "git_single_result" # Result of a single tool update
MSG_TYPE_GIT_ALL_FINISHED = "git_all_finished" # Signal that batch update is done
# --- Import Version Info FOR THE WRAPPER ITSELF ---
# The version strings are cosmetic (window title / build info), so any problem
# with the generated _version module degrades to placeholders rather than
# aborting the import of the whole GUI module.
try:
    # Use absolute import based on package name
    from projectutility import _version as wrapper_version

    WRAPPER_APP_VERSION_STRING = f"{wrapper_version.__version__} ({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}"
except (ImportError, AttributeError, TypeError):
    # ImportError: running from source without a generated _version.py.
    # AttributeError / TypeError: _version.py exists but is incomplete, e.g. a
    # field is missing or GIT_COMMIT_HASH is not sliceable.
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
    WRAPPER_BUILD_INFO = "Wrapper build time unknown"
# --- End Import Version Info ---
# --- Constants for Version Generation ---
# Placeholder version metadata. Not referenced in the visible part of this
# module; presumably used as fallbacks when real version info is unavailable
# (TODO confirm against usages further down the file).
DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown"
# --- End Constants ---
# --- Main Window Class ---
class MainWindow:
"""
Main application window for the Project Utility.
Manages the UI layout, tool discovery and listing, parameter input,
process execution via ProcessWorker, Git interactions via git_manager,
and tool configuration via ConfigWindow. Uses a queue for thread-safe
communication from background tasks (ProcessWorker, Git updates) to the GUI.
"""
def __init__(self, root: tk.Tk) -> None:
    """
    Initializes the main application window.

    Sets up the application state, loads the persisted tool state, builds
    the UI, discovers tools, starts polling the GUI queue and finally shows
    the window.

    Args:
        root: The main Tkinter root window instance.
    """
    self.root = root
    self.logger = logging.getLogger(__name__)
    self.gui_queue = queue.Queue()  # Queue for messages from worker/git threads

    # Application State
    self.available_tools: Dict[str, ToolInfo] = {}  # {tool_id: ToolInfo}
    self.tools_in_listbox: List[str] = []  # Order of tool_ids in the listbox
    self.selected_tool_id: Optional[str] = None  # Currently selected tool ID
    self.running_workers: Dict[str, ProcessWorker] = {}  # {run_id: ProcessWorker}
    # {param_name: (widget, tk_var)}
    self.current_parameter_widgets: Dict[str, Tuple[tk.Widget, tk.Variable]] = {}
    # run_id currently selected in the process tree
    self.selected_run_id_for_termination: Optional[str] = None
    # Last used parameters per tool, loaded from the state file
    self.tool_state: Dict[str, Dict[str, Any]] = self._load_tool_state()

    # Git Update State
    self._git_update_running: bool = False  # True while a batch Git update is active
    self._git_update_thread: Optional[threading.Thread] = None

    # Warn the user if the core components failed to load.
    if not CORE_MODULES_LOADED:
        try:
            messagebox.showerror(
                "Initialization Error",
                "Core application modules failed to load. Functionality will be severely limited. Check logs.",
                parent=self.root,
            )
        except Exception:
            # Best effort only (the root may not be ready yet), but leave a
            # trace instead of swallowing the failure silently.
            self.logger.debug(
                "Could not display the initialization error dialog.",
                exc_info=True,
            )

    # --- Setup UI ---
    self._setup_window()  # Configure root window properties
    self._setup_ui_styles()  # Configure ttk styles
    self._setup_ui()  # Create and arrange all widgets

    # --- Initial Load and Start ---
    self._load_tools()  # Discover tools and populate the list
    self._start_queue_processing()  # Start polling the GUI queue
    self.root.deiconify()  # Show the main window
    self.logger.info("MainWindow initialized and UI displayed.")

    # Summarize which optional features are available in this session.
    feature_flags = (
        ("Tool Discovery", TOOL_DISCOVERY_ENABLED),
        ("Process Execution", PROCESS_WORKER_ENABLED),
        ("Git Features", GIT_ENABLED),
        ("Config Window", CONFIG_WINDOW_ENABLED),
    )
    for feature_name, enabled in feature_flags:
        self.logger.info(
            f"{feature_name}: {'Enabled' if enabled else 'DISABLED'}"
        )
def _load_tool_state(self) -> Dict[str, Dict[str, Any]]:
    """
    Read the persisted per-tool parameter values from the state file.

    Returns:
        The dictionary stored in the state file, or an empty dictionary when
        the file is missing, unreadable, not valid JSON, or does not contain
        a JSON object at the top level.
    """
    self.logger.debug(f"Attempting to load tool state from: {STATE_FILENAME}")

    if os.path.isfile(STATE_FILENAME):
        try:
            with open(STATE_FILENAME, "r", encoding="utf-8") as state_file:
                loaded = json.load(state_file)
            # Only a JSON object is an acceptable top-level value.
            if isinstance(loaded, dict):
                self.logger.info(
                    f"Successfully loaded tool state from {STATE_FILENAME}"
                )
                return loaded
            self.logger.warning(
                f"Invalid format in state file {STATE_FILENAME}: Expected a dictionary, got {type(loaded)}. Resetting state."
            )
        except json.JSONDecodeError as e:
            self.logger.error(
                f"Failed to parse JSON in state file {STATE_FILENAME}: {e}. Resetting state.",
                exc_info=True,
            )
        except IOError as e:
            self.logger.error(
                f"Failed to read state file {STATE_FILENAME}: {e}. Resetting state.",
                exc_info=True,
            )
        except Exception:
            # Anything else is unexpected; log with traceback and start fresh.
            self.logger.exception(
                f"Unexpected error loading state file {STATE_FILENAME}. Resetting state."
            )
        return {}

    self.logger.info("Tool state file not found. Starting with empty state.")
    return {}
def _save_tool_state(self, tool_id: str, parameters_used: Dict[str, Any]) -> None:
    """
    Saves the used parameter values for a specific tool to the state file.

    The in-memory state is merged with ``parameters_used`` and the whole
    state is written atomically (temp file, then ``os.replace``) to minimize
    the risk of corruption. Failures are logged and never raised; a leftover
    temp file is removed on a best-effort basis.

    Args:
        tool_id: The ID of the tool whose state is being saved. Empty IDs
            are ignored.
        parameters_used: A dictionary of parameter names and their last used
            values.
    """
    if not tool_id:
        self.logger.warning(
            "Attempted to save state for an empty tool_id. Skipping."
        )
        return
    self.logger.info(
        f"Saving state for tool '{tool_id}' with parameters: {parameters_used}"
    )

    # Ensure the configuration directory exists
    try:
        os.makedirs(CONFIG_DIR, exist_ok=True)
    except OSError as e:
        self.logger.error(
            f"Cannot create config directory '{CONFIG_DIR}' to save state: {e}. Aborting save.",
            exc_info=True,
        )
        return
    except Exception as e:
        self.logger.exception(
            f"Unexpected error creating config directory '{CONFIG_DIR}': {e}. Aborting save."
        )
        return

    # Update the in-memory state. The state file is only validated at the top
    # level when loaded, so an entry may be something other than a dict (e.g.
    # after a hand edit); replace it instead of failing on .update().
    tool_entry = self.tool_state.get(tool_id)
    if not isinstance(tool_entry, dict):
        if tool_entry is not None:
            self.logger.warning(
                f"Stored state for tool '{tool_id}' is not a dictionary. Replacing it."
            )
        tool_entry = {}
        self.tool_state[tool_id] = tool_entry
    tool_entry.update(parameters_used)

    # --- Atomic Write ---
    temp_filename = STATE_FILENAME + ".tmp"
    replaced = False
    try:
        with open(temp_filename, "w", encoding="utf-8") as f:
            # Use indent for readability
            json.dump(self.tool_state, f, indent=4, ensure_ascii=False)
        # Only swap in the new file once it has been written completely.
        os.replace(temp_filename, STATE_FILENAME)
        replaced = True
        self.logger.debug(f"Tool state saved successfully to {STATE_FILENAME}")
    except (TypeError, ValueError) as json_err:
        self.logger.error(
            f"Failed to serialize tool state to JSON: {json_err}. State not saved.",
            exc_info=True,
        )
    except (IOError, OSError) as file_err:
        self.logger.error(
            f"Failed to write or replace state file {STATE_FILENAME}: {file_err}. State not saved.",
            exc_info=True,
        )
    except Exception:
        self.logger.exception(
            f"Unexpected error saving state file {STATE_FILENAME}. State not saved."
        )
    finally:
        if not replaced:
            # Single cleanup point; it must never raise out of this method.
            try:
                os.remove(temp_filename)
            except FileNotFoundError:
                pass  # Nothing was written, nothing to clean up.
            except OSError as cleanup_err:
                self.logger.warning(
                    f"Could not remove temporary state file {temp_filename}: {cleanup_err}"
                )
def _setup_window(self) -> None:
    """Apply title, size constraints and the close handler to the root window."""
    window = self.root
    window.title(f"Project Utility - {WRAPPER_APP_VERSION_STRING}")
    # Minimum size first, then the default start-up size.
    window.minsize(800, 650)
    window.geometry("1000x750")
    # Route the window-manager close button through our own shutdown handler.
    window.protocol("WM_DELETE_WINDOW", self._on_close)
    self.logger.debug("Main window properties configured.")
def _setup_ui_styles(self) -> None:
    """Select a ttk theme and register the custom styles used by the UI."""
    ttk_style = ttk.Style(self.root)
    try:
        # Take the first preferred theme that is installed; otherwise keep
        # whatever theme is currently active.
        installed = ttk_style.theme_names()
        theme_to_use = ttk_style.theme_use()
        for candidate in ("clam", "alt", "vista", "xpnative", "default"):
            if candidate in installed:
                theme_to_use = candidate
                break

        if not theme_to_use:
            self.logger.warning("Could not find any preferred ttk themes.")
        else:
            ttk_style.theme_use(theme_to_use)
            self.logger.info(f"Using ttk theme: {theme_to_use}")

        # Label variants for secondary text and for the placeholder.
        ttk_style.configure("Italic.TLabel", font=("Segoe UI", 9, "italic"))
        ttk_style.configure(
            "Placeholder.TLabel", foreground="grey", font=("Segoe UI", 9, "italic")
        )

        # Look up the theme backgrounds, falling back to a neutral grey.
        try:
            frame_bg = ttk_style.lookup("TFrame", "background")
            lframe_bg = ttk_style.lookup("TLabelFrame", "background")
        except tk.TclError:
            frame_bg = "#f0f0f0"
            lframe_bg = frame_bg

        # The parameter container blends into its surrounding LabelFrame.
        ttk_style.configure("ParamContainer.TFrame", background=lframe_bg)

        # Emphasized button style (used by the Run button).
        ttk_style.configure("Accent.TButton", font="-weight bold")
    except tk.TclError as e:
        self.logger.warning(f"Could not configure ttk styles: {e}")
def _setup_ui(self) -> None:
    """
    Creates and arranges the main UI widgets and menu.

    Layout built here, inside ``main_frame``:
      * row 0, column 0: tool listbox with its scrollbar
      * row 0, column 1: vertical PanedWindow holding the scrollable
        "Tool Options" area and the "Output Log"
      * row 1, column 1: frame with the "Run Tool" button
      * row 2, both columns: "Running Tools" tree and "Terminate Selected"

    Widgets needed by other methods are stored on ``self``: ``tools_menu``,
    ``tool_listbox``, ``tool_controls_frame``, ``controls_placeholder_label``,
    ``git_info_label``, ``output_text``, ``run_button``, ``process_tree`` and
    ``terminate_button``.
    """
    self.logger.debug("Setting up main UI layout...")
    # Configure root window grid
    self.root.columnconfigure(0, weight=1)
    self.root.rowconfigure(0, weight=1)
    # --- Menu Bar ---
    menubar = tk.Menu(self.root)
    self.root.config(menu=menubar)
    # File Menu (Optional)
    # file_menu = tk.Menu(menubar, tearoff=0)
    # menubar.add_cascade(label="File", menu=file_menu)
    # file_menu.add_command(label="Exit", command=self._on_close)
    # Tools Menu
    tools_menu = tk.Menu(menubar, tearoff=0)
    menubar.add_cascade(label="Tools", menu=tools_menu)
    tools_menu.add_command(
        label="Update All Git Tools",
        command=self._update_all_git_tools_threaded,
        state=(
            tk.NORMAL if GIT_ENABLED else tk.DISABLED
        ),  # Enable only if GitPython available
    )
    tools_menu.add_separator()
    tools_menu.add_command(
        label="Configure Selected Tool...",
        command=self._open_tool_config_window,
        state=tk.DISABLED,  # Initially disabled, enabled when a tool is selected
    )
    # Manual reload of the tool list
    tools_menu.add_command(label="Reload Tool List", command=self._load_tools)
    self.tools_menu = tools_menu  # Store reference to enable/disable items later
    # --- Main Application Frame ---
    main_frame = ttk.Frame(self.root, padding="10")
    main_frame.grid(row=0, column=0, sticky="nsew")
    # Configure main frame grid columns (Tool list | Details/Run)
    main_frame.columnconfigure(
        0, weight=0, minsize=200
    )  # Tool list column (fixed min width)
    main_frame.columnconfigure(1, weight=1)  # Details/Run column (expands)
    # Configure main frame grid rows
    main_frame.rowconfigure(0, weight=1)  # Top panes expand vertically
    main_frame.rowconfigure(
        1, weight=0
    )  # Row 1 holds the Run button frame (fixed size); the process pane sits in row 2
    # --- Left Pane: Tool List ---
    tool_list_frame = ttk.Frame(main_frame)  # No extra padding needed here
    tool_list_frame.grid(
        row=0, column=0, sticky="nsew", padx=(0, 5)
    )  # Add padding to the right
    tool_list_frame.rowconfigure(1, weight=1)  # Make listbox expand vertically
    tool_list_frame.columnconfigure(0, weight=1)  # Make listbox expand horizontally
    ttk.Label(tool_list_frame, text="Available Tools:", font="-weight bold").grid(
        row=0, column=0, columnspan=2, sticky="w", pady=(0, 5)
    )
    # Listbox for tools
    self.tool_listbox = tk.Listbox(
        tool_list_frame, exportselection=False, width=25
    )  # exportselection=False keeps the selection when text is selected in other widgets
    self.tool_listbox.grid(row=1, column=0, sticky="nsew")
    # Scrollbar for listbox
    listbox_scrollbar_y = ttk.Scrollbar(
        tool_list_frame, orient=tk.VERTICAL, command=self.tool_listbox.yview
    )
    listbox_scrollbar_y.grid(row=1, column=1, sticky="ns")
    self.tool_listbox.config(yscrollcommand=listbox_scrollbar_y.set)
    # Bind selection event
    self.tool_listbox.bind("<<ListboxSelect>>", self._on_tool_selected)
    # --- Right Pane: Tool Details, Parameters, Output ---
    right_pane = ttk.PanedWindow(
        main_frame, orient=tk.VERTICAL
    )  # Use PanedWindow for resizing
    right_pane.grid(row=0, column=1, sticky="nsew")
    # Top section of right pane: Controls (Scrollable)
    controls_outer_frame = ttk.LabelFrame(
        right_pane, text="Tool Options", padding="5"
    )
    # Add this frame to the PanedWindow
    right_pane.add(controls_outer_frame, weight=1)  # Allow this pane to resize
    controls_outer_frame.rowconfigure(0, weight=1)  # Make canvas expand
    controls_outer_frame.columnconfigure(0, weight=1)  # Make canvas expand
    # Canvas for scrolling
    controls_canvas = tk.Canvas(
        controls_outer_frame, borderwidth=0, highlightthickness=0
    )
    # Scrollbar for the canvas
    controls_scrollbar = ttk.Scrollbar(
        controls_outer_frame, orient="vertical", command=controls_canvas.yview
    )
    # Frame inside the canvas where actual controls will be placed
    self.tool_controls_frame = ttk.Frame(
        controls_canvas, style="ParamContainer.TFrame", padding="5"
    )
    # Link canvas and scrollbar
    controls_canvas.configure(yscrollcommand=controls_scrollbar.set)
    # Create the window on the canvas to hold the frame
    controls_canvas_window = controls_canvas.create_window(
        (0, 0), window=self.tool_controls_frame, anchor="nw"
    )
    # Configure scrolling behavior
    # Update scrollregion when the controls frame size changes
    self.tool_controls_frame.bind(
        "<Configure>",
        lambda e: controls_canvas.configure(
            scrollregion=controls_canvas.bbox("all")
        ),
    )
    # Update the width of the frame inside the canvas when the canvas width changes
    controls_canvas.bind(
        "<Configure>",
        lambda e: self.root.after_idle(  # Use after_idle to ensure geometry is updated
            lambda: controls_canvas.itemconfig(
                controls_canvas_window, width=e.width
            )
        ),
    )
    # Place canvas and scrollbar in the outer frame grid
    controls_canvas.grid(row=0, column=0, sticky="nsew")
    controls_scrollbar.grid(row=0, column=1, sticky="ns")
    # Placeholder label shown when no tool is selected
    self.controls_placeholder_label = ttk.Label(
        self.tool_controls_frame,
        text="Select a tool from the list on the left.",
        style="Placeholder.TLabel",
    )
    self.controls_placeholder_label.pack(padx=10, pady=20, anchor="center")
    # Git Info Label (created here, packed/unpacked in _on_tool_selected)
    # Ensure it uses the background of its parent frame for better appearance
    style_name = self.tool_controls_frame.cget("style") or "TFrame"
    try:
        bg_color = ttk.Style().lookup(style_name, "background")
    except tk.TclError:
        bg_color = "#f0f0f0"  # Fallback
    self.git_info_label = ttk.Label(
        self.tool_controls_frame,
        text="",
        style="Italic.TLabel",
        foreground="navy",  # Dark blue color for Git info
        background=bg_color,  # Match parent background
    )
    # Do not pack it initially
    # Bottom section of right pane: Output Log
    output_frame = ttk.LabelFrame(right_pane, text="Output Log", padding="5")
    right_pane.add(output_frame, weight=1)  # Allow output log to resize too
    output_frame.rowconfigure(0, weight=1)  # Make text widget expand
    output_frame.columnconfigure(0, weight=1)  # Make text widget expand
    self.output_text = scrolledtext.ScrolledText(
        output_frame,
        wrap=tk.WORD,  # Wrap lines at word boundaries
        state=tk.DISABLED,  # Start as read-only
        height=15,  # Initial height in lines
        relief=tk.SUNKEN,
        borderwidth=1,
        # Consider adding padding within the text area
        # padx=5, pady=5
    )
    self.output_text.grid(row=0, column=0, sticky="nsew")
    # Configure tags for colored output (use distinct, meaningful names)
    self.output_text.tag_config("error", foreground="#D63031")  # Red
    self.output_text.tag_config("info", foreground="#0984E3")  # Blue
    self.output_text.tag_config("success", foreground="#00B894")  # Green
    self.output_text.tag_config("warning", foreground="#E17055")  # Orange
    self.output_text.tag_config("debug", foreground="#B2BEC3")  # Grey
    self.output_text.tag_config(
        "command", foreground="#6C5CE7", font="-weight bold"
    )  # Purple, Bold
    self.output_text.tag_config(
        "run_id", foreground="#636E72", font="-size 8"
    )  # Dark Grey, Small
    # Tags for log messages forwarded from tools; each reuses the colour of
    # the matching generic tag configured above.
    self.output_text.tag_config(
        "log_info", foreground=self.output_text.tag_cget("info", "foreground")
    )
    self.output_text.tag_config(
        "log_warning", foreground=self.output_text.tag_cget("warning", "foreground")
    )
    self.output_text.tag_config(
        "log_error", foreground=self.output_text.tag_cget("error", "foreground")
    )
    self.output_text.tag_config(
        "log_debug", foreground=self.output_text.tag_cget("debug", "foreground")
    )
    # Tag for raw stdout that isn't parsed JSON
    self.output_text.tag_config(
        "raw_stdout", foreground="#2d3436"
    )  # Very Dark Grey/Black
    # --- Run Button Area ---
    # Placed outside the PanedWindow, directly in the main_frame grid
    # (row 1, column 1), i.e. below the right pane.
    button_frame = ttk.Frame(
        main_frame, padding=(0, 5, 0, 0)
    )  # Padding: left, top, right, bottom
    button_frame.grid(
        row=1, column=1, sticky="ew", pady=(5, 0)
    )  # Span only the right column
    button_frame.columnconfigure(0, weight=1)  # Push button to the right
    self.run_button = ttk.Button(
        button_frame,
        text="Run Tool",
        command=self._launch_tool,
        state=tk.DISABLED,  # Initially disabled
        style="Accent.TButton",  # Optional accent style
    )
    self.run_button.grid(row=0, column=1, sticky="e", padx=5)  # Align right
    # --- Bottom Pane: Running Processes ---
    process_frame = ttk.LabelFrame(main_frame, text="Running Tools", padding="5")
    # Grid position: row 2 of main_frame, spanning the tool list and details columns
    process_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(10, 0))
    process_frame.columnconfigure(0, weight=1)  # Make Treeview expand horizontally
    process_frame.rowconfigure(
        0, weight=0
    )  # Treeview height controlled by 'height' option
    # Treeview for running processes
    self.process_tree = ttk.Treeview(
        process_frame,
        columns=("run_id", "tool_name", "status", "pid"),
        show="headings",  # Don't show the implicit first '#' column
        selectmode="browse",  # Only allow selecting one row
        height=5,  # Initial height in rows
    )
    # Define headings
    self.process_tree.heading("run_id", text="Run ID", anchor=tk.W)
    self.process_tree.heading("tool_name", text="Tool Name", anchor=tk.W)
    self.process_tree.heading("status", text="Status", anchor=tk.CENTER)
    self.process_tree.heading("pid", text="PID", anchor=tk.E)
    # Define column properties (width, alignment, stretch)
    self.process_tree.column("run_id", width=160, anchor=tk.W, stretch=tk.NO)
    self.process_tree.column(
        "tool_name", width=250, anchor=tk.W, stretch=tk.YES
    )  # Allow name to stretch
    self.process_tree.column("status", width=100, anchor=tk.CENTER, stretch=tk.NO)
    self.process_tree.column("pid", width=80, anchor=tk.E, stretch=tk.NO)
    # Scrollbars for Treeview
    tree_scroll_y = ttk.Scrollbar(
        process_frame, orient="vertical", command=self.process_tree.yview
    )
    tree_scroll_x = ttk.Scrollbar(
        process_frame, orient="horizontal", command=self.process_tree.xview
    )
    self.process_tree.configure(
        yscrollcommand=tree_scroll_y.set, xscrollcommand=tree_scroll_x.set
    )
    # Place Treeview and scrollbars in the grid
    self.process_tree.grid(row=0, column=0, sticky="nsew")
    tree_scroll_y.grid(row=0, column=1, sticky="ns")
    tree_scroll_x.grid(row=1, column=0, columnspan=2, sticky="ew")
    # Frame for terminate button, below the treeview
    process_button_frame = ttk.Frame(process_frame)
    process_button_frame.grid(
        row=2, column=0, columnspan=2, sticky="ew", pady=(5, 0)
    )
    process_button_frame.columnconfigure(0, weight=1)  # Push button right
    self.terminate_button = ttk.Button(
        process_button_frame,
        text="Terminate Selected",
        command=self._terminate_selected_process,
        state=tk.DISABLED,  # Initially disabled
    )
    self.terminate_button.grid(row=0, column=1, sticky="e")  # Align right
    # Bind selection event for the process tree
    self.process_tree.bind("<<TreeviewSelect>>", self._on_process_selected)
    self.logger.debug("Main UI setup complete.")
# --- Tool Loading and Selection ---
def _load_tools(self) -> None:
    """
    Rebuild the tool list shown in the GUI.

    Empties the listbox and the related internal state, resets the current
    selection and controls area, then runs tool discovery and fills the
    listbox with the discovered tools ordered by display name. Problems are
    reported through the log, a message box and the placeholder label.
    """
    self.logger.info("Reloading available tools...")

    # Start from a clean slate: listbox, lookup tables, selection, controls.
    self.tool_listbox.delete(0, tk.END)
    self.available_tools.clear()
    self.tools_in_listbox.clear()
    self._clear_tool_controls()

    if not TOOL_DISCOVERY_ENABLED:
        self.logger.error(
            "Cannot load tools: Tool discovery module is unavailable."
        )
        messagebox.showerror(
            "Error",
            "Tool Discovery module failed to load. Cannot find tools.",
            parent=self.root,
        )
        self._update_placeholder("Tool Discovery Error.")
        return

    def display_name_of(entry):
        # entry is a (tool_id, ToolInfo) pair
        return entry[1].display_name

    try:
        self.available_tools = discover_tools()
        if self.available_tools:
            # Consistent ordering: sort the (tool_id, ToolInfo) pairs by name.
            ordered_entries = list(self.available_tools.items())
            ordered_entries.sort(key=display_name_of)

            # The listbox shows names; tools_in_listbox mirrors it with IDs.
            for tool_id, info in ordered_entries:
                self.tool_listbox.insert(tk.END, info.display_name)
                self.tools_in_listbox.append(tool_id)

            self.logger.info(
                f"Finished loading {len(self.tools_in_listbox)} tools into the listbox."
            )
            self._update_placeholder("Select a tool from the list on the left.")
        else:
            # Nothing found: leave the placeholder visible with a hint.
            self.logger.warning("No enabled/available tools were discovered.")
            self._update_placeholder("No available tools found.")
    except Exception as e:
        # Unexpected failure during discovery or while filling the listbox.
        self.logger.exception(
            "An unexpected error occurred during tool discovery and loading."
        )
        messagebox.showerror(
            "Error Loading Tools",
            f"An error occurred while loading tools:\n{e}",
            parent=self.root,
        )
        self._update_placeholder("Error loading tools.")
def _clear_parameter_widgets_only(self) -> None:
"""Destroys all widgets currently inside the tool_controls_frame."""
self.logger.debug("Clearing existing parameter widgets.")
if (
hasattr(self, "tool_controls_frame")
and self.tool_controls_frame.winfo_exists()
):
# Destroy all children of the controls frame
# This includes labels, separators, entry widgets, git info label etc.
for widget in self.tool_controls_frame.winfo_children():
# Keep the placeholder label, just pack_forget it later if needed
if (
widget != self.controls_placeholder_label
and widget != self.git_info_label
): # Also keep git label instance
widget.destroy()
# Clear the dictionary tracking parameter widgets
self.current_parameter_widgets.clear()
# Ensure Git info label is hidden
if hasattr(self, "git_info_label") and self.git_info_label.winfo_exists():
self.git_info_label.pack_forget()
def _update_placeholder(self, text: str):
"""Updates the text of the placeholder label and ensures it's packed."""
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.config(text=text)
# Ensure it's visible (pack if not already - pack_info() check is safer)
if not self.controls_placeholder_label.winfo_ismapped():
self.controls_placeholder_label.pack(padx=10, pady=20, anchor="center")
else:
self.logger.warning("Placeholder label does not exist, cannot update text.")
def _clear_tool_controls(self) -> None:
"""
Resets the tool selection state and clears the parameter/details area,
showing the placeholder text. Disables relevant buttons and menu items.
"""
self.logger.debug("Resetting tool selection and clearing controls area.")
self._clear_parameter_widgets_only()
self._update_placeholder(
"Select a tool from the list on the left."
) # Show default placeholder
self.selected_tool_id = None # Clear selection ID
# Disable Run button and Configure menu item
if hasattr(self, "run_button"):
self.run_button.config(state=tk.DISABLED)
try:
if hasattr(self, "tools_menu"):
# Find the index for "Configure Selected Tool..." - safer than hardcoding index
configure_index = None
try:
configure_index = self.tools_menu.index(
"Configure Selected Tool..."
)
except tk.TclError:
pass # Item not found
if configure_index is not None:
self.tools_menu.entryconfig(configure_index, state=tk.DISABLED)
except tk.TclError:
self.logger.warning("Could not disable 'Configure' menu item (TclError).")
def _on_tool_selected(self, event: Optional[tk.Event] = None) -> None:
"""
Handles the selection change event in the tool listbox.
Retrieves the selected tool's information, clears the old controls,
and dynamically builds the UI elements (description, Git status,
parameter widgets) for the newly selected tool.
Args:
event: The Tkinter event object (optional, can be called manually).
"""
if not hasattr(self, "tool_listbox") or not self.tool_listbox.winfo_exists():
self.logger.warning("Tool listbox not available, cannot handle selection.")
return
selected_indices = self.tool_listbox.curselection()
if not selected_indices:
# Selection cleared, possibly programmatically
self.logger.debug("Tool selection cleared.")
self._clear_tool_controls()
return
# Get the index of the selected item
selected_index = selected_indices[0]
# Ensure the index is valid for our internal list of tool IDs
if not (0 <= selected_index < len(self.tools_in_listbox)):
self.logger.error(
f"Selected index {selected_index} is out of bounds for tools_in_listbox."
)
self._clear_tool_controls()
return
new_tool_id = self.tools_in_listbox[selected_index]
# Avoid redundant processing if the same tool is clicked again
if new_tool_id == self.selected_tool_id:
self.logger.debug(f"Tool '{new_tool_id}' re-selected. No UI change needed.")
return
self.logger.info(f"Tool selected: ID='{new_tool_id}'")
self.selected_tool_id = new_tool_id
# Get the ToolInfo object for the selected tool
# This data comes from the discover_tools() function
selected_tool_info: Optional[ToolInfo] = self.available_tools.get(
self.selected_tool_id
)
if not selected_tool_info:
self.logger.error(
f"ToolInfo data missing for selected tool ID: '{self.selected_tool_id}'. Cannot display details."
)
messagebox.showerror(
"Internal Error",
f"Could not retrieve information for tool '{self.selected_tool_id}'.",
parent=self.root,
)
self._clear_tool_controls()
return
# --- Clear Old Controls and Prepare for New Ones ---
self._clear_parameter_widgets_only() # Destroy old widgets
# Hide the placeholder label before adding new content
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_ismapped()
):
self.controls_placeholder_label.pack_forget()
# --- Load Last Used Parameters for this Tool ---
last_used_params = self.tool_state.get(self.selected_tool_id, {})
self.logger.debug(
f"Last used parameters for '{self.selected_tool_id}': {last_used_params}"
)
# --- Display General Tool Info (Name, Description) ---
# Pack these directly into the tool_controls_frame
ttk.Label(
self.tool_controls_frame,
text=f"{selected_tool_info.display_name}",
font="-weight bold -size 11", # Slightly larger bold font for name
).pack(anchor="w", pady=(0, 5), padx=5)
if selected_tool_info.description:
ttk.Label(
self.tool_controls_frame,
text=selected_tool_info.description,
wraplength=500, # Adjust wraplength as needed based on window width
).pack(
anchor="w", pady=(0, 10), padx=5
) # Add more padding after description
# --- Display Git Info (if applicable) ---
# Load the registry entry to check the type
all_registry_entries = registry_manager.load_registry()
tool_registry_entry = registry_manager.get_tool_config(
self.selected_tool_id, all_registry_entries
)
git_display_text = ""
is_git_tool = bool(tool_registry_entry and tool_registry_entry.type == "git")
if is_git_tool:
if (
not hasattr(self, "git_info_label")
or not self.git_info_label.winfo_exists()
):
# Should not happen, but recreate defensively
self.logger.warning("Git info label missing, recreating.")
self.git_info_label = ttk.Label(
self.tool_controls_frame,
text="",
style="Italic.TLabel",
foreground="navy",
)
if (
GIT_ENABLED and tool_registry_entry
): # Need registry entry for status check
self.git_info_label.config(text=f"Git Status: {GIT_STATUS_CHECKING}")
self.git_info_label.pack(
anchor="w", padx=5, pady=(0, 10)
) # Pack below description
self.root.update_idletasks() # Show "Checking..." immediately
# Call status function (synchronous for now - could be slow)
# Consider making this async like in ConfigWindow if it causes freezes
try:
status_dict = git_manager.get_repository_status(tool_registry_entry)
status = status_dict.get("status", GIT_STATUS_ERROR)
message = status_dict.get("message", "")
if status not in [
GIT_STATUS_ERROR,
GIT_STATUS_NOT_CLONED,
GIT_STATUS_GITPYTHON_MISSING,
]:
local_ref = status_dict.get("local_ref", "?")
local_hash_short = (status_dict.get("local_hash") or "-------")[
:7
]
git_display_text = f"Git Status: {status} (Ref: {local_ref} [{local_hash_short}])"
else:
# Show simpler status for errors or missing repo
git_display_text = f"Git Status: {status} - {message}"
except Exception as e:
self.logger.error(
f"Error getting Git status for '{self.selected_tool_id}' in selection handler: {e}",
exc_info=True,
)
git_display_text = "Git Status: Error checking status"
else:
# GitPython missing or registry entry unavailable
git_display_text = (
f"Git Status: {GIT_STATUS_GITPYTHON_MISSING}"
if not GIT_ENABLED
else "Git Status: Error (Registry)"
)
# Update the label content and ensure it's packed
self.git_info_label.config(text=git_display_text)
if not self.git_info_label.winfo_ismapped():
self.git_info_label.pack(anchor="w", padx=5, pady=(0, 10))
else: # Not a git tool, ensure label is hidden
if hasattr(self, "git_info_label") and self.git_info_label.winfo_ismapped():
self.git_info_label.pack_forget()
# --- Display Parameters ---
# The parameters are now readily available in selected_tool_info.parameters
current_tool_params: List[ToolParameter] = selected_tool_info.parameters
if current_tool_params:
self.logger.debug(
f"Found {len(current_tool_params)} parameters for tool '{self.selected_tool_id}'. Creating widgets."
)
# Add a separator before parameters if other info was displayed
if selected_tool_info.description or is_git_tool:
ttk.Separator(self.tool_controls_frame, orient="horizontal").pack(
fill="x", pady=10, padx=5
)
# Create a container frame for the parameter widgets grid
param_container = ttk.Frame(
self.tool_controls_frame, style="ParamContainer.TFrame"
)
param_container.pack(fill="x", expand=True, padx=5)
param_container.columnconfigure(
1, weight=1
) # Allow input widgets to expand
# Create widget for each parameter definition
for i, param_def in enumerate(current_tool_params):
self._create_parameter_widget(
param_container, i, param_def, last_used_params
)
elif (
not is_git_tool
): # Only show "no params" if not a Git tool (Git tools show status instead)
ttk.Label(
self.tool_controls_frame,
text="This tool requires no parameters.",
style="Italic.TLabel",
).pack(anchor="w", padx=5, pady=10)
else:
# For Git tools without params, the Git status serves as the primary info.
pass
# --- Final UI Updates for Selection Change ---
# Adjust scrolling for the parameter area
self.tool_controls_frame.update_idletasks() # Ensure layout is updated
canvas_widget = self.tool_controls_frame.master
if isinstance(canvas_widget, tk.Canvas):
canvas_widget.configure(scrollregion=canvas_widget.bbox("all"))
canvas_widget.yview_moveto(0) # Scroll parameter area to top
# Update Run button state
self.run_button.config(
state=tk.NORMAL if PROCESS_WORKER_ENABLED else tk.DISABLED
)
if not PROCESS_WORKER_ENABLED:
# Show warning if worker is disabled
ttk.Label(
self.tool_controls_frame,
text="Tool execution is disabled (ProcessWorker unavailable).",
foreground="orange",
).pack(anchor="w", pady=(15, 0), padx=5)
# Update Configure menu item state
try:
if hasattr(self, "tools_menu"):
configure_index = self.tools_menu.index("Configure Selected Tool...")
# Enable configure only if a registry entry exists and ConfigWindow is available
conf_state = (
tk.NORMAL
if tool_registry_entry and CONFIG_WINDOW_ENABLED
else tk.DISABLED
)
self.tools_menu.entryconfig(configure_index, state=conf_state)
except tk.TclError:
pass # Ignore if menu gone or item not found
def _create_parameter_widget(
    self,
    parent: ttk.Frame,
    row_index: int,
    param_def: ToolParameter,
    last_used_values: Dict[str, Any],
):
    """
    Builds the caption label and the input control for one ToolParameter
    and grids them on row ``row_index`` of ``parent`` (label in column 0,
    control in column 1).

    The control depends on the lower-cased parameter type: an Entry for
    "string", a Spinbox for "integer" and "float", a Checkbutton for
    "boolean", and a read-only Entry plus "Browse..." button for "file" and
    "folder". Any other type is logged and handled like "string". The
    control starts with the last used value when one is stored for the
    parameter, otherwise with the definition's default.

    On success the (widget, Tk variable) pair is recorded in
    ``self.current_parameter_widgets``; if building the control fails, a red
    error label is shown and recorded together with ``None``.

    Args:
        parent: The parent ttk.Frame where widgets will be placed.
        row_index: The grid row index for these widgets.
        param_def: The ToolParameter object defining the parameter.
        last_used_values: Dictionary of previously used values for this tool.
    """
    name = param_def.name
    caption = f"{param_def.label}{'*' if param_def.required else ''}:"
    kind = param_def.type.lower()
    var = None  # Tk variable bound to the control
    control = None  # Widget that ends up in grid column 1

    # Stored value wins over the definition's default.
    start_value = last_used_values.get(name, param_def.default)
    origin = "state" if name in last_used_values else "default"
    self.logger.debug(
        f"Creating widget for Param '{name}': Type='{kind}', Required={param_def.required}, "
        f"InitialValue='{start_value}' (from {origin})"
    )

    ttk.Label(parent, text=caption).grid(
        row=row_index, column=0, sticky="nw", padx=(0, 10), pady=3
    )

    try:
        # --- Pick the control for this parameter type ---
        if kind == "string":
            var = tk.StringVar()
            control = ttk.Entry(parent, textvariable=var, width=50)
        elif kind == "integer":
            var = tk.IntVar()
            control = ttk.Spinbox(
                parent,
                from_=-(2**31),  # 32-bit signed range
                to=2**31 - 1,
                textvariable=var,
                width=15,
                wrap=False,
            )
            if start_value is None:
                start_value = 0
        elif kind == "float":
            var = tk.DoubleVar()
            control = ttk.Spinbox(
                parent,
                from_=-float("inf"),
                to=float("inf"),
                format="%0.6g",
                textvariable=var,
                width=15,
                wrap=False,
            )
            if start_value is None:
                start_value = 0.0
        elif kind == "boolean":
            var = tk.BooleanVar()
            control = ttk.Checkbutton(parent, variable=var, style="TCheckbutton")
        elif kind in ("file", "folder"):
            var = tk.StringVar()
            # Read-only path display and its browse button share one frame.
            path_row = ttk.Frame(parent, style="ParamContainer.TFrame")
            path_display = ttk.Entry(
                path_row, textvariable=var, width=45, state="readonly"
            )
            browse = ttk.Button(
                path_row,
                text="Browse...",
                style="TButton",
                command=partial(self._browse_path, param_def, var),
            )
            path_display.pack(side=tk.LEFT, fill=tk.X, expand=True)
            browse.pack(side=tk.LEFT, padx=(5, 0))
            control = path_row
        else:
            self.logger.warning(
                f"Unsupported parameter type '{param_def.type}' for parameter '{name}'. "
                f"Using standard string input."
            )
            var = tk.StringVar()
            control = ttk.Entry(parent, textvariable=var, width=50)

        # --- Seed the Tk variable ---
        if var is not None and start_value is not None:
            try:
                if kind == "integer":
                    var.set(int(start_value))
                elif kind == "float":
                    var.set(float(start_value))
                elif kind == "boolean":
                    if isinstance(start_value, str):
                        checked = str(start_value).lower() in [
                            "true",
                            "1",
                            "yes",
                            "on",
                        ]
                    else:
                        checked = bool(start_value)
                    var.set(checked)
                else:
                    # String, File, Folder and unsupported types
                    var.set(str(start_value))
            except (ValueError, TypeError) as e:
                self.logger.warning(
                    f"Could not set initial value '{start_value}' for parameter "
                    f"'{name}' (Type: {kind}). Error: {e}. Using default/empty."
                )
                # Neutral value matching the variable's type
                neutral_values = {"integer": 0, "float": 0.0, "boolean": False}
                var.set(neutral_values.get(kind, ""))

        # --- Grid the control and remember it ---
        if not control or not var:
            self.logger.error(
                f"Failed to create input widget or Tk variable for parameter '{name}'."
            )
        else:
            # A checkbutton looks better left-aligned than stretched.
            control.grid(
                row=row_index,
                column=1,
                sticky="w" if kind == "boolean" else "ew",
                padx=0,
                pady=3,
            )
            self.current_parameter_widgets[name] = (control, var)
    except Exception:
        self.logger.exception(
            f"Unexpected error creating widget for parameter '{name}'"
        )
        # Show the failure in place of the control; no variable is available.
        failure_label = ttk.Label(
            parent, text="Error creating widget!", foreground="red"
        )
        failure_label.grid(row=row_index, column=1, sticky="w", padx=0, pady=3)
        self.current_parameter_widgets[name] = (failure_label, None)
def _browse_path(self, param_def: ToolParameter, tk_variable: tk.StringVar):
    """
    Opens a file or folder selection dialog based on the parameter definition.

    For "file" parameters an open dialog is shown, or a save dialog when the
    parameter options contain a truthy "save_as". File-type filters are read
    from the "filter" option (a list of dicts with keys "n" and "p"). For
    "folder" parameters a directory dialog is shown. When the user picks a
    path it is normalized and written to ``tk_variable``; cancelling leaves
    the variable untouched.

    Args:
        param_def: The ToolParameter whose type and options drive the dialog.
        tk_variable: The StringVar holding the current path and receiving
            the selected one.
    """
    param_type = param_def.type.lower()
    self.logger.debug(
        f"Browse path requested for param '{param_def.name}', type '{param_type}'"
    )

    # --- Determine where the dialog starts ---
    current_path = tk_variable.get()
    initial_dir = APP_ROOT_DIR  # Default to repo root
    initial_file = ""
    if current_path:
        if os.path.isdir(current_path):
            # Check this first: the parent of an existing directory is also
            # a directory, so testing the parent first would always open
            # one level too high.
            initial_dir = current_path
        else:
            initial_file = os.path.basename(current_path)
            parent_dir = os.path.dirname(current_path)
            if os.path.isdir(parent_dir):
                initial_dir = parent_dir

    selected_path: Optional[str] = None
    try:
        if param_type == "file":
            # --- Configure file dialog options ---
            options = param_def.options or {}
            file_types = [("All Files", "*.*")]  # Default filter
            if isinstance(options.get("filter"), list):
                try:
                    parsed_filters = [
                        (ft.get("n", "Type ?"), ft.get("p", "*.*"))
                        for ft in options["filter"]
                        if isinstance(ft, dict) and "n" in ft and "p" in ft
                    ]
                    if parsed_filters:
                        file_types = parsed_filters
                except Exception as filter_err:
                    self.logger.warning(
                        f"Error parsing file filters for '{param_def.name}': {filter_err}"
                    )

            is_save_dialog = options.get("save_as", False)
            dialog_func = (
                filedialog.asksaveasfilename
                if is_save_dialog
                else filedialog.askopenfilename
            )

            # Default extension for the save dialog, taken from the first
            # glob of the first filter (e.g. "*.png" -> ".png"). Wildcard-only
            # patterns such as "*.*" or "*" yield no default extension.
            default_extension = None
            if is_save_dialog:
                first_pattern = file_types[0][1]
                if isinstance(first_pattern, (list, tuple)):
                    first_glob = str(first_pattern[0]) if first_pattern else ""
                else:
                    globs = str(first_pattern).split()
                    first_glob = globs[0] if globs else ""
                suffix = first_glob[1:]  # ".png" for "*.png"
                if (
                    first_glob.startswith("*.")
                    and len(suffix) > 1
                    and not any(ch in suffix for ch in "*?[")
                ):
                    default_extension = suffix

            # --- Show file dialog ---
            selected_path = dialog_func(
                parent=self.root,
                title=f"Select file for {param_def.label}",
                initialdir=initial_dir,
                initialfile=initial_file,  # Pre-fill filename if there is one
                filetypes=file_types,
                defaultextension=default_extension,  # Only set for save dialogs
            )
        elif param_type == "folder":
            # --- Show directory dialog ---
            selected_path = filedialog.askdirectory(
                parent=self.root,
                title=f"Select folder for {param_def.label}",
                initialdir=initial_dir,
                mustexist=True,
            )
        else:
            # Should not happen if called correctly
            self.logger.error(
                f"Browse path called for unsupported parameter type: {param_type}"
            )
            return

        # --- Update variable if a path was selected ---
        if selected_path:  # Falsy result means the dialog was cancelled
            normalized_path = os.path.normpath(selected_path)
            tk_variable.set(normalized_path)
            self.logger.debug(f"Path set for '{param_def.name}': {normalized_path}")
        else:
            self.logger.debug("Browse path dialog cancelled by user.")
    except Exception as e:
        # Catch errors during dialog creation or interaction
        self.logger.exception("An error occurred during the browse path dialog.")
        messagebox.showerror(
            "Dialog Error", f"Could not open browse dialog:\n{e}", parent=self.root
        )
# --- Process Execution and Management ---
def _launch_tool(self) -> None:
    """
    Gathers parameter values, validates them, saves the current state,
    and launches the selected tool using ProcessWorker in a background thread.

    The launch is abandoned, after informing the user, when no tool is
    selected, when ProcessWorker is unavailable, when the ToolInfo for the
    selection cannot be found, or when a parameter value is invalid. A
    failure while saving the tool state is only logged and does not stop
    the launch.
    """
    if not self.selected_tool_id:
        self.logger.warning("Launch attempt failed: No tool selected.")
        messagebox.showwarning(
            "Cannot Run",
            "Please select a tool from the list first.",
            parent=self.root,
        )
        return
    if not PROCESS_WORKER_ENABLED:
        self.logger.error("Launch attempt failed: ProcessWorker is unavailable.")
        messagebox.showerror(
            "Error",
            "Tool execution engine (ProcessWorker) is not available. Cannot run tool.",
            parent=self.root,
        )
        return

    selected_tool_info = self.available_tools.get(self.selected_tool_id)
    if not selected_tool_info:
        self.logger.error(
            f"Launch attempt failed: ToolInfo not found for selected ID '{self.selected_tool_id}'."
        )
        messagebox.showerror(
            "Internal Error",
            f"Could not find information for tool '{self.selected_tool_id}'.",
            parent=self.root,
        )
        return

    self.logger.info(
        f"Preparing to launch tool: '{selected_tool_info.display_name}' (ID: {self.selected_tool_id})"
    )

    # --- Gather and Validate Parameters ---
    parameters_to_pass: Dict[str, Any] = {}
    validation_errors: List[str] = []

    # Index the definitions by name once instead of scanning the list for
    # every widget. setdefault() keeps the first definition if a name is
    # duplicated, which is what a linear first-match search would return.
    param_defs_by_name: Dict[str, Any] = {}
    for definition in selected_tool_info.parameters or []:
        param_defs_by_name.setdefault(definition.name, definition)

    for param_name, (widget, tk_var) in self.current_parameter_widgets.items():
        param_def = param_defs_by_name.get(param_name)
        if not param_def:
            self.logger.warning(
                f"Parameter definition not found for widget key '{param_name}'. Skipping."
            )
            continue
        if tk_var is None:  # Error during widget creation
            self.logger.error(
                f"Cannot get value for '{param_name}': Tkinter variable is missing."
            )
            validation_errors.append(f"Internal error reading '{param_def.label}'.")
            continue
        try:
            value = tk_var.get()
            # Required check: only string values can be "empty" here.
            if param_def.required and isinstance(value, str) and not value.strip():
                validation_errors.append(
                    f"'{param_def.label}' is required and cannot be empty."
                )
            parameters_to_pass[param_name] = value
        except tk.TclError as e:
            # Raised by typed Tk variables when the text cannot be converted.
            self.logger.error(
                f"TclError getting value for parameter '{param_name}': {e}"
            )
            validation_errors.append(
                f"Invalid value entered for '{param_def.label}'."
            )
        except Exception:
            self.logger.exception(
                f"Unexpected error getting value for parameter '{param_name}'."
            )
            validation_errors.append(
                f"Error reading value for '{param_def.label}'."
            )

    # --- Show Validation Errors if Any ---
    if validation_errors:
        error_message = (
            "Please correct the following parameter errors:\n\n- "
            + "\n- ".join(validation_errors)
        )
        self.logger.warning(f"Parameter validation failed: {validation_errors}")
        messagebox.showerror("Invalid Parameters", error_message, parent=self.root)
        return  # Stop launch

    # --- Save Current Parameter Values to State ---
    try:
        self._save_tool_state(self.selected_tool_id, parameters_to_pass)
    except Exception:
        # Deliberately non-fatal: the user can still run the tool.
        self.logger.exception(
            "Error occurred while saving tool state before launch."
        )

    # --- Launch Process Worker ---
    # Bound before the try block so the failure handler below can always
    # tell whether a run ID exists and needs cleaning up.
    run_id: Optional[str] = None
    try:
        # Generate a unique run ID for this execution instance
        run_id = f"{self.selected_tool_id}_{os.urandom(4).hex()}"
        self.logger.info(f"Generated unique Run ID: {run_id}")

        # Add entry to the running processes Treeview
        initial_pid = "Starting..."
        status_msg = "Starting"
        if hasattr(self, "process_tree"):
            self.process_tree.insert(
                "",  # Insert at the root level
                tk.END,  # Insert at the end of the list
                iid=run_id,  # Use run_id as the unique item identifier
                values=(
                    run_id,
                    selected_tool_info.display_name,
                    status_msg,
                    initial_pid,
                ),
                tags=("running",),
            )

        # Append starting message to the output log
        self._append_output(f"[{run_id}] ", ("run_id",))
        self._append_output(
            f"-- Starting Tool: {selected_tool_info.display_name} --\n",
            ("info", "command"),
        )

        worker = ProcessWorker(
            run_id, selected_tool_info, parameters_to_pass, self.gui_queue
        )
        thread = threading.Thread(
            target=worker.run,
            daemon=True,  # Allow application exit even if worker thread hangs
            name=f"{run_id}_worker",
        )
        # Track the worker by run_id before the thread starts.
        self.running_workers[run_id] = worker
        thread.start()

        # Update the PID column shortly after start (scheduled once, 200 ms).
        self.root.after(200, self._update_pid_in_treeview, run_id, worker)
        self.logger.info(f"ProcessWorker thread started for Run ID: {run_id}")
    except Exception as e:
        # Catch errors during worker/thread creation or launch
        self.logger.exception(
            f"Failed to launch tool '{selected_tool_info.display_name}'"
        )
        messagebox.showerror(
            "Tool Launch Error", f"Could not start the tool:\n{e}", parent=self.root
        )
        self._append_output(
            f"ERROR: Failed to launch {selected_tool_info.display_name}\n",
            ("error",),
        )
        if run_id is not None:
            # Clean up entry in process tree if it was added
            if hasattr(self, "process_tree") and self.process_tree.exists(run_id):
                self.process_tree.delete(run_id)
            # Remove from running workers dict if added
            self.running_workers.pop(run_id, None)
def _update_pid_in_treeview(self, run_id: str, worker: ProcessWorker):
"""Callback scheduled to update the PID in the process treeview after launch."""
if not hasattr(self, "process_tree") or not self.process_tree.winfo_exists():
return # Do nothing if treeview is gone
try:
if self.process_tree.exists(run_id):
# Check if worker has the _process attribute and if it has a pid
pid_value = "N/A"
if (
worker
and hasattr(worker, "_process")
and worker._process
and hasattr(worker._process, "pid")
):
pid_value = worker._process.pid
self.process_tree.set(run_id, column="pid", value=pid_value)
# Update status from "Starting" to "Running" if PID is available
current_status = self.process_tree.set(run_id, column="status")
if current_status == "Starting" and pid_value != "N/A":
self.process_tree.set(run_id, column="status", value="Running")
self.logger.debug(
f"Updated PID in treeview for {run_id} to {pid_value}"
)
else:
self.logger.debug(
f"Run ID {run_id} no longer exists in treeview, cannot update PID."
)
except tk.TclError:
# Can happen if the item is deleted concurrently
self.logger.debug(
f"TclError updating PID for {run_id}, item likely deleted."
)
except Exception as e:
self.logger.error(
f"Error updating PID in treeview for {run_id}: {e}", exc_info=True
)
def _on_process_selected(self, event: Optional[tk.Event] = None) -> None:
"""Handles selection changes in the running processes Treeview."""
if not hasattr(self, "process_tree") or not self.process_tree.winfo_exists():
return
selected_items = self.process_tree.selection()
if not selected_items:
# No item selected, disable terminate button
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
return
# Get the run_id (which is the item ID) of the selected process
self.selected_run_id_for_termination = selected_items[0]
# Enable the terminate button only if the selected process is still tracked as running
current_status = self.process_tree.set(
self.selected_run_id_for_termination, column="status"
)
is_running = (
self.selected_run_id_for_termination in self.running_workers
and current_status not in ["Finished", "Error", "Terminated"]
)
term_state = tk.NORMAL if is_running else tk.DISABLED
self.terminate_button.config(state=term_state)
if term_state == tk.NORMAL:
self.logger.debug(
f"Process selected for termination: {self.selected_run_id_for_termination}"
)
def _terminate_selected_process(self) -> None:
"""Attempts to terminate the process selected in the Treeview."""
run_id = self.selected_run_id_for_termination
if not run_id:
self.logger.warning("Terminate button clicked but no process selected.")
return
worker = self.running_workers.get(run_id)
if not worker:
self.logger.warning(
f"Cannot terminate {run_id}: Worker instance not found (already finished?)."
)
messagebox.showinfo(
"Process Finished",
f"Process '{run_id}' seems to have already finished.",
parent=self.root,
)
# Update UI state: remove from tree if still there, disable button
if hasattr(self, "process_tree") and self.process_tree.exists(run_id):
self.process_tree.delete(run_id)
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
return
tool_name = worker.tool_info.display_name if worker.tool_info else run_id
# --- Confirmation Dialog ---
if messagebox.askyesno(
"Confirm Termination",
f"Are you sure you want to terminate the running tool:\n\n'{tool_name}' (Run ID: {run_id})?",
parent=self.root,
):
self.logger.info(f"User confirmed termination for Run ID: {run_id}")
# Update status in Treeview immediately
try:
if hasattr(self, "process_tree") and self.process_tree.exists(run_id):
self.process_tree.set(
run_id, column="status", value="Terminating..."
)
self.process_tree.selection_remove(run_id) # Deselect
except tk.TclError:
self.logger.warning(
"TclError updating treeview status during termination."
)
except Exception as e:
self.logger.error(
f"Error updating treeview status for termination: {e}"
)
# Disable button immediately
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
# --- Call Worker's Terminate Method ---
try:
worker.terminate() # This signals the process (SIGTERM/TerminateProcess)
self.logger.info(
f"Terminate signal sent to process for Run ID: {run_id}."
)
# The worker thread will eventually send the 'finished' message with the exit code.
except Exception as e:
self.logger.exception(
f"Error occurred while calling worker.terminate() for {run_id}"
)
messagebox.showerror(
"Termination Error",
f"Could not send terminate signal:\n{e}",
parent=self.root,
)
# Try to update status back? Or leave as Terminating?
# Maybe refresh status when 'finished' message eventually arrives.
else:
# User cancelled termination
self.logger.info("Termination cancelled by user.")
# --- Queue Processing ---
def _start_queue_processing(self) -> None:
"""Starts the periodic polling of the GUI message queue."""
self.logger.debug("Starting GUI queue processing loop.")
# Schedule the first call to _process_queue
# Use a small delay (e.g., 100ms)
try:
self.root.after(100, self._process_queue)
except tk.TclError:
self.logger.warning(
"Cannot schedule queue processing: Root window destroyed."
)
def _process_queue(self) -> None:
"""Processes all messages currently in the GUI queue."""
try:
# Process all messages currently in the queue without blocking
while True:
try:
# Get message without waiting
queue_item = self.gui_queue.get_nowait()
# --- Process Based on Message Type ---
if isinstance(queue_item, dict) and "type" in queue_item:
# Assumed message from ProcessWorker
self._handle_worker_message(queue_item)
elif isinstance(queue_item, tuple) and len(queue_item) == 2:
# Assumed message from Git background task (type, data)
self._handle_git_queue_message(queue_item)
else:
# Log unknown message format
self.logger.warning(
f"Received unknown item format in GUI queue: {queue_item}"
)
# Mark task as done (important for queue management)
self.gui_queue.task_done()
except queue.Empty:
# No more messages in the queue for now
break
except Exception as e:
# Catch errors during handling of a specific message
self.logger.exception("Error processing item from GUI queue.")
# Mark task done even if handling failed to avoid blocking queue
self.gui_queue.task_done()
except Exception as e:
# Catch errors related to the queue processing loop itself
self.logger.exception(
"An unexpected error occurred in the queue processing loop."
)
finally:
# --- Reschedule the next check ---
# Ensure the loop continues as long as the window exists
if self.root.winfo_exists():
self.root.after(100, self._process_queue) # Check again after 100ms
else:
self.logger.info(
"Root window destroyed, stopping GUI queue processing."
)
def _handle_worker_message(self, message: dict) -> None:
    """
    Render one ProcessWorker message in the output log and update run state.

    `message` is a dict with a "type" key plus optional "run_id", "data"
    and (for finished messages) "exit_code" keys. Every entry written to
    the output widget is preceded by a "[run_id] " prefix.

    Handling per message type:
    - MSG_TYPE_WORKER_STDOUT: a line that parses as a JSON object with a
      "type" field is treated as a structured tool message ("progress",
      "status", "log", "result"; any other type is shown as unknown).
      Everything else is appended verbatim as raw stdout.
    - MSG_TYPE_WORKER_STDERR: logged as a warning and appended with the
      "error" tag.
    - MSG_TYPE_WORKER_STATUS: status text coming from the worker itself.
    - MSG_TYPE_WORKER_FINISHED: writes the final status line, removes the
      run from `running_workers` and from the process tree, and disables
      the terminate button if that run was the selected one.
    - anything else: reported in the log and output as an internal error.

    Args:
        message: The dict taken from the GUI queue.
    """
    msg_type = message.get("type")
    run_id = message.get("run_id", "UNKNOWN_RUN_ID")
    data = message.get("data", "")  # Default to empty string if missing
    prefix = f"[{run_id}] "
    run_id_tag = ("run_id",)  # Tag for the prefix

    if msg_type == MSG_TYPE_WORKER_STDOUT:
        # Try parsing stdout as JSON first; only lines shaped like a JSON
        # object are attempted so ordinary text is never fed to the parser.
        parsed = None
        line = data.strip() if isinstance(data, str) else ""
        if line.startswith("{") and line.endswith("}"):
            try:
                parsed = json.loads(line)
            except json.JSONDecodeError:
                parsed = None  # Not JSON after all: treated as raw text below

        if isinstance(parsed, dict) and "type" in parsed:
            json_type = parsed.get("type")
            json_data = parsed  # Use the whole parsed dict
            self.logger.debug(
                f"Received structured JSON from {run_id}: Type={json_type}"
            )
            if json_type == "progress":
                value = json_data.get("value", 0)
                msg = json_data.get("message", "")
                # Numeric values are rendered as a percentage (0.5 -> "50%")
                progress_percent = (
                    f"{value:.0%}"
                    if isinstance(value, (int, float))
                    else f"{value}"
                )
                self._append_output(prefix, run_id_tag)
                self._append_output(
                    f"[Progress {progress_percent}] {msg}\n", ("info",)
                )
            elif json_type == "status":
                msg = json_data.get("message", "")
                self._append_output(prefix, run_id_tag)
                self._append_output(f"[Status] {msg}\n", ("info",))
            elif json_type == "log":
                # First letter of the level selects the tag (i, w, e, d).
                # An empty/blank level falls back to "i" instead of raising
                # IndexError and losing the message.
                level_text = str(json_data.get("level", "info")).strip().lower()
                level_char = level_text[0] if level_text else "i"
                msg = json_data.get("message", "")
                log_tag = f"log_{level_char}"
                # Use fallback tag if specific level tag doesn't exist
                final_tag = (
                    log_tag if log_tag in self.output_text.tag_names() else "info"
                )
                self._append_output(prefix, run_id_tag)
                self._append_output(
                    f"[Log:{level_char.upper()}] {msg}\n", (final_tag,)
                )
            elif json_type == "result":
                result_payload = json_data.get("data", {})
                self.logger.info(
                    f"Received Result data from {run_id}: {result_payload}"
                )
                try:
                    # Pretty print the result data
                    result_str = json.dumps(
                        result_payload, indent=2, ensure_ascii=False
                    )
                except Exception:
                    # Fallback to string representation
                    result_str = str(result_payload)
                self._append_output(prefix, run_id_tag)
                self._append_output(f"[Result Data]:\n{result_str}\n", ("success",))
            else:
                # Unknown JSON type received from tool
                self.logger.warning(
                    f"Received unknown structured JSON type '{json_type}' from {run_id}: {json_data}"
                )
                try:
                    unknown_json_str = json.dumps(json_data, indent=2)
                except Exception:
                    unknown_json_str = str(json_data)
                self._append_output(prefix, run_id_tag)
                self._append_output(
                    f"[Unknown JSON Type:{json_type}]:\n{unknown_json_str}\n",
                    ("warning",),
                )
        else:
            # Not valid JSON or not the expected structure: raw stdout
            self._append_output(prefix, run_id_tag)
            self._append_output(data, ("raw_stdout",))

    elif msg_type == MSG_TYPE_WORKER_STDERR:
        # str() keeps a non-string payload from raising on .rstrip()
        self.logger.warning(f"Stderr from {run_id}: {str(data).rstrip()}")
        self._append_output(prefix, run_id_tag)
        self._append_output(data, ("error",))

    elif msg_type == MSG_TYPE_WORKER_STATUS:
        # Internal status messages *from* the ProcessWorker itself (not the tool)
        self.logger.info(f"Worker Status [{run_id}]: {data}")
        self._append_output(prefix, run_id_tag)
        self._append_output(f"[Worker Status] {data}\n", ("debug",))

    elif msg_type == MSG_TYPE_WORKER_FINISHED:
        exit_code = message.get("exit_code", "?")
        self.logger.info(
            f"Worker finished message received for {run_id}. Exit Code: {exit_code}"
        )
        # Determine status and tag based on exit code
        status_text = "Finished"
        log_tag = "success"
        if exit_code == 0:
            status_text = "Finished (Success)"
        elif isinstance(exit_code, int) and exit_code < 0:
            # Negative codes are used for internal worker/launch errors
            status_text = "Error (Internal)"
            log_tag = "error"
        elif exit_code != 0:
            status_text = f"Finished (Code: {exit_code})"
            log_tag = "warning"  # Non-zero exit codes from the tool

        self._append_output(prefix, run_id_tag)
        self._append_output(f"-- {status_text} --\n", (log_tag, "command"))

        # --- Clean up running process state ---
        if run_id in self.running_workers:
            del self.running_workers[run_id]
            self.logger.debug(f"Removed worker {run_id} from running_workers dict.")
        else:
            self.logger.warning(
                f"Received finished message for untracked run_id: {run_id}"
            )

        try:
            if hasattr(self, "process_tree") and self.process_tree.exists(run_id):
                self.process_tree.delete(run_id)
                self.logger.debug(f"Removed {run_id} from process treeview.")
            # The run is over, so a pending "terminate" selection for it is
            # stale whether or not its tree row was still present.
            if run_id == self.selected_run_id_for_termination:
                self.terminate_button.config(state=tk.DISABLED)
                self.selected_run_id_for_termination = None
        except tk.TclError:
            self.logger.warning(
                f"TclError cleaning up process tree for {run_id} (likely already deleted)."
            )
        except Exception as e:
            self.logger.error(
                f"Error cleaning up process tree for {run_id}: {e}", exc_info=True
            )

    else:
        # Unknown message type from worker
        self.logger.error(
            f"Received unknown message type '{msg_type}' from worker {run_id}: {data}"
        )
        self._append_output(prefix, run_id_tag)
        self._append_output(
            f"[Internal Error: Unknown Msg Type '{msg_type}'] {data}\n", ("error",)
        )
def _handle_git_queue_message(self, queue_item: tuple) -> None:
    """
    Process one (message_type, data) tuple produced by a Git background task.

    Status, progress, error and single-result messages are formatted and
    appended to the output log. The "all finished" message additionally
    clears `_git_update_running`, restores the "Update All Git Tools" menu
    entry, shows a summary dialog and reloads the tool list.

    Any exception raised while handling the message is logged; if the
    message was the "all finished" signal, the running flag and the menu
    entry are still reset.

    Args:
        queue_item: The 2-tuple taken from the GUI queue.
    """

    def restore_update_menu_entry() -> None:
        # Menu errors are deliberately ignored (best effort).
        try:
            if hasattr(self, "tools_menu"):
                entry_index = self.tools_menu.index("Update All Git Tools")
                self.tools_menu.entryconfig(
                    entry_index,
                    state=tk.NORMAL if GIT_ENABLED else tk.DISABLED,
                )
        except tk.TclError:
            pass

    try:
        kind, payload = queue_item
        self.logger.debug(f"Git Queue Received: Type={kind}, Data={payload}")
        git_prefix = "[Git Update] "

        # --- Completion signal: handled entirely here, then return ---
        if kind == MSG_TYPE_GIT_ALL_FINISHED:
            self.logger.info("Received Git 'all finished' signal.")
            self._git_update_running = False
            restore_update_menu_entry()

            has_counts = isinstance(payload, tuple) and len(payload) == 2
            if not has_counts:
                final_tag = "error"
                final_line = f"{git_prefix}Internal Error: Invalid GIT_ALL_FINISHED data format: {payload}\n"
                self.logger.error(f"Invalid GIT_ALL_FINISHED data: {payload}")
                messagebox.showinfo(
                    "Update Complete",
                    "Batch Git update finished (error in summary data).",
                    parent=self.root,
                )
                self._load_tools()  # Tool list is reloaded in this case too
            else:
                ok_count, err_count = payload
                summary_msg = f"Finished batch Git update. Succeeded: {ok_count}, Failed: {err_count}."
                no_failures = err_count == 0
                final_tag = "info" if no_failures else "warning"
                final_line = f"--- {summary_msg} ---\n"
                self.logger.info(summary_msg)
                if no_failures:
                    notify = messagebox.showinfo
                else:
                    notify = messagebox.showwarning
                notify("Update Complete", summary_msg, parent=self.root)
                # Reflect possible changes (new clones, updated status)
                self.logger.info("Reloading tool list after Git batch update.")
                self._load_tools()

            self._append_output(final_line, (final_tag, "command"))
            return

        # --- Plain log-style messages ---
        if kind == MSG_TYPE_GIT_STATUS:
            line_tag, line_text = "info", f"{git_prefix}{payload}\n"
        elif kind == MSG_TYPE_GIT_PROGRESS:
            # Progress messages can be verbose
            line_tag, line_text = "debug", f"{git_prefix}{payload}\n"
        elif kind == MSG_TYPE_GIT_ERROR:
            line_tag, line_text = "error", f"{git_prefix}ERROR: {payload}\n"
        elif kind == MSG_TYPE_GIT_SINGLE_RESULT:
            # Expected payload: (tool_id, success_bool, message_str)
            if isinstance(payload, tuple) and len(payload) == 3:
                tid, success, msg = payload
                line_tag = "success" if success else "error"
                line_text = f"{git_prefix}[{tid}] {msg}\n"
                self.logger.info(
                    f"Git single result for '{tid}': Success={success}, Msg={msg}"
                )
            else:
                line_tag = "error"
                line_text = f"{git_prefix}Internal Error: Invalid GIT_SINGLE_RESULT data format: {payload}\n"
                self.logger.error(f"Invalid GIT_SINGLE_RESULT data: {payload}")
        else:
            self.logger.warning(
                f"Received unknown message type '{kind}' from Git queue: {payload}"
            )
            line_tag = "warning"
            line_text = f"{git_prefix}[Unknown Git Msg Type: {kind}] {payload}\n"

        self._append_output(line_text, (line_tag,))
    except Exception as e:
        self.logger.exception(
            f"Error handling message from Git queue: {queue_item}"
        )
        # A failed 'finished' message must not leave the update flag set
        was_finished_signal = (
            isinstance(queue_item, tuple)
            and len(queue_item) == 2
            and queue_item[0] == MSG_TYPE_GIT_ALL_FINISHED
        )
        if was_finished_signal:
            self._git_update_running = False
            restore_update_menu_entry()
# --- Git Actions ---
def _get_single_git_status(self, tool_id: str) -> Dict[str, Any]:
    """
    Look up the Git status of one tool.

    Intended to be handed to ConfigWindow as a callback. The tool's
    registry entry is loaded and passed to
    `git_manager.get_repository_status`.

    Args:
        tool_id: The ID of the tool whose status is needed.

    Returns:
        The dictionary returned by `git_manager.get_repository_status`, or
        a dictionary with "status" and "message" keys describing why the
        status could not be determined (Git disabled, core modules not
        loaded, tool missing, tool not of type 'git', or an error raised by
        the status call).
    """

    def failure(reason: str) -> Dict[str, Any]:
        # Uniform shape for every generic error result
        return {"status": GIT_STATUS_ERROR, "message": reason}

    self.logger.debug(f"Callback invoked: Get Git status for tool '{tool_id}'")

    if not GIT_ENABLED:
        return {
            "status": GIT_STATUS_GITPYTHON_MISSING,
            "message": "GitPython library not available.",
        }
    if not CORE_MODULES_LOADED:
        return failure("Core modules not loaded.")

    # git_manager works on the registry entry, not on the bare ID
    tool_entry = registry_manager.get_tool_config(
        tool_id, registry_manager.load_registry()
    )
    if not tool_entry:
        self.logger.error(
            f"Cannot get Git status: Tool '{tool_id}' not found in registry."
        )
        return failure("Tool not found in registry.")
    if tool_entry.type != "git":
        self.logger.warning(
            f"Cannot get Git status: Tool '{tool_id}' is not type 'git'."
        )
        return failure(f"Tool is type '{tool_entry.type}', not 'git'.")

    try:
        status_result = git_manager.get_repository_status(tool_entry)
        self.logger.debug(f"Git status result for '{tool_id}': {status_result}")
    except Exception as e:
        self.logger.exception(
            f"Error occurred in git_manager.get_repository_status for '{tool_id}'"
        )
        return failure(f"Error checking status: {e}")
    return status_result
def _update_single_tool_threaded(self, tool_id: str) -> None:
    """
    Launch a background thread that updates one Git tool.

    Intended to be handed to ConfigWindow as a callback. The update is
    refused with a dialog when Git support or the core modules are
    unavailable, when another Git update is already flagged as running, or
    when the tool is missing from the registry or is not of type 'git'.

    On success the shared `_git_update_running` flag is set, the
    "Update All Git Tools" menu entry is disabled, a start line is written
    to the output log and a daemon thread running
    `_run_single_git_update_background` is started.

    Args:
        tool_id: The ID of the Git tool to update.
    """

    def show_error(title: str, text: str) -> None:
        messagebox.showerror(title, text, parent=self.root)

    self.logger.info(f"Callback invoked: Update single Git tool '{tool_id}'")

    if not GIT_ENABLED:
        show_error("Git Error", GIT_STATUS_GITPYTHON_MISSING)
        return
    if not CORE_MODULES_LOADED:
        show_error("Error", "Cannot update: Core modules not loaded.")
        return
    # Only one Git operation at a time: the same flag guards batch updates.
    if self._git_update_running:
        messagebox.showinfo(
            "Update in Progress",
            "Another Git update operation (batch?) is currently running. Please wait.",
            parent=self.root,
        )
        return

    current_registry = registry_manager.load_registry()
    tool_entry = registry_manager.get_tool_config(tool_id, current_registry)
    if not tool_entry:
        show_error(
            "Error", f"Cannot update tool '{tool_id}': Not found in registry."
        )
        return
    if tool_entry.type != "git":
        show_error(
            "Error", f"Cannot update tool '{tool_id}': Not a 'git' type tool."
        )
        return

    self.logger.info(f"Starting background thread for single Git update: {tool_id}")
    # Cleared again when the GUI queue delivers the finished message
    self._git_update_running = True

    # Disable the main menu update option while *any* update runs
    try:
        if hasattr(self, "tools_menu"):
            menu = self.tools_menu
            entry_index = menu.index("Update All Git Tools")
            menu.entryconfig(entry_index, state=tk.DISABLED)
    except tk.TclError:
        pass

    start_line = f"--- Starting Update for Tool: {tool_entry.display_name} ---\n"
    self._append_output(start_line, ("info", "command"))

    updater = threading.Thread(
        target=self._run_single_git_update_background,
        args=(tool_entry,),
        daemon=True,
        name=f"{tool_id}_git_update",
    )
    updater.start()
def _run_single_git_update_background(self, tool_entry: ToolRegistryEntry):
    """
    Thread target: update a single Git tool and report back via the GUI queue.

    Calls `git_manager.update_repository` with `_git_progress_callback` as
    the progress callback. Whatever happens, two messages are queued at the
    end: the per-tool result, followed by the "all finished" signal whose
    counts describe this one operation. The handler for that signal is what
    clears `_git_update_running`. An unexpected exception additionally
    queues an error message before those two.

    Args:
        tool_entry: The ToolRegistryEntry of the tool to update.
    """
    tool_id = tool_entry.id
    self.logger.debug(f"Background thread started for single update: {tool_id}")

    # Pessimistic defaults, overwritten by the update call's outcome
    update_ok = False
    update_text = "Update failed in background thread."
    try:
        update_ok, update_text = git_manager.update_repository(
            tool_entry,
            progress_callback=self._git_progress_callback,
        )
        self.logger.info(
            f"Single update finished for '{tool_id}'. Success: {update_ok}, Message: {update_text}"
        )
    except Exception as e:
        self.logger.exception(
            f"Unexpected error during single git update for '{tool_id}' in background thread."
        )
        update_ok = False
        update_text = f"Unexpected error during update: {e}"
        self.gui_queue.put((MSG_TYPE_GIT_ERROR, f"[{tool_id}] {update_text}"))
    finally:
        # Per-tool result first ...
        single_result = (tool_id, update_ok, update_text)
        self.gui_queue.put((MSG_TYPE_GIT_SINGLE_RESULT, single_result))
        # ... then the shared completion signal as (succeeded, failed),
        # which resets the global state and triggers the tool reload.
        finished_counts = (1, 0) if update_ok else (0, 1)
        self.gui_queue.put((MSG_TYPE_GIT_ALL_FINISHED, finished_counts))
        self.logger.debug(
            f"Background thread finished for single update: {tool_id}"
        )
def _update_all_git_tools_threaded(self) -> None:
    """
    Launch the batch update of all enabled Git tools in a background thread.

    The request is refused with a dialog when Git support or the core
    modules are unavailable, or when a Git update is already flagged as
    running. Otherwise the `_git_update_running` flag is set, the
    "Update All Git Tools" menu entry is disabled, a start line is written
    to the output log and a daemon thread running
    `_run_git_updates_in_background` is started (kept in
    `_git_update_thread`).
    """
    self.logger.info("User requested 'Update All Git Tools'.")

    # First unmet precondition wins: (dialog function, title, text)
    refusal = None
    if not GIT_ENABLED:
        refusal = (messagebox.showerror, "Git Error", GIT_STATUS_GITPYTHON_MISSING)
    elif not CORE_MODULES_LOADED:
        refusal = (
            messagebox.showerror,
            "Error",
            "Cannot update: Core modules not loaded.",
        )
    elif self._git_update_running:
        refusal = (
            messagebox.showinfo,
            "Update in Progress",
            "A Git update operation is already running. Please wait.",
        )
    if refusal is not None:
        show_dialog, title, text = refusal
        show_dialog(title, text, parent=self.root)
        return

    self.logger.info("Starting threaded batch update for all enabled Git tools...")
    self._git_update_running = True

    # Keep the user from starting a second batch while this one runs
    try:
        if hasattr(self, "tools_menu"):
            menu = self.tools_menu
            entry_index = menu.index("Update All Git Tools")
            menu.entryconfig(entry_index, state=tk.DISABLED)
    except tk.TclError:
        pass

    self._append_output(
        "--- Starting Batch Update for All Enabled Git Tools ---\n",
        ("info", "command"),
    )

    batch_thread = threading.Thread(
        target=self._run_git_updates_in_background,
        daemon=True,
        name="git_batch_update",
    )
    self._git_update_thread = batch_thread
    batch_thread.start()
def _git_progress_callback(self, msg_type: str, message: str) -> None:
"""
Callback function passed to git_manager operations.
Puts progress, status, or error messages received from git_manager
onto the thread-safe GUI queue for processing by the main thread.
Args:
msg_type: Type of message ('git_status', 'git_progress', 'git_error').
message: The content of the message.
"""
# Simply put the received type and message onto the queue
self.gui_queue.put((msg_type, message))
def _run_git_updates_in_background(self) -> None:
    """
    Thread target for the batch Git update.

    Loads the registry, selects the entries that are of type 'git' and
    enabled, and calls `git_manager.update_repository` for each of them.
    A per-tool result is queued after every tool, and exactly one
    "all finished" message carrying (success_count, error_count) is queued
    at the end, on every exit path.

    A tool whose update raises is counted as failed and reported, and the
    batch continues with the next tool. An unexpected failure outside the
    per-tool update (for example while loading the registry) is reported
    as a Git error and counted as one failure so the summary does not
    present the run as clean.
    """
    self.logger.debug("Background thread started for batch Git update.")
    success_count = 0
    error_count = 0
    try:
        registry = registry_manager.load_registry()
        if not registry:
            self.logger.warning(
                "Cannot run batch update: Registry is empty or failed to load."
            )
            self._git_progress_callback(
                MSG_TYPE_GIT_ERROR, "Registry empty/unavailable."
            )
            # The `finally` clause below sends the single finished message;
            # sending one here as well would produce two completion dialogs.
            return

        # Filter for enabled Git tools
        git_tools_to_update = [
            entry for entry in registry if entry.type == "git" and entry.enabled
        ]
        if not git_tools_to_update:
            self.logger.info(
                "No enabled Git tools found in the registry to update."
            )
            self._git_progress_callback(
                MSG_TYPE_GIT_STATUS, "No enabled Git tools found."
            )
            return  # Finished message is sent by `finally`

        total_tools = len(git_tools_to_update)
        self.logger.info(f"Found {total_tools} enabled Git tools to process.")
        self._git_progress_callback(
            MSG_TYPE_GIT_STATUS, f"Starting update for {total_tools} tool(s)..."
        )

        # --- Iterate and Update Each Tool ---
        for position, tool_entry in enumerate(git_tools_to_update, start=1):
            tool_id = tool_entry.id
            tool_name = tool_entry.display_name
            self.logger.debug(
                f"Processing tool {position}/{total_tools}: '{tool_name}' ({tool_id})"
            )
            self._git_progress_callback(
                MSG_TYPE_GIT_STATUS,
                f"Updating {position}/{total_tools}: {tool_name}...",
            )
            try:
                success, message = git_manager.update_repository(
                    tool_entry, self._git_progress_callback
                )
            except Exception as update_err:
                # One broken tool must not abort the remaining updates
                self.logger.exception(
                    f"Unexpected error while updating Git tool '{tool_id}'."
                )
                success = False
                message = f"Unexpected error during update: {update_err}"

            # Send the result for this specific tool to the queue
            self.gui_queue.put(
                (MSG_TYPE_GIT_SINGLE_RESULT, (tool_id, success, message))
            )
            if success:
                success_count += 1
            else:
                error_count += 1

        self.logger.info(
            f"Batch update loop finished. Success: {success_count}, Errors: {error_count}"
        )
    except Exception as e:
        self.logger.exception(
            "An unexpected error occurred during the batch Git update process."
        )
        self._git_progress_callback(
            MSG_TYPE_GIT_ERROR, f"Unexpected batch update error: {e}"
        )
        # Make the summary reflect that the batch did not complete cleanly
        error_count += 1
    finally:
        # --- Send Final Completion Message (exactly once) ---
        # Runs on normal completion, on the early returns above and after
        # an exception.
        self.logger.debug("Sending final batch update finished message to queue.")
        self.gui_queue.put(
            (MSG_TYPE_GIT_ALL_FINISHED, (success_count, error_count))
        )
        self.logger.debug("Background thread finished for batch Git update.")
# --- Configuration Window ---
def _open_tool_config_window(self) -> None:
    """
    Open the ConfigWindow dialog for the currently selected tool.

    Nothing is opened (and a dialog explains why) when no tool is selected,
    when the ConfigWindow feature is unavailable, when the registry cannot
    be loaded, or when the selected tool has no registry entry. Errors
    raised while creating the window are logged and shown to the user.
    """
    tool_id = self.selected_tool_id
    if not tool_id:
        self.logger.warning("Configure tool requested but no tool selected.")
        messagebox.showwarning(
            "No Tool Selected",
            "Please select a tool from the list before configuring.",
            parent=self.root,
        )
        return

    if not CONFIG_WINDOW_ENABLED:
        self.logger.error(
            "Cannot open config window: ConfigWindow module failed to load."
        )
        messagebox.showerror(
            "Error", "Configuration window is unavailable.", parent=self.root
        )
        return

    # Always read the registry fresh so the dialog sees the current state
    self.logger.debug(f"Loading registry to get config for tool: {tool_id}")
    registry_data = registry_manager.load_registry()
    if registry_data is None:
        self.logger.error(
            "Failed to load registry data. Cannot open config window."
        )
        messagebox.showerror(
            "Error", "Could not load tool registry.", parent=self.root
        )
        return

    tool_config_data = registry_manager.get_tool_config(tool_id, registry_data)
    if not tool_config_data:
        self.logger.error(
            f"Could not find configuration for tool '{tool_id}' in the loaded registry."
        )
        messagebox.showerror(
            "Error",
            f"Configuration data for tool '{tool_id}' not found.",
            parent=self.root,
        )
        return

    self.logger.info(f"Opening configuration window for tool: {tool_id}")
    try:
        window_kwargs = {
            "parent": self.root,
            "tool_id": tool_id,
            # The specific registry entry being edited
            "tool_config": tool_config_data,
            # Callbacks the dialog uses for Git status/update and persistence
            "git_status_func": self._get_single_git_status,
            "update_func": self._update_single_tool_threaded,
            "save_registry_func": registry_manager.save_registry,
            "get_full_registry_func": registry_manager.load_registry,
        }
        # No wait_window() here, as in the original; per the original
        # author's note the dialog manages its own modality and lifecycle.
        dialog = ConfigWindow(**window_kwargs)
    except Exception as e:
        self.logger.exception(
            f"Failed to create or show ConfigWindow for '{self.selected_tool_id}'"
        )
        messagebox.showerror(
            "Error", f"Could not open configuration window:\n{e}", parent=self.root
        )
# --- Output Logging ---
def _append_output(self, text: str, tags: Optional[Tuple[str, ...]] = None) -> None:
"""
Appends text to the output ScrolledText widget with optional tags for styling.
Ensures the operation is safe (widget exists, state is handled) and
limits the total number of lines in the widget to prevent excessive memory use.
Args:
text: The string to append.
tags: An optional tuple of tag names (strings) to apply to the text.
Tags should be pre-configured using self.output_text.tag_config().
"""
if not hasattr(self, "output_text") or not self.output_text.winfo_exists():
# Silently ignore if the widget doesn't exist (e.g., during shutdown)
return
try:
# --- Limit Output Lines ---
# Get current line count (efficiently, without loading all text)
# index("end-1c") gives the position of the last character
current_line_count = int(self.output_text.index("end-1c").split(".")[0])
if (
current_line_count > MAX_OUTPUT_LINES + 50
): # Add buffer before truncating
# Calculate where to start deleting (delete roughly half the excess)
lines_to_delete = current_line_count - MAX_OUTPUT_LINES
delete_start_index = "1.0"
delete_end_index = f"{lines_to_delete + 1}.0" # Delete line N+1's start
# Add truncation marker only once at the top
# Check if marker already exists (more reliable than boolean flag)
# if not self.output_text.tag_ranges("truncated"): # Needs tag config
# Simplified: Just always delete from top for now.
# marker = "[... Log Truncated ...]\n"
# self.output_text.insert("1.0", marker, ("info",)) # Requires 'truncated' tag?
# Temporarily enable editing
self.output_text.config(state=tk.NORMAL)
# Delete lines from the beginning
self.output_text.delete(delete_start_index, delete_end_index)
# Disable editing again
self.output_text.config(state=tk.DISABLED)
# Log truncation event
# if not hasattr(self, "_last_truncate_log_time") or time.time() - self._last_truncate_log_time > 60: # Log max once per minute
# self.logger.warning(f"Output log truncated. Deleted {lines_to_delete} lines.")
# self._last_truncate_log_time = time.time()
# --- Append Text ---
# Enable widget for modification
self.output_text.config(state=tk.NORMAL)
# Insert text with specified tags (or empty tuple if None)
self.output_text.insert(tk.END, text, tags or ())
# Automatically scroll to the end to show the latest output
self.output_text.see(tk.END)
# Disable widget again to make it read-only
self.output_text.config(state=tk.DISABLED)
except tk.TclError as e:
# Handle Tcl errors, which can occur if the widget is destroyed
# during the operation (e.g., closing window)
self.logger.debug(
f"TclError appending output (widget likely destroyed): {e}"
)
except Exception as e:
# Catch any other unexpected errors during output append
self.logger.exception(
"An unexpected error occurred while appending to the output log."
)
# Attempt to disable the widget again if an error occurred mid-operation
try:
if self.output_text.winfo_exists():
self.output_text.config(state=tk.DISABLED)
except Exception:
pass # Ignore errors during fallback disable
# --- Window Closing ---
def _on_close(self) -> None:
"""Handles the window close event (triggered by WM_DELETE_WINDOW)."""
self.logger.info("Window close requested by user.")
# --- Check for Running Operations ---
# 1. Git Update Running?
if self._git_update_running:
messagebox.showwarning(
"Operation in Progress",
"A Git update task is currently running.\n"
"Please wait for it to complete before closing.",
parent=self.root,
)
self.logger.info("Close cancelled: Git update in progress.")
return # Prevent closing
# 2. Tool Processes Running?
if self.running_workers:
num_workers = len(self.running_workers)
plural = "s" if num_workers > 1 else ""
confirm_msg = (
f"{num_workers} tool process{plural} still seem to be running.\n\n"
f"Closing the application will attempt to terminate these processes.\n\n"
f"Are you sure you want to exit?"
)
if messagebox.askyesno("Confirm Exit", confirm_msg, parent=self.root):
self.logger.warning(
f"User confirmed exit. Attempting to terminate {num_workers} running tool process{plural}."
)
# Create a copy of keys to avoid issues while iterating and deleting
running_ids = list(self.running_workers.keys())
for run_id in running_ids:
worker = self.running_workers.get(run_id)
if worker:
self.logger.debug(
f"Sending terminate signal to worker: {run_id}"
)
try:
worker.terminate()
# Give a very brief moment for signal processing? Optional.
# time.sleep(0.1)
except Exception as term_err:
# Log error but continue terminating others
self.logger.error(
f"Error terminating worker {run_id} on close: {term_err}",
exc_info=True,
)
# Proceed to destroy the window after attempting termination
self.logger.info(
"Proceeding with window destruction after termination attempts."
)
self.root.destroy()
else:
# User cancelled exit
self.logger.info("Close cancelled by user due to running processes.")
return # Prevent closing
else:
# No running processes or Git updates, safe to close
self.logger.info("No running tasks detected. Closing application window.")
self.root.destroy()
# --- End of MainWindow Class ---
# Standard boilerplate for testing this module directly (optional, requires adjustments)
# Running this file directly will likely fail due to reliance on the package structure
# and potentially missing dependencies if core modules failed.
# Use `python -m projectutility` from the repository root for proper execution.
if __name__ == "__main__":
    # Console logging, configured here only for a direct run of this module
    log_format_test = "%(asctime)s - %(name)s [%(levelname)s] - %(message)s"
    logging.basicConfig(
        level=logging.DEBUG,
        format=log_format_test,
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    logger_test = logging.getLogger(__name__)

    # Make it obvious that this entry point is not the supported one
    for _notice in (
        "Running MainWindow module directly is intended for basic UI layout checks ONLY.",
        "Functionality relying on core modules or Git might be limited or broken.",
        "Use 'python -m projectutility' from the repo root for full application run.",
    ):
        logger_test.warning(_notice)

    # Report which optional features are missing in this run
    if not CORE_MODULES_LOADED:
        logger_test.warning(
            "Core modules likely failed to load. Using minimal dummies for UI test."
        )
    if not PROCESS_WORKER_ENABLED:
        logger_test.warning("ProcessWorker unavailable.")
    if not GIT_ENABLED:
        logger_test.warning("Git features unavailable.")

    try:
        test_root = tk.Tk()
        # Start hidden, as in the original; the original comments expect
        # MainWindow.__init__ to show the window.
        test_root.withdraw()
        app = MainWindow(test_root)
        logger_test.info("MainWindow instantiated for direct test run.")
        test_root.mainloop()
    except Exception as e:
        logger_test.critical(
            "Failed to run MainWindow directly for testing.", exc_info=True
        )
        # Tear the root window down if it was created before the failure
        if "test_root" in locals() and test_root.winfo_exists():
            test_root.destroy()
        sys.exit(1)
    finally:
        logger_test.info("Direct test run finished.")