# ProjectUtility/gui/process_worker.py import subprocess import threading import queue import logging import os import sys # To check platform for startupinfo from typing import Dict, Any, List # Assuming ToolInfo is available via core.models # Need to handle potential ImportError if core isn't fully set up yet, # though MainWindow should prevent instantiation if discovery failed. try: from core.models import ToolInfo, ToolParameter except ImportError: logging.getLogger(__name__).critical("Cannot import ToolInfo from core.models! ProcessWorker will likely fail.") # Define dummy classes if needed for static analysis or basic structure from collections import namedtuple ToolInfo = namedtuple("ToolInfo", ["id", "display_name", "description", "command", "working_dir", "parameters", "version"]) ToolParameter = namedtuple("ToolParameter", ["name", "label", "type", "required", "default", "description", "options"]) class ProcessWorker: """ Executes a tool's command in a separate thread using subprocess. Captures stdout and stderr streams and sends updates back to the main GUI thread via a queue. """ def __init__(self, run_id: str, tool_info: ToolInfo, parameters: Dict[str, Any], output_queue: queue.Queue) -> None: """ Initializes the ProcessWorker. Args: run_id: A unique identifier for this specific execution instance. tool_info: The ToolInfo object containing details about the tool to run. parameters: A dictionary of parameter values provided by the user (keys should match ToolParameter names). output_queue: The queue.Queue instance to send messages back to the GUI. """ self.run_id = run_id self.tool_info = tool_info self.parameters = parameters self.output_queue = output_queue self.logger = logging.getLogger(f"{__name__}.{run_id}") # Logger specific to this run self._process: subprocess.Popen | None = None self._stdout_thread: threading.Thread | None = None self._stderr_thread: threading.Thread | None = None self._stop_event = threading.Event() # Used for potential future cancellation signal self.logger.info(f"Worker initialized for tool: {self.tool_info.display_name}") def _build_command_list(self) -> List[str]: """Constructs the full command list including parameters.""" command = list(self.tool_info.command) # Start with the base command self.logger.debug(f"Base command: {command}") self.logger.debug(f"Received parameters: {self.parameters}") # Iterate through the defined parameters for the tool for param_def in self.tool_info.parameters: param_name = param_def.name param_type = param_def.type value = self.parameters.get(param_name, param_def.default) # Get user value or default # Skip if parameter is not required and has no value (None) if not param_def.required and value is None: self.logger.debug(f"Skipping optional parameter '{param_name}' with no value.") continue # Handle required parameter missing a value (should ideally be caught by GUI) if param_def.required and value is None: self.logger.error(f"Required parameter '{param_name}' is missing a value! This might cause the tool to fail.") # Decide strategy: skip, raise error, or let the tool handle it? # Let's log error and continue, tool might handle it or fail. # Alternatively, we could raise an exception here to prevent running. # raise ValueError(f"Required parameter '{param_name}' is missing a value.") continue # Skip adding it to the command line # --- Format parameter for command line (simple --name value convention) --- # Boolean parameters: Add flag only if True if param_type == "boolean": if bool(value): # Check truthiness command.append(f"--{param_name}") self.logger.debug(f"Adding boolean flag: --{param_name}") else: self.logger.debug(f"Skipping boolean flag '{param_name}' as its value is False.") # Other types: Add --name and value elif value is not None: # Ensure we have a value before adding --name and value command.append(f"--{param_name}") command.append(str(value)) # Convert value to string for command line self.logger.debug(f"Adding parameter: --{param_name} {str(value)}") else: # This case should ideally not be reached due to earlier checks self.logger.warning(f"Parameter '{param_name}' has None value unexpectedly. Skipping.") self.logger.info(f"Final command constructed: {command}") return command def _stream_reader(self, stream, stream_type: str) -> None: """Reads lines from a stream and puts them onto the output queue.""" try: # Using readline() is generally safe for text-mode streams for line in iter(stream.readline, ''): if self._stop_event.is_set(): # Check if cancellation was requested self.logger.debug(f"Stop event set, stopping {stream_type} reader.") break if line: # Ensure line is not empty # self.logger.debug(f"Read {stream_type}: {line.strip()}") # Can be noisy self.output_queue.put({ "type": stream_type, "run_id": self.run_id, "data": line # Send the full line including newline }) else: # Empty line might indicate end of stream in some cases, # but iter(stream.readline, '') should handle EOF correctly. pass self.logger.debug(f"{stream_type} stream reading finished.") except ValueError as e: # Can happen if the stream is closed while reading (e.g., process killed) self.logger.warning(f"ValueError reading {stream_type} stream (process likely terminated): {e}") except Exception as e: self.logger.exception(f"Error reading {stream_type} stream.") # Send an error message to the GUI about the reader failure self.output_queue.put({ "type": "stderr", # Report as stderr "run_id": self.run_id, "data": f"*** Error in ProjectUtility reading {stream_type}: {e} ***\n" }) finally: # Ensure the stream is closed if possible (though Popen should handle it) try: stream.close() except Exception: pass # Ignore errors during close def run(self) -> None: """Executes the tool's command and manages output streaming.""" self.logger.info("Worker thread started. Preparing to execute command.") final_exit_code: int | None = None # Use None to indicate potential failure before execution try: command_list = self._build_command_list() # --- Platform specific startup info (hide console window on Windows) --- startupinfo = None if sys.platform == "win32": startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = subprocess.SW_HIDE # Hide console window # CREATE_NO_WINDOW is another option, potentially more robust # creationflags = subprocess.CREATE_NO_WINDOW # --- Execute the command using subprocess.Popen --- self.logger.info(f"Executing: {' '.join(command_list)} in {self.tool_info.working_dir}") self.output_queue.put({ # Inform GUI about the exact command being run "type": "status", "run_id": self.run_id, "data": f"Executing: {' '.join(command_list)}" }) self._process = subprocess.Popen( command_list, cwd=self.tool_info.working_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, # Read streams as text (UTF-8 by default usually) encoding='utf-8', # Be explicit about encoding errors='replace', # Handle potential decoding errors gracefully bufsize=1, # Line-buffered startupinfo=startupinfo # Pass startupinfo for Windows # creationflags=creationflags # Alternative for Windows ) self.logger.info(f"Process started with PID: {self._process.pid}") # --- Start threads to read stdout and stderr --- if self._process.stdout: self._stdout_thread = threading.Thread( target=self._stream_reader, args=(self._process.stdout, "stdout"), daemon=True # Daemon threads won't block program exit ) self._stdout_thread.start() self.logger.debug("Stdout reader thread started.") else: self.logger.warning("Process stdout stream is not available.") if self._process.stderr: self._stderr_thread = threading.Thread( target=self._stream_reader, args=(self._process.stderr, "stderr"), daemon=True ) self._stderr_thread.start() self.logger.debug("Stderr reader thread started.") else: self.logger.warning("Process stderr stream is not available.") # --- Wait for process to complete --- # wait() blocks this worker thread, not the GUI thread final_exit_code = self._process.wait() self.logger.info(f"Process {self._process.pid} finished with exit code: {final_exit_code}") # --- Wait for reader threads to finish --- # They should finish naturally as streams close after process terminates if self._stdout_thread and self._stdout_thread.is_alive(): self.logger.debug("Waiting for stdout reader thread to join...") self._stdout_thread.join(timeout=2.0) # Add a timeout if self._stdout_thread.is_alive(): self.logger.warning("Stdout reader thread did not join within timeout.") if self._stderr_thread and self._stderr_thread.is_alive(): self.logger.debug("Waiting for stderr reader thread to join...") self._stderr_thread.join(timeout=2.0) if self._stderr_thread.is_alive(): self.logger.warning("Stderr reader thread did not join within timeout.") except FileNotFoundError as e: self.logger.error(f"Command execution failed: Command not found. {e}") self.output_queue.put({ "type": "stderr", "run_id": self.run_id, "data": f"ERROR: Command not found. Please ensure '{command_list[0]}' is installed and in the system's PATH or the working directory.\nDetails: {e}\n" }) final_exit_code = -1 # Use a conventional error code except PermissionError as e: self.logger.error(f"Command execution failed: Permission denied. {e}") self.output_queue.put({ "type": "stderr", "run_id": self.run_id, "data": f"ERROR: Permission denied to execute command.\nDetails: {e}\n" }) final_exit_code = -2 except Exception as e: # Catch any other unexpected errors during setup or execution self.logger.exception("An unexpected error occurred during process execution.") self.output_queue.put({ "type": "stderr", "run_id": self.run_id, "data": f"*** ProjectUtility Worker Error: An unexpected error occurred ***\n{e}\n" }) if self._process and self._process.poll() is None: # If process started but an error occurred in the worker, try to terminate it self.logger.warning("Terminating process due to worker error.") self.terminate() # Attempt graceful termination final_exit_code = -3 # Indicate worker error finally: # --- Send final status message --- # Ensure this runs even if errors occurred before/during execution if final_exit_code is None: # This means a critical error happened *before* wait() could be called successfully final_exit_code = -4 # Indicate failure before process completion check self.logger.error("Process did not complete normally or worker failed before wait().") self.output_queue.put({ "type": "finished", "run_id": self.run_id, "exit_code": final_exit_code }) self.logger.info(f"Worker thread finished for run_id: {self.run_id}. Final exit code reported: {final_exit_code}") # Clean up process reference self._process = None def terminate(self) -> None: """Requests termination of the running subprocess.""" if self._process and self._process.poll() is None: # Check if process exists and is running self.logger.warning(f"Attempting to terminate process {self._process.pid} for run {self.run_id}...") self._stop_event.set() # Signal reader threads to stop trying to read try: self._process.terminate() # Sends SIGTERM (Unix) or TerminateProcess (Windows) # Optionally add a timeout and then call self._process.kill() if terminate doesn't work self.logger.info(f"Sent terminate signal to process {self._process.pid}.") # Note: Termination might not be immediate. The 'finished' message # will still be sent when wait() eventually returns (likely with non-zero code). except Exception as e: self.logger.exception(f"Error while trying to terminate process {self._process.pid}: {e}") else: self.logger.info(f"Process for run {self.run_id} is not running or does not exist. Cannot terminate.")