SXXXXXXX_ProjectUtility/gui/main_window.py
VALLONGOL 5bfa023bbf Chore: Stop tracking files based on .gitignore update.
Summary:
- Rule "tools/" untracked 8 files.
2025-04-29 12:17:49 +02:00

1380 lines
57 KiB
Python

# ProjectUtility/gui/main_window.py
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext, filedialog
import logging
import queue
import threading
import os
import sys
import json
from typing import Dict, Any, Optional, List, Tuple
# --- INIZIO BLOCCO IMPORT CORRETTO E ROBUSTO ---
# Determine the application's root directory more robustly
# Assumes gui is one level down from the project root where ProjectUtility.py resides
_project_root = None
try:
# Path to the gui directory itself
_gui_dir = os.path.dirname(os.path.abspath(__file__))
# Path to the parent directory (should be the project root)
_project_root = os.path.dirname(_gui_dir)
# Add project root to sys.path if it's not already there
# This makes imports like 'from core...' work regardless of how the script is run
if _project_root not in sys.path:
sys.path.insert(0, _project_root)
logging.getLogger(__name__).debug(
f"Added project root to sys.path: {_project_root}"
)
else:
logging.getLogger(__name__).debug(
f"Project root already in sys.path: {_project_root}"
)
except Exception as e:
logging.getLogger(__name__).critical(
f"Failed to determine project root directory: {e}", exc_info=True
)
# Cannot proceed without project root for imports
raise RuntimeError("Could not determine project structure for imports.") from e
# Now perform the imports using absolute paths from the project root
try:
# These imports should now work because _project_root is in sys.path
from core.models import ToolInfo, ToolParameter
from core.tool_discovery import discover_tools
from gui.process_worker import ProcessWorker # Absolute import from project root
TOOL_DISCOVERY_ENABLED = True
PROCESS_WORKER_ENABLED = True
logging.getLogger(__name__).debug("Core and GUI modules imported successfully.")
except ImportError as e:
# This block now ONLY catches genuine import errors AFTER path setup
logging.getLogger(__name__).critical(
f"ImportError after setting path. Check module existence/dependencies. Error: {e}",
exc_info=True,
)
TOOL_DISCOVERY_ENABLED = False
PROCESS_WORKER_ENABLED = False
# Define dummy classes if needed for static analysis or basic structure
from collections import namedtuple
ToolParameter = namedtuple(
"ToolParameter",
["name", "label", "type", "required", "default", "description", "options"],
)
ToolInfo = namedtuple(
"ToolInfo",
[
"id",
"display_name",
"description",
"command",
"working_dir",
"parameters",
"version",
"has_gui",
],
) # Added has_gui
class ProcessWorker: # Dummy ProcessWorker
def __init__(self, *args, **kwargs):
pass
def run(self):
pass
def terminate(self):
pass
# Add dummy tool_info if needed by handlers
class DummyToolInfo:
display_name = "Dummy (Import Failed)"
has_gui = False
tool_info = DummyToolInfo()
except Exception as e:
# Catch any other unexpected error during the import phase
logging.getLogger(__name__).critical(
f"Unexpected error during imports: {e}", exc_info=True
)
TOOL_DISCOVERY_ENABLED = False
PROCESS_WORKER_ENABLED = False
# Define dummies again just in case
from collections import namedtuple
ToolParameter = namedtuple(
"ToolParameter",
["name", "label", "type", "required", "default", "description", "options"],
)
ToolInfo = namedtuple(
"ToolInfo",
[
"id",
"display_name",
"description",
"command",
"working_dir",
"parameters",
"version",
"has_gui",
],
) # Added has_gui
class ProcessWorker:
def __init__(self, *args, **kwargs):
pass
def run(self):
pass
def terminate(self):
pass
class DummyToolInfo:
display_name = "Dummy (Import Failed)"
has_gui = False
tool_info = DummyToolInfo()
# --- FINE BLOCCO IMPORT CORRETTO E ROBUSTO ---
# --- Constants ---
MAX_OUTPUT_LINES = 1000
# Use the calculated project root for config directory if available
APP_ROOT_DIR = (
_project_root if _project_root else os.getcwd()
) # Fallback to current dir if root calc failed
CONFIG_DIR = os.path.join(APP_ROOT_DIR, "config")
STATE_FILENAME = os.path.join(CONFIG_DIR, "tool_state.json")
# --- Inizio Classe MainWindow ---
class MainWindow:
"""
The main application window for ProjectUtility using Tkinter.
Manages the display of available tools, dynamic tool parameter inputs,
and process output. Handles launching tools in background threads and
persists last used parameters. Shows and manages running processes.
"""
def __init__(self, root: tk.Tk) -> None:
"""Initializes the main application window."""
self.root = root
self.logger = logging.getLogger(__name__)
self.gui_queue = queue.Queue()
self.available_tools: dict[str, ToolInfo] = {}
self.tools_in_listbox: list[str] = []
self.selected_tool_id: str | None = None
self.running_workers: dict[str, ProcessWorker] = {}
# Key: parameter name, Value: tuple (widget_instance, tk_variable)
self.current_parameter_widgets: Dict[str, Tuple[tk.Widget, tk.Variable]] = {}
# Variable for process termination
self.selected_run_id_for_termination: Optional[str] = None
# Load initial tool state (last used parameters)
self.tool_state: Dict[str, Dict[str, Any]] = self._load_tool_state()
# Setup UI components
self._setup_window()
self._setup_ui_styles() # Setup styles first
self._setup_ui()
self._load_tools() # Load tools after UI is ready
self._start_queue_processing() # Start checking for messages from workers
# Make window visible only after everything is set up
self.root.deiconify()
self.logger.info("MainWindow initialized and displayed.")
def _load_tool_state(self) -> Dict[str, Dict[str, Any]]:
"""Loads the last used parameter values from the state file."""
if not os.path.isfile(STATE_FILENAME):
self.logger.info(
f"State file not found ({STATE_FILENAME}). Starting with empty state."
)
return {}
try:
with open(STATE_FILENAME, "r", encoding="utf-8") as f:
state_data = json.load(f)
if not isinstance(state_data, dict):
self.logger.warning(
f"Invalid format in state file ({STATE_FILENAME}). Expected dictionary. Resetting state."
)
return {}
self.logger.info(f"Successfully loaded tool state from {STATE_FILENAME}")
return state_data
except json.JSONDecodeError as e:
self.logger.error(
f"Failed to parse state file ({STATE_FILENAME}): {e}. Resetting state."
)
return {}
except Exception as e:
self.logger.exception(
f"An unexpected error occurred loading state file {STATE_FILENAME}. Resetting state."
)
return {}
def _save_tool_state(self, tool_id: str, parameters_used: Dict[str, Any]) -> None:
"""Saves the used parameters for a specific tool to the state file."""
if not tool_id:
self.logger.warning("Attempted to save state without a valid tool_id.")
return
self.logger.info(
f"Saving state for tool '{tool_id}' with parameters: {parameters_used}"
)
try:
os.makedirs(CONFIG_DIR, exist_ok=True)
except OSError as e:
self.logger.error(
f"Failed to create config directory '{CONFIG_DIR}': {e}. Cannot save state."
)
return
if tool_id not in self.tool_state:
self.tool_state[tool_id] = {}
self.tool_state[tool_id].update(parameters_used)
temp_filename = STATE_FILENAME + ".tmp"
try:
with open(temp_filename, "w", encoding="utf-8") as f:
json.dump(self.tool_state, f, indent=4, ensure_ascii=False)
os.replace(temp_filename, STATE_FILENAME)
self.logger.debug(f"Tool state saved successfully to {STATE_FILENAME}")
except (TypeError, ValueError) as e:
self.logger.error(
f"Failed to serialize state to JSON for {STATE_FILENAME}: {e}. State not saved.",
exc_info=True,
)
if os.path.exists(temp_filename):
os.remove(temp_filename)
except (IOError, OSError) as e:
self.logger.error(
f"Failed to write or replace state file {STATE_FILENAME}: {e}. State not saved.",
exc_info=True,
)
if os.path.exists(temp_filename):
os.remove(temp_filename)
except Exception as e:
self.logger.exception(
f"An unexpected error occurred saving state file {STATE_FILENAME}."
)
if os.path.exists(temp_filename):
os.remove(temp_filename)
def _setup_window(self) -> None:
"""Configures the main Tkinter window."""
self.root.title("Project Utility")
self.root.minsize(750, 600) # Adjusted minimum size
self.root.geometry("950x750") # Adjusted initial size
self.root.protocol("WM_DELETE_WINDOW", self._on_close)
self.logger.debug("Main window configured.")
def _setup_ui_styles(self) -> None:
"""Sets up custom ttk styles used in the UI."""
style = ttk.Style(self.root)
try:
available_themes = style.theme_names()
for theme in ["clam", "alt", "default"]:
if theme in available_themes:
style.theme_use(theme)
self.logger.debug(f"Using ttk theme: {theme}")
break
except tk.TclError:
self.logger.warning("Could not set ttk theme.")
style.configure("Italic.TLabel", font=("Segoe UI", 9, "italic"))
style.configure(
"Placeholder.TLabel", foreground="gray", font=("Segoe UI", 9, "italic")
)
try:
lf_bg = style.lookup("TLabelFrame", "background")
except tk.TclError:
lf_bg = "#f0f0f0" # Fallback
style.configure("ParamContainer.TFrame", background=lf_bg)
style.configure("Preview.TFrame", background="#ffffff")
style.configure("Preview.TLabel", background="#ffffff")
def _setup_ui(self) -> None:
"""Creates and arranges the main UI widgets."""
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
# --- Main Frame (diviso ORA in 2 righe principali) ---
# Riga 0: Tool list (col 0) | Dettagli/Log/Run (col 1)
# Riga 1: Process List (col 0, span 2)
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky="nsew")
# Configura colonne per la riga 0
main_frame.columnconfigure(0, weight=0) # Colonna lista tool (larghezza fissa)
main_frame.columnconfigure(1, weight=1) # Colonna dettagli (espande)
# Configura righe per main_frame
main_frame.rowconfigure(
0, weight=1
) # Riga 0 (tool+dettagli) espande verticalmente
main_frame.rowconfigure(1, weight=0) # Riga 1 (processi) altezza fissa o minima
# --- Riga 0, Colonna 0: Tool List ---
tool_list_frame = ttk.Frame(main_frame, padding=(0, 0, 5, 0))
tool_list_frame.grid(
row=0, column=0, sticky="nsew", padx=(0, 5)
) # Posizionato in main_frame[0,0]
tool_list_frame.rowconfigure(1, weight=1) # Listbox espande
ttk.Label(tool_list_frame, text="Available Tools:", font="-weight bold").grid(
row=0, column=0, sticky="w", pady=(0, 5)
)
# Listbox + Scrollbar
self.tool_listbox = tk.Listbox(tool_list_frame, exportselection=False, width=25)
self.tool_listbox.grid(row=1, column=0, sticky="nsew")
listbox_scrollbar = ttk.Scrollbar(
tool_list_frame, orient=tk.VERTICAL, command=self.tool_listbox.yview
)
listbox_scrollbar.grid(row=1, column=1, sticky="ns")
self.tool_listbox.config(yscrollcommand=listbox_scrollbar.set)
self.tool_listbox.bind("<<ListboxSelect>>", self._on_tool_selected)
# --- Riga 0, Colonna 1: Right Pane (Controlli + Log + Run Button) ---
right_pane = ttk.Frame(main_frame)
right_pane.grid(
row=0, column=1, sticky="nsew"
) # Posizionato in main_frame[0,1]
right_pane.columnconfigure(0, weight=1) # Contenuto espande orizzontalmente
# Configura righe interne del right_pane ORA:
# Riga 0: Controlli Tool
# Riga 1: Output Log
# Riga 2: Run Button
right_pane.rowconfigure(
0, weight=0
) # Controlli non espandono verticalmente (scroll)
right_pane.rowconfigure(1, weight=1) # Output Log espande verticalmente
right_pane.rowconfigure(2, weight=0) # Run Button altezza fissa
# --- Tool Controls/Options Area (Scrollable) ---
# (Come prima, ma posizionato in right_pane[0,0])
controls_outer_frame = ttk.LabelFrame(
right_pane, text="Tool Options", padding="5"
)
controls_outer_frame.grid(row=0, column=0, sticky="nsew", pady=(0, 10))
controls_outer_frame.rowconfigure(0, weight=0)
controls_outer_frame.columnconfigure(0, weight=1)
controls_canvas = tk.Canvas(
controls_outer_frame, borderwidth=0, highlightthickness=0
)
controls_scrollbar = ttk.Scrollbar(
controls_outer_frame, orient="vertical", command=controls_canvas.yview
)
self.tool_controls_frame = ttk.Frame(
controls_canvas, style="ParamContainer.TFrame", padding="5"
)
self.tool_controls_frame.bind(
"<Configure>",
lambda e: controls_canvas.configure(
scrollregion=controls_canvas.bbox("all")
),
)
controls_canvas_window = controls_canvas.create_window(
(0, 0), window=self.tool_controls_frame, anchor="nw"
)
controls_canvas.configure(yscrollcommand=controls_scrollbar.set)
controls_canvas.bind(
"<Configure>",
lambda e: self.root.after_idle(
lambda: controls_canvas.itemconfig(
controls_canvas_window, width=e.width
)
),
)
controls_canvas.grid(row=0, column=0, sticky="nsew")
controls_scrollbar.grid(row=0, column=1, sticky="ns")
self.controls_placeholder_label = ttk.Label(
self.tool_controls_frame,
text="Select a tool from the list.",
style="Placeholder.TLabel",
)
self.controls_placeholder_label.pack(padx=5, pady=20)
# --- Output Log Area ---
# (Come prima, ma posizionato in right_pane[1,0])
output_frame = ttk.LabelFrame(right_pane, text="Output Log", padding="5")
output_frame.grid(
row=1, column=0, sticky="nsew", pady=(0, 5)
) # Modificato row a 1
output_frame.rowconfigure(0, weight=1)
output_frame.columnconfigure(0, weight=1)
self.output_text = scrolledtext.ScrolledText(
output_frame,
wrap=tk.WORD,
state=tk.DISABLED,
height=15,
relief=tk.SUNKEN,
borderwidth=1,
) # Altezza aumentata
self.output_text.grid(row=0, column=0, sticky="nsew")
# ... (configurazione tag output come prima) ...
self.output_text.tag_config("error", foreground="#D63031")
self.output_text.tag_config("info", foreground="#0984E3")
self.output_text.tag_config("success", foreground="#00B894")
self.output_text.tag_config(
"command", foreground="#6C5CE7", font="-weight bold"
)
self.output_text.tag_config("run_id", foreground="#636E72", font="-size 8")
self.output_text.tag_config("log_warning", foreground="#E17055")
self.output_text.tag_config("log_debug", foreground="#B2BEC3")
self.output_text.tag_config(
"log_info", foreground=self.output_text.tag_cget("info", "foreground")
)
self.output_text.tag_config(
"log_error", foreground=self.output_text.tag_cget("error", "foreground")
)
self.output_text.tag_config("raw_stdout", foreground="#2D3436")
# --- Run Button Area ---
# (Come prima, ma posizionato in right_pane[2,0])
button_frame = ttk.Frame(right_pane, padding=(0, 5, 0, 0))
button_frame.grid(row=2, column=0, sticky="ew") # Modificato row a 2
button_frame.columnconfigure(0, weight=1)
self.run_button = ttk.Button(
button_frame, text="Run Tool", command=self._launch_tool, state=tk.DISABLED
)
self.run_button.grid(row=0, column=1, sticky="e", padx=5)
# --- Riga 1: Running Processes Area (Full Width) ---
# (Come prima, ma posizionato in main_frame[1,0] con columnspan=2)
process_frame = ttk.LabelFrame(main_frame, text="Running Tools", padding="5")
process_frame.grid(
row=1, column=0, columnspan=2, sticky="nsew", pady=(10, 0)
) # Posizionato in main_frame[1,0] span 2
process_frame.columnconfigure(0, weight=1) # Treeview espande orizzontalmente
process_frame.rowconfigure(0, weight=0) # Riga Treeview altezza fissa/minima
# Treeview widget
self.process_tree = ttk.Treeview(
process_frame,
columns=("run_id", "tool_name", "status", "pid"),
show="headings",
selectmode="browse",
height=5, # Altezza in righe leggermente aumentata
)
# Definisci colonne
self.process_tree.heading("run_id", text="Run ID", anchor=tk.W)
self.process_tree.column("run_id", width=140, anchor=tk.W, stretch=tk.NO)
self.process_tree.heading("tool_name", text="Tool Name", anchor=tk.W)
self.process_tree.column(
"tool_name", width=200, anchor=tk.W, stretch=tk.YES
) # Più spazio per nome
self.process_tree.heading("status", text="Status", anchor=tk.CENTER)
self.process_tree.column("status", width=100, anchor=tk.CENTER, stretch=tk.NO)
self.process_tree.heading("pid", text="PID", anchor=tk.E)
self.process_tree.column("pid", width=70, anchor=tk.E, stretch=tk.NO)
# Scrollbars
tree_scroll_y = ttk.Scrollbar(
process_frame, orient="vertical", command=self.process_tree.yview
)
tree_scroll_x = ttk.Scrollbar(
process_frame, orient="horizontal", command=self.process_tree.xview
)
self.process_tree.configure(
yscrollcommand=tree_scroll_y.set, xscrollcommand=tree_scroll_x.set
)
# Grid Treeview e scrollbars
self.process_tree.grid(row=0, column=0, sticky="nsew")
tree_scroll_y.grid(row=0, column=1, sticky="ns")
tree_scroll_x.grid(
row=1, column=0, columnspan=2, sticky="ew"
) # X scrollbar span 2
# Terminate button frame
process_button_frame = ttk.Frame(process_frame)
process_button_frame.grid(
row=2, column=0, columnspan=2, sticky="ew", pady=(5, 0)
)
process_button_frame.columnconfigure(0, weight=1)
self.terminate_button = ttk.Button(
process_button_frame,
text="Terminate Selected",
command=self._terminate_selected_process,
state=tk.DISABLED,
)
self.terminate_button.grid(row=0, column=1, sticky="e")
# Bind selection
self.process_tree.bind("<<TreeviewSelect>>", self._on_process_selected)
self.logger.debug("UI setup complete with new layout.")
def _load_tools(self) -> None:
"""Discovers tools and populates the tool listbox."""
self.logger.info("Loading available tools...")
self.tool_listbox.delete(0, tk.END)
self.available_tools.clear()
self.tools_in_listbox.clear()
if self.selected_tool_id:
self._clear_tool_controls()
else:
self.run_button.config(state=tk.DISABLED)
if not TOOL_DISCOVERY_ENABLED:
messagebox.showwarning(
"Tool Discovery Disabled",
"Could not load tool discovery module.",
parent=self.root,
)
self._clear_tool_controls()
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.config(
text="Tool Discovery Module Missing."
)
return
try:
tools_dir = os.path.join(APP_ROOT_DIR, "tools")
if not os.path.isdir(tools_dir):
raise FileNotFoundError(f"Tools directory does not exist: {tools_dir}")
self.available_tools = discover_tools(tools_dir)
if not self.available_tools:
self.logger.warning(f"No valid tools found in '{tools_dir}'.")
self._clear_tool_controls()
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.config(text="No tools found.")
return
sorted_tool_items = sorted(
self.available_tools.items(), key=lambda item: item[1].display_name
)
for tool_id, tool_info in sorted_tool_items:
if isinstance(tool_info, ToolInfo):
self.tool_listbox.insert(tk.END, tool_info.display_name)
self.tools_in_listbox.append(tool_id)
self.logger.info(f"Finished loading {len(self.tools_in_listbox)} tools.")
except FileNotFoundError as e:
self.logger.error(f"Tools directory error: {e}")
messagebox.showerror(
"Error Loading Tools", f"Tools directory error:\n{e}", parent=self.root
)
self._clear_tool_controls()
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.config(text="Error: Tools directory.")
except Exception as e:
self.logger.exception("Unexpected error during tool discovery.")
messagebox.showerror(
"Error Loading Tools",
f"Unexpected error loading tools:\n{e}",
parent=self.root,
)
self._clear_tool_controls()
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.config(text="Error loading tools.")
def _clear_parameter_widgets_only(self) -> None:
"""Clears only the parameter widgets from the controls frame and the widget dictionary."""
self.logger.debug("Clearing parameter widgets area.")
self.current_parameter_widgets.clear()
if (
hasattr(self, "tool_controls_frame")
and self.tool_controls_frame.winfo_exists()
):
widgets_to_destroy = list(self.tool_controls_frame.winfo_children())
for widget in widgets_to_destroy:
# Avoid destroying the placeholder label itself here
if widget != getattr(self, "controls_placeholder_label", None):
widget.destroy()
# Ensure placeholder is visible if frame becomes empty
if (
not self.tool_controls_frame.winfo_children()
and hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.pack(
padx=5, pady=20
) # Ensure it's packed
def _clear_tool_controls(self) -> None:
"""Resets the entire tool selection state and clears control widgets."""
self.logger.debug("Resetting tool selection and clearing controls.")
self._clear_parameter_widgets_only() # Reuse widget clearing
self.selected_tool_id = None # Reset selection ID
if hasattr(self, "run_button"):
self.run_button.config(state=tk.DISABLED)
# Ensure placeholder is visible after full clear
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.pack(padx=5, pady=20)
def _on_tool_selected(self, event: tk.Event) -> None:
"""Handles the selection change in the tool listbox and builds parameter UI."""
if not self.tool_listbox.winfo_exists():
return
selected_indices = self.tool_listbox.curselection()
if not selected_indices:
return
selected_index = selected_indices[0]
if not (0 <= selected_index < len(self.tools_in_listbox)):
self.logger.warning(f"Invalid listbox index: {selected_index}.")
self._clear_tool_controls()
return
new_selected_tool_id = self.tools_in_listbox[selected_index]
if new_selected_tool_id == self.selected_tool_id:
return
self.selected_tool_id = new_selected_tool_id
self.logger.info(f"Tool selected: {self.selected_tool_id}")
selected_tool_info = self.available_tools.get(self.selected_tool_id)
if not selected_tool_info:
self.logger.error(
f"Internal Error: ToolInfo not found for ID '{self.selected_tool_id}'."
)
self._clear_tool_controls()
return
last_used_params = self.tool_state.get(self.selected_tool_id, {})
self._clear_parameter_widgets_only() # Clear only params, keep selection ID
if (
hasattr(self, "controls_placeholder_label")
and self.controls_placeholder_label.winfo_exists()
):
self.controls_placeholder_label.pack_forget() # Hide placeholder
ttk.Label(
self.tool_controls_frame,
text=f"{selected_tool_info.display_name}",
font="-weight bold",
).pack(anchor="w", pady=(0, 5), padx=5)
ttk.Label(
self.tool_controls_frame,
text=selected_tool_info.description,
wraplength=450,
).pack(anchor="w", pady=(0, 10), padx=5)
if selected_tool_info.parameters:
ttk.Separator(self.tool_controls_frame, orient="horizontal").pack(
fill="x", pady=10, padx=5
)
param_container = ttk.Frame(
self.tool_controls_frame, style="ParamContainer.TFrame"
)
param_container.pack(fill="x", expand=True, padx=5)
param_container.columnconfigure(1, weight=1)
for i, param_def in enumerate(selected_tool_info.parameters):
self._create_parameter_widget(
param_container, i, param_def, last_used_params
)
else:
ttk.Label(
self.tool_controls_frame,
text="This tool requires no parameters.",
style="Italic.TLabel",
).pack(anchor="w", padx=5, pady=10)
self.tool_controls_frame.update_idletasks()
canvas_widget = self.tool_controls_frame.master
if isinstance(canvas_widget, tk.Canvas):
canvas_widget.configure(scrollregion=canvas_widget.bbox("all"))
canvas_widget.yview_moveto(0)
self.run_button.config(
state=tk.NORMAL if PROCESS_WORKER_ENABLED else tk.DISABLED
)
if not PROCESS_WORKER_ENABLED:
ttk.Label(
self.tool_controls_frame,
text="Process execution disabled.",
foreground="orange",
).pack(anchor="w", pady=(10, 0), padx=5)
def _create_parameter_widget(
self,
parent_frame: ttk.Frame,
row_index: int,
param_def: ToolParameter,
last_used_values: Dict[str, Any],
):
"""Creates the appropriate Label and input Widget for a ToolParameter."""
param_name = param_def.name
label_text = f"{param_def.label}{'*' if param_def.required else ''}"
widget_variable = None
input_widget = None
initial_value = last_used_values.get(param_name, param_def.default)
self.logger.debug(
f"Param '{param_name}': InitialValue='{initial_value}' (Type: {type(initial_value)})"
)
label = ttk.Label(parent_frame, text=label_text)
label.grid(row=row_index, column=0, sticky="w", padx=(0, 10), pady=3)
param_type = param_def.type.lower()
try:
if param_type == "string":
widget_variable = tk.StringVar()
input_widget = ttk.Entry(
parent_frame, textvariable=widget_variable, width=40
)
elif param_type == "integer":
widget_variable = tk.IntVar()
input_widget = ttk.Spinbox(
parent_frame,
from_=-(2**31),
to=2**31 - 1,
textvariable=widget_variable,
width=15,
)
elif param_type == "float":
widget_variable = tk.DoubleVar()
input_widget = ttk.Spinbox(
parent_frame,
from_=-float("inf"),
to=float("inf"),
format="%.6g",
textvariable=widget_variable,
width=15,
)
elif param_type == "boolean":
widget_variable = tk.BooleanVar()
input_widget = ttk.Checkbutton(
parent_frame, variable=widget_variable, style="TCheckbutton"
)
elif param_type == "file" or param_type == "folder":
widget_variable = tk.StringVar()
entry_frame = ttk.Frame(parent_frame, style="ParamContainer.TFrame")
display_entry = ttk.Entry(
entry_frame,
textvariable=widget_variable,
width=35,
state="readonly",
)
browse_button = ttk.Button(
entry_frame,
text="Browse...",
style="TButton",
command=lambda p=param_def, v=widget_variable: self._browse_path(
p, v
),
)
display_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
browse_button.pack(side=tk.LEFT, padx=(5, 0))
input_widget = entry_frame
else:
self.logger.warning(
f"Unsupported type '{param_def.type}'. Treating as string."
)
widget_variable = tk.StringVar()
input_widget = ttk.Entry(
parent_frame, textvariable=widget_variable, width=40
)
# Set initial value after widget creation
if initial_value is not None and widget_variable is not None:
if param_type == "integer":
try:
widget_variable.set(int(initial_value))
except (ValueError, TypeError):
widget_variable.set(0)
self.logger.warning(f"Invalid saved int value for {param_name}")
elif param_type == "float":
try:
widget_variable.set(float(initial_value))
except (ValueError, TypeError):
widget_variable.set(0.0)
self.logger.warning(
f"Invalid saved float value for {param_name}"
)
elif param_type == "boolean":
if isinstance(initial_value, str):
widget_variable.set(
initial_value.lower() in ["true", "1", "yes", "on"]
)
else:
widget_variable.set(bool(initial_value))
else: # String, File, Folder, Unknown
widget_variable.set(str(initial_value))
elif (
widget_variable is not None
): # Set default for typed vars if initial_value is None
if param_type == "integer":
widget_variable.set(0)
elif param_type == "float":
widget_variable.set(0.0)
elif param_type == "boolean":
widget_variable.set(False)
except Exception as e:
self.logger.error(
f"Error creating widget for '{param_name}': {e}", exc_info=True
)
widget_variable = tk.StringVar(value="Error!")
input_widget = ttk.Entry(
parent_frame, textvariable=widget_variable, width=40, state="disabled"
)
if input_widget is not None and widget_variable is not None:
input_widget.grid(row=row_index, column=1, sticky="ew", padx=0, pady=3)
self.current_parameter_widgets[param_name] = (input_widget, widget_variable)
else:
self.logger.error(f"Failed widget/variable pair for '{param_name}'")
def _browse_path(self, param_def: ToolParameter, var: tk.StringVar):
"""Opens a file or folder dialog based on parameter type."""
param_type = param_def.type.lower()
current_value = var.get()
initial_dir = (
os.path.dirname(current_value)
if current_value and os.path.isdir(os.path.dirname(current_value))
else APP_ROOT_DIR
)
file_types = []
options = param_def.options or {}
if isinstance(options.get("filter"), list):
try:
file_types = [
(ft.get("name", "?"), ft.get("pattern", "*.*"))
for ft in options["filter"]
if isinstance(ft, dict)
]
except Exception as e:
self.logger.warning(f"Err parsing filter: {e}")
if not file_types:
file_types = [("All Files", "*.*")]
path = None
try:
if param_type == "file":
save_as = options.get("save_as", False)
dialog_func = (
filedialog.asksaveasfilename
if save_as
else filedialog.askopenfilename
)
path = dialog_func(
parent=self.root,
title=f"Select Path for {param_def.label}",
initialdir=initial_dir,
initialfile=(
os.path.basename(current_value) if current_value else ""
),
filetypes=file_types,
defaultextension=(
next(
(
".{p}".format(p=pat.split(".")[-1])
for _, pat in file_types
if "*." in pat and "." in pat
),
None,
)
if save_as
else None
),
)
elif param_type == "folder":
path = filedialog.askdirectory(
parent=self.root,
title=f"Select Folder for {param_def.label}",
initialdir=(
initial_dir if os.path.isdir(initial_dir) else APP_ROOT_DIR
),
)
else:
self.logger.error(f"Bad call to _browse_path type '{param_type}'")
return
if path:
var.set(path)
self.logger.debug(f"Path set for '{param_name}': {path}")
else:
self.logger.debug(f"Path selection cancelled for '{param_name}'.")
except Exception as e:
self.logger.exception(f"Dialog error for '{param_name}'")
messagebox.showerror("Dialog Error", f"Error:\n{e}", parent=self.root)
def _on_process_selected(self, event: tk.Event) -> None:
"""Handles selection changes in the running processes Treeview."""
if not self.process_tree.winfo_exists():
return # Check widget exists
selected_items = self.process_tree.selection()
if not selected_items:
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
return
selected_item_id = selected_items[0]
self.selected_run_id_for_termination = selected_item_id
if self.selected_run_id_for_termination in self.running_workers:
self.terminate_button.config(state=tk.NORMAL)
self.logger.debug(
f"Process selected: {self.selected_run_id_for_termination}"
)
else: # Process finished between selection and check
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
def _terminate_selected_process(self) -> None:
"""Attempts to terminate the process selected in the Treeview."""
if not self.selected_run_id_for_termination:
return
run_id = self.selected_run_id_for_termination
worker = self.running_workers.get(run_id)
if not worker:
messagebox.showinfo(
"Process Finished",
f"Process {run_id} already finished.",
parent=self.root,
)
if self.process_tree.exists(run_id):
self.process_tree.delete(run_id)
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
return
tool_name = (
worker.tool_info.display_name if hasattr(worker, "tool_info") else run_id
)
if messagebox.askyesno(
"Confirm Termination",
f"Terminate '{tool_name}' (Run ID: {run_id})?",
parent=self.root,
):
self.logger.info(f"Requesting termination for {run_id}...")
try: # Update status in treeview visually
if self.process_tree.exists(run_id):
self.process_tree.set(
run_id, column="status", value="Terminating..."
)
except tk.TclError:
pass # Ignore if item gone
try: # Call terminate
worker.terminate()
self.logger.info(f"Terminate signal sent to {run_id}.")
except Exception as e:
self.logger.exception(f"Error terminating {run_id}")
messagebox.showerror(
"Termination Error", f"Error:\n{e}", parent=self.root
)
finally: # Always disable button and clear selection after attempt
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
if (
self.process_tree.winfo_exists() and self.process_tree.selection()
): # Deselect in tree
self.process_tree.selection_remove(self.process_tree.selection())
else:
self.logger.info("Termination cancelled.")
def _launch_tool(self) -> None:
"""Gathers parameters, saves state, validates, and launches the selected tool."""
if not self.selected_tool_id or not PROCESS_WORKER_ENABLED:
messagebox.showwarning(
"Cannot Run Tool", "Please select a tool first.", parent=self.root
)
return
selected_tool_info = self.available_tools.get(self.selected_tool_id)
if not selected_tool_info:
messagebox.showerror(
"Internal Error", "Tool info missing.", parent=self.root
)
return
parameters_to_pass = {}
validation_errors = []
for param_name, (_, tk_var) in self.current_parameter_widgets.items():
param_def = next(
(p for p in selected_tool_info.parameters if p.name == param_name), None
)
if not param_def:
continue
try:
value = tk_var.get()
if param_def.required and isinstance(value, str) and not value.strip():
validation_errors.append(f"- '{param_def.label}' is required.")
# Add more validation here if needed
parameters_to_pass[param_name] = value
except tk.TclError as e:
validation_errors.append(f"- Invalid value for '{param_def.label}'.")
except Exception as e:
validation_errors.append(f"- Error reading '{param_def.label}'.")
self.logger.exception(f"Err reading {param_name}")
if validation_errors:
messagebox.showerror(
"Invalid Parameters",
"Errors:\n\n" + "\n".join(validation_errors),
parent=self.root,
)
return
try:
self._save_tool_state(self.selected_tool_id, parameters_to_pass)
except Exception:
self.logger.exception("Error saving state.")
if not PROCESS_WORKER_ENABLED:
messagebox.showerror(
"Error", "Process Worker unavailable.", parent=self.root
)
return
try:
run_id = f"{self.selected_tool_id}_{os.urandom(4).hex()}"
pid = "Starting..." # Placeholder PID before Popen returns
# Add to Treeview immediately with "Starting" status
if hasattr(self, "process_tree"): # Check if treeview exists
self.process_tree.insert(
parent="",
index=tk.END,
iid=run_id,
values=(run_id, selected_tool_info.display_name, "Starting", pid),
)
self._append_output(f"[{run_id}] ", ("run_id",))
self._append_output(
f"-- Starting: {selected_tool_info.display_name} --\n", ("info",)
)
worker = ProcessWorker(
run_id, selected_tool_info, parameters_to_pass, self.gui_queue
)
thread = threading.Thread(target=worker.run, daemon=True)
self.running_workers[run_id] = worker
thread.start()
# Update PID in treeview after worker._process is likely populated (slight delay needed)
self.root.after(100, self._update_pid_in_treeview, run_id, worker)
self.logger.info(f"Worker thread started for run ID: {run_id}")
except Exception as e:
self.logger.exception(f"Failed to launch tool '{self.selected_tool_id}'")
messagebox.showerror(
"Launch Error", f"Could not start process:\n{e}", parent=self.root
)
self._append_output(
f"ERROR: Failed launch for {selected_tool_info.display_name}\n",
("error",),
)
if hasattr(self, "process_tree") and self.process_tree.exists(
run_id
): # Remove from tree if launch failed
self.process_tree.delete(run_id)
def _update_pid_in_treeview(self, run_id: str, worker: ProcessWorker):
"""Callback to update PID in treeview shortly after process start."""
try:
if self.process_tree.exists(run_id):
pid = worker._process.pid if worker and worker._process else "N/A"
self.process_tree.set(run_id, column="pid", value=pid)
# Also update status to "Running" if it was "Starting"
current_status = self.process_tree.set(run_id, column="status")
if current_status == "Starting":
self.process_tree.set(run_id, column="status", value="Running")
except tk.TclError:
pass # Item might be gone
except Exception as e:
self.logger.error(f"Error updating PID for {run_id} in treeview: {e}")
def _start_queue_processing(self) -> None:
"""Starts the periodic check of the GUI queue."""
self.logger.debug("Starting GUI queue processing loop.")
if self.root.winfo_exists():
self.root.after(100, self._process_queue)
def _process_queue(self) -> None:
"""Processes all available messages from the worker threads queue."""
try:
# --- INIZIO CORREZIONE ---
while True: # Process all messages currently in the queue without blocking
message = (
self.gui_queue.get_nowait()
) # Get message if available AND store it in 'message'
self._handle_worker_message(message) # Handle the received message
self.gui_queue.task_done() # Indicate message processing is complete
# --- FINE CORREZIONE ---
except queue.Empty:
# Queue is empty, this is the normal case, do nothing
pass
except Exception: # Catch any other errors during message handling
self.logger.exception("Error processing message from GUI queue.")
finally:
# Crucial: Reschedule the check ONLY if the root window still exists
if self.root.winfo_exists():
self.root.after(100, self._process_queue) # Check again after 100ms
else:
self.logger.info("Root window destroyed, stopping queue processing.")
def _handle_worker_message(self, message: dict) -> None:
"""Handles a single message received from a worker thread via the queue,
parsing structured JSON from stdout if possible."""
msg_type = message.get("type")
run_id = message.get("run_id", "NO_RUN_ID")
data_str = message.get("data", "") # Raw data string from queue
worker = self.running_workers.get(run_id)
tool_name = (
worker.tool_info.display_name
if worker and hasattr(worker, "tool_info")
else "Unknown Tool"
)
run_id_tag = ("run_id",)
prefix = f"[{run_id}] " # Prefix to add to output lines
# --- Handle non-stdout messages first ---
if msg_type == "stderr":
self._append_output(prefix, run_id_tag)
self._append_output(data_str, tag="error")
return
if msg_type == "status":
self.logger.info(f"Worker Status [{run_id}]: {data_str}")
self._append_output(prefix, run_id_tag)
self._append_output(f"[Worker Status] {data_str}\n", tag="info")
return
if msg_type == "finished":
exit_code = message.get("exit_code", "?")
self.logger.info(f"Finished: '{tool_name}' ({run_id}), Code={exit_code}")
tag = "success" if exit_code == 0 else "error"
self._append_output(prefix, run_id_tag)
self._append_output(
f"-- Finished: {tool_name} (Exit Code: {exit_code}) --\n", tag=tag
)
if run_id in self.running_workers:
del self.running_workers[run_id]
try:
if hasattr(self, "process_tree") and self.process_tree.exists(run_id):
self.process_tree.delete(run_id)
except tk.TclError:
pass
except Exception as e:
self.logger.error(f"Err removing {run_id} from tree: {e}")
if run_id == self.selected_run_id_for_termination:
self.terminate_button.config(state=tk.DISABLED)
self.selected_run_id_for_termination = None
return
# --- Handle stdout: Try parsing as JSON ---
if msg_type == "stdout":
parsed_json = None
is_json_line = False
line_content = data_str.strip()
if line_content.startswith("{") and line_content.endswith("}"):
try:
parsed_json = json.loads(line_content)
is_json_line = True
except json.JSONDecodeError:
is_json_line = False
except Exception:
is_json_line = False
self.logger.exception("Unexpected JSON parse error")
if is_json_line and isinstance(parsed_json, dict) and "type" in parsed_json:
json_type = parsed_json["type"]
json_data = parsed_json # Use json_data consistently
# --- INIZIO CORREZIONI NEI BLOCCHI EXCEPT ---
if json_type == "progress":
value = json_data.get("value", 0)
msg = json_data.get("message", "")
self.logger.info(f"Progress [{run_id}]: {value:.0%} {msg}")
self._append_output(prefix, run_id_tag)
self._append_output(f"[Progress {value:.0%}] {msg}\n", tag="info")
elif json_type == "status":
msg = json_data.get("message", "")
self.logger.info(f"Tool Status [{run_id}]: {msg}")
self._append_output(prefix, run_id_tag)
self._append_output(f"[Status] {msg}\n", tag="info")
elif json_type == "log":
level = str(json_data.get("level", "info")).lower()
msg = json_data.get("message", "")
self.logger.info(f"Tool Log [{run_id}][{level}]: {msg}")
tag = (
f"log_{level}"
if f"log_{level}" in self.output_text.tag_names()
else "info"
)
self._append_output(prefix, run_id_tag)
self._append_output(f"[Log:{level}] {msg}\n", tag=tag)
elif json_type == "result":
result_payload = json_data.get("data", {})
self.logger.info(f"Tool Result [{run_id}]: {result_payload}")
try:
result_str = json.dumps(result_payload, indent=2)
except Exception:
result_str = str(result_payload) # CORRETTO: Usa result_payload
self._append_output(prefix, run_id_tag)
self._append_output(
f"[Result Data]:\n{result_str}\n", tag="success"
)
else: # Unknown JSON type
self.logger.warning(f"Unknown JSON type '{json_type}' [{run_id}]")
try:
unknown_json_str = json.dumps(json_data, indent=2)
except Exception:
unknown_json_str = str(json_data) # CORRETTO: Usa json_data
self._append_output(prefix, run_id_tag)
self._append_output(
f"[Unknown JSON Type:{json_type}]:\n{unknown_json_str}\n",
tag="log_warning",
)
# --- FINE CORREZIONI NEI BLOCCHI EXCEPT ---
elif is_json_line: # Parsed but wrong structure
self.logger.warning(f"Unexpected JSON struct [{run_id}]: {parsed_json}")
# --- INIZIO CORREZIONE ---
try:
unexpected_json_str = json.dumps(parsed_json, indent=2)
except Exception:
unexpected_json_str = str(parsed_json) # CORRETTO: Usa parsed_json
# --- FINE CORREZIONE ---
self._append_output(prefix, run_id_tag)
self._append_output(
f"[Unexpected JSON]:\n{unexpected_json_str}\n", tag="log_warning"
)
else: # Not JSON, treat as raw stdout
self.logger.debug(f"Raw stdout [{run_id}]: {data_str.rstrip()}")
self._append_output(prefix, run_id_tag)
self._append_output(data_str, tag="raw_stdout")
else: # Should not happen
self.logger.error(
f"Internal Error: Unknown queue msg type '{msg_type}' ({run_id})"
)
self._append_output(prefix, run_id_tag)
self._append_output(
f"[Internal Q Error: {msg_type}] {data_str}\n", tag="error"
)
def _append_output(
self, text: str, tag: Optional[str | Tuple[str, ...]] = None
) -> None:
"""Appends text to the output ScrolledText widget safely, limiting total lines."""
if not hasattr(self, "output_text") or not self.output_text.winfo_exists():
return
try:
self.output_text.config(state=tk.NORMAL)
current_lines = int(self.output_text.index("end-1c").split(".")[0])
if current_lines > MAX_OUTPUT_LINES:
lines_to_delete = (
current_lines - MAX_OUTPUT_LINES + (MAX_OUTPUT_LINES // 2)
)
marker_exists = (
hasattr(self, "_output_truncated_marker_added")
and self._output_truncated_marker_added
)
if not marker_exists:
self.output_text.insert(
"1.0", "[... Log Truncated ...]\n", ("info",)
)
self._output_truncated_marker_added = True
self.output_text.delete(
"2.0" if marker_exists else "1.0", f"{lines_to_delete + 1}.0"
)
self.output_text.insert(tk.END, text, tag or ())
self.output_text.see(tk.END)
self.output_text.config(state=tk.DISABLED)
except tk.TclError as e:
self.logger.error(f"TclError appending output: {e}", exc_info=True)
except Exception:
self.logger.exception("Unexpected error appending output.")
def _on_close(self) -> None:
"""Handles the window close event."""
self.logger.info("Close requested. Checking running processes.")
if self.running_workers:
num = len(self.running_workers)
msg = f"{num} tool(s) running. Exit and terminate them?"
if messagebox.askyesno("Confirm Exit", msg, parent=self.root):
self.logger.warning(
f"User confirmed exit. Terminating {num} processes."
)
ids_to_term = list(self.running_workers.keys())
for run_id in ids_to_term:
worker = self.running_workers.get(run_id)
if worker and hasattr(worker, "terminate"):
self.logger.info(f"Requesting termination for {run_id}")
try:
worker.terminate()
except Exception:
self.logger.exception(f"Err terminating {run_id}")
# Consider a short delay before destroying?
self.root.destroy()
else:
self.logger.info("Shutdown cancelled.")
return
else:
self.logger.info("No running processes. Closing.")
self.root.destroy()
# --- End of MainWindow Class ---
# Standard boilerplate for testing this module directly (optional)
if __name__ == "__main__":
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
print("Running MainWindow directly for UI testing/layout checks...")
# Ensure dummy classes exist if imports failed during test run
if not PROCESS_WORKER_ENABLED:
class ProcessWorker:
def __init__(self, *a, **kw):
self._process = None # Need _process for PID update test
def run(self):
print("Dummy worker run")
import time
time.sleep(2)
def terminate(self):
print("Dummy worker terminate")
class ToolInfo:
display_name = "Dummy Tool (Import Failed)"
has_gui = False
tool_info = ToolInfo()
if not TOOL_DISCOVERY_ENABLED:
def discover_tools(path):
return {} # Dummy discover
from collections import namedtuple
ToolParameter = namedtuple(
"ToolParameter",
["name", "label", "type", "required", "default", "description", "options"],
)
ToolInfo = namedtuple(
"ToolInfo",
[
"id",
"display_name",
"description",
"command",
"working_dir",
"parameters",
"version",
"has_gui",
],
)
test_root = tk.Tk()
test_root.withdraw()
try:
app = MainWindow(test_root)
except Exception as e:
logging.exception("Failed to initialize MainWindow for testing.")
test_root.destroy()
sys.exit(1)
test_root.mainloop()