SXXXXXXX_ProjectUtility/projectutility/core/tool_discovery.py
2025-05-05 14:38:19 +02:00

457 lines
22 KiB
Python

# projectutility/core/tool_discovery.py
import os
import json
import logging
import shutil
from typing import Dict, List, Any, Optional
# --- Internal Imports ---
# Use relative imports for modules within the same package ('core')
try:
from .models import ToolInfo, ToolParameter
from .registry_models import ToolRegistryEntry
from .registry_manager import load_registry # We need this to get the raw registry data
except ImportError as e:
# Log critical error if core components cannot be imported
logging.getLogger(__name__).critical(
f"Failed to import core modules (models/registry). Discovery cannot proceed: {e}",
exc_info=True
)
# Define dummies to prevent NameErrors if possible, though discovery will fail
from collections import namedtuple
ToolParameter = namedtuple("ToolParameter", ["name", "label", "type", "required", "default", "description", "options"])
ToolInfo = namedtuple("ToolInfo", ["id", "display_name", "description", "command", "working_dir", "parameters", "version", "has_gui"])
from dataclasses import dataclass
@dataclass
class ToolRegistryEntry: pass
def load_registry(): return []
# --- Module Constants ---
logger = logging.getLogger(__name__)

# Derive the repository-root-relative tool directories from this file's own
# location on disk, so discovery works regardless of the current working dir.
try:
    # This file lives at: .../ProjectUtility/projectutility/core/tool_discovery.py
    _current_file_path = os.path.abspath(__file__)
    # 'core' package directory: .../ProjectUtility/projectutility/core
    _core_dir = os.path.dirname(_current_file_path)
    # 'projectutility' source root: .../ProjectUtility/projectutility
    _app_source_root = os.path.dirname(_core_dir)
    # Repository root: .../ProjectUtility/
    APP_ROOT_DIR = os.path.dirname(_app_source_root)

    # 'local' type tools live under <repo>/tools, 'git' type tools are
    # cloned under <repo>/managed_tools.
    TOOLS_DIR = os.path.join(APP_ROOT_DIR, "tools")
    MANAGED_TOOLS_DIR = os.path.join(APP_ROOT_DIR, "managed_tools")

    logger.debug(f"Tool Discovery: Calculated APP_ROOT_DIR (Repo Root): {APP_ROOT_DIR}")
    logger.debug(f"Tool Discovery: Local tools directory (TOOLS_DIR): {TOOLS_DIR}")
    logger.debug(f"Tool Discovery: Managed tools directory (MANAGED_TOOLS_DIR): {MANAGED_TOOLS_DIR}")
except Exception as e:
    logger.critical(
        f"Failed to calculate base paths in tool_discovery: {e}. Tool paths will be incorrect.",
        exc_info=True
    )
    # Best-effort fallback rooted at the current working directory; relative
    # tool paths will most likely fail to resolve from here.
    APP_ROOT_DIR = os.getcwd()
    TOOLS_DIR = os.path.join(APP_ROOT_DIR, "tools")
    MANAGED_TOOLS_DIR = os.path.join(APP_ROOT_DIR, "managed_tools")
    logger.warning(f"Using fallback paths: Repo Root='{APP_ROOT_DIR}', Tools='{TOOLS_DIR}', Managed='{MANAGED_TOOLS_DIR}'")
# --- Helper Functions ---
def _parse_parameters(param_list: List[Dict[str, Any]]) -> List[ToolParameter]:
    """
    Parses a list of parameter dictionaries (from JSON) into ToolParameter objects.

    Args:
        param_list: A list where each item is a dictionary defining a parameter.

    Returns:
        A list of validated ToolParameter objects. Returns an empty list if
        input is invalid; individual malformed entries are skipped with a
        warning rather than aborting the whole parse.
    """
    parsed_params: List[ToolParameter] = []
    if not isinstance(param_list, list):
        logger.warning(
            f"Invalid 'parameters' format provided to _parse_parameters: "
            f"Expected a list, got {type(param_list)}. Cannot parse parameters."
        )
        return []  # Return empty list if the input structure is wrong

    required_keys = {"name", "label", "type", "required"}
    for index, param_dict in enumerate(param_list):
        if not isinstance(param_dict, dict):
            logger.warning(
                f"Skipping invalid parameter entry at index {index}: "
                f"Expected a dictionary, got {type(param_dict).__name__}."
            )
            continue
        # Check for required keys within the parameter dictionary
        missing_keys = required_keys - param_dict.keys()
        if missing_keys:
            logger.warning(
                f"Skipping parameter definition at index {index} due to missing "
                f"required keys: {missing_keys}. Parameter dict: {param_dict}"
            )
            continue

        # Sanity-check the optional 'options' field: consumers expect a
        # mapping (e.g. {'filter': [...]} for file parameters). Warn but do
        # not reject, to stay lenient with existing definitions.
        options = param_dict.get("options")
        if options is not None and not isinstance(options, dict):
            logger.warning(
                f"Parameter definition at index {index}: 'options' field "
                f"should be a dictionary, got {type(options).__name__}."
            )

        try:
            # Create the ToolParameter object, performing basic type conversions
            parameter = ToolParameter(
                name=str(param_dict["name"]),
                label=str(param_dict["label"]),
                type=str(param_dict["type"]).lower(),  # Ensure type is lowercase
                required=bool(param_dict["required"]),
                default=param_dict.get("default"),  # Optional key 'default'
                description=param_dict.get("description"),  # Optional key 'description'
                options=options  # Optional key 'options' (validated above)
            )
            parsed_params.append(parameter)
            logger.debug(f"Successfully parsed parameter: {parameter.name}")
        except (TypeError, KeyError, ValueError) as e:
            # Catch errors during creation (e.g., bool conversion fails)
            logger.warning(
                f"Skipping parameter definition at index {index} due to parsing "
                f"error: {e}. Parameter dict: {param_dict}"
            )
        except Exception:
            # Catch any other unexpected errors; logger.exception records the traceback
            logger.exception(
                f"Unexpected error parsing parameter definition at index {index}: "
                f"{param_dict}"
            )
    return parsed_params
def _load_parameters_from_file(param_file_path: str) -> List[ToolParameter]:
    """
    Loads and parses tool parameter definitions from a specified JSON file.

    The file must contain a JSON object whose "parameters" key maps to a
    list of parameter definition dictionaries.

    Args:
        param_file_path: The absolute path to the JSON file containing parameter definitions.

    Returns:
        A list of parsed ToolParameter objects; an empty list when the file
        is missing, unreadable, unparsable, or structurally invalid.
    """
    if not os.path.isfile(param_file_path):
        logger.warning(f"Parameter definition file not found: {param_file_path}")
        return []
    try:
        with open(param_file_path, "r", encoding="utf-8") as fh:
            payload = json.load(fh)

        # Structural validation: root must be an object containing a
        # "parameters" key that holds a list.
        if not isinstance(payload, dict):
            logger.warning(
                f"Invalid format in parameter file {param_file_path}: "
                f"Expected a JSON object (dictionary) at the root, "
                f"found {type(payload).__name__}."
            )
            return []
        if "parameters" not in payload:
            logger.warning(
                f"Invalid format in parameter file {param_file_path}: "
                f"Missing required 'parameters' key in the root object."
            )
            return []
        raw_definitions = payload["parameters"]
        if not isinstance(raw_definitions, list):
            logger.warning(
                f"Invalid format in parameter file {param_file_path}: "
                f"The 'parameters' key should map to a JSON list (array), "
                f"found {type(raw_definitions).__name__}."
            )
            return []

        # Hand the raw definition list to the shared parameter parser.
        logger.info(f"Successfully loaded parameter definitions from: {param_file_path}")
        return _parse_parameters(raw_definitions)
    except json.JSONDecodeError as e:
        logger.error(
            f"Failed to parse JSON in parameter file {param_file_path}: {e}",
            exc_info=True
        )
        return []
    except IOError as e:
        logger.error(
            f"Failed to read parameter file {param_file_path}: {e}",
            exc_info=True
        )
        return []
    except Exception as e:
        # Any other unexpected failure during load/parse
        logger.exception(
            f"An unexpected error occurred while loading or parsing "
            f"parameter file {param_file_path}: {e}"
        )
        return []
# --- Main Discovery Function ---
def _resolve_local_tool(tool_id: str, final_command: List[str]):
    """
    Resolve availability and working directory for a 'local' registry entry.

    Resolution order:
      1. Bare command found on the system PATH (only when no script argument
         is given, so 'python' is not matched when a script follows it).
      2. Second command element as a script path relative to TOOLS_DIR.
      3. First command element itself as a script path relative to TOOLS_DIR.

    When a relative script is found, the corresponding element of
    ``final_command`` is rewritten in place to its absolute path.

    Args:
        tool_id: Registry id of the tool (used for logging only).
        final_command: Non-empty mutable copy of the registry 'run_command'.

    Returns:
        Tuple ``(is_available, working_dir_abs)``; ``working_dir_abs`` is
        None when the tool could not be located.
    """
    executable_or_script = final_command[0]
    # Potential script might be the second argument (e.g., "python script.py")
    script_argument = final_command[1] if len(final_command) > 1 else None

    # Case 1: command itself is resolvable via PATH (e.g. ["notepad.exe"]).
    if not script_argument and shutil.which(executable_or_script):
        # For PATH commands the working directory rarely matters; default to
        # the repository root.
        logger.debug(
            f"Local tool '{tool_id}': Command '{executable_or_script}' found in PATH. "
            f"Set working directory to repo root: '{APP_ROOT_DIR}'"
        )
        return True, APP_ROOT_DIR

    # Case 2: a script path is supplied as the second argument, relative to
    # TOOLS_DIR (e.g. ["python", "my_tool/main.py"]).
    if script_argument:
        script_path_abs = os.path.abspath(os.path.join(TOOLS_DIR, script_argument))
        if os.path.isfile(script_path_abs):
            final_command[1] = script_path_abs
            working_dir_abs = os.path.dirname(script_path_abs)
            logger.debug(
                f"Local tool '{tool_id}': Found script argument '{script_argument}' "
                f"relative to TOOLS_DIR ('{TOOLS_DIR}').\n"
                f" Absolute Script Path: {script_path_abs}\n"
                f" Working Directory: {working_dir_abs}\n"
                f" Updated Command: {final_command}"
            )
            return True, working_dir_abs
        logger.warning(
            f"Local tool '{tool_id}': Script argument '{script_argument}' "
            f"specified in run_command[1] was not found at the resolved path: "
            f"'{script_path_abs}'. Tool marked as unavailable."
        )
        return False, None

    # Case 3: the command itself is a script relative to TOOLS_DIR
    # (e.g. ["scripts/run_me.bat"]).
    script_path_abs = os.path.abspath(os.path.join(TOOLS_DIR, executable_or_script))
    if os.path.isfile(script_path_abs):
        final_command[0] = script_path_abs
        working_dir_abs = os.path.dirname(script_path_abs)
        logger.debug(
            f"Local tool '{tool_id}': Found command '{executable_or_script}' "
            f"as a script relative to TOOLS_DIR ('{TOOLS_DIR}').\n"
            f" Absolute Script Path: {script_path_abs}\n"
            f" Working Directory: {working_dir_abs}\n"
            f" Updated Command: {final_command}"
        )
        return True, working_dir_abs

    # Not in PATH (Case 1) and not a relative script (Case 3).
    logger.warning(
        f"Local tool '{tool_id}': Command '{executable_or_script}' "
        f"was not found in PATH and not found as a relative script "
        f"within TOOLS_DIR ('{TOOLS_DIR}'). Tool marked as unavailable."
    )
    return False, None


def _resolve_git_tool(entry: ToolRegistryEntry, tool_id: str):
    """
    Resolve availability, working directory and parameters for a 'git' tool.

    A git tool is available when its cloned repository directory exists under
    MANAGED_TOOLS_DIR; its working directory is always the repository root.

    Returns:
        Tuple ``(is_available, working_dir_abs, tool_params)``. The working
        directory (expected repo path) is returned even when unavailable,
        for diagnostic logging by the caller.
    """
    local_dir = entry.local_dir_name if entry.local_dir_name else tool_id
    local_repo_path_abs = os.path.abspath(os.path.join(MANAGED_TOOLS_DIR, local_dir))

    if not os.path.isdir(local_repo_path_abs):
        # Not cloned yet: unavailable until cloned/updated; do not attempt
        # to load parameters.
        logger.warning(
            f"Git tool '{tool_id}': Expected repository directory not found at "
            f"'{local_repo_path_abs}'. The tool needs to be cloned or updated. "
            f"Marked as unavailable for execution."
        )
        return False, local_repo_path_abs, []

    logger.debug(
        f"Git tool '{tool_id}': Repository directory exists at: {local_repo_path_abs}. "
        f"Working directory set."
    )
    # Parameters for git tools come from a definition file inside the repo,
    # if the registry entry names one.
    if entry.parameters_definition_file:
        param_file_abs_path = os.path.join(local_repo_path_abs, entry.parameters_definition_file)
        logger.debug(f"Git tool '{tool_id}': Attempting to load parameters from file: {param_file_abs_path}")
        return True, local_repo_path_abs, _load_parameters_from_file(param_file_abs_path)

    logger.debug(f"Git tool '{tool_id}': No parameter definition file specified.")
    return True, local_repo_path_abs, []


def _build_tool_info(entry: ToolRegistryEntry, tool_id: str, final_command: List[str],
                     working_dir_abs: str, tool_params: List[ToolParameter]) -> Optional[ToolInfo]:
    """
    Instantiate a ToolInfo from a fully resolved registry entry.

    Returns:
        The ToolInfo on success, or None when construction/validation fails
        (the failure is logged).
    """
    try:
        logger.debug(f"Creating ToolInfo for '{tool_id}' with:")
        logger.debug(f" - display_name: {entry.display_name}")
        logger.debug(f" - description: {entry.description or ''}")
        logger.debug(f" - command: {final_command}")
        logger.debug(f" - working_dir: {working_dir_abs}")
        logger.debug(f" - parameters count: {len(tool_params)}")
        logger.debug(f" - version: {entry.version}")
        logger.debug(f" - has_gui: {entry.has_gui}")
        return ToolInfo(
            id=tool_id,
            display_name=entry.display_name,
            description=entry.description or "",  # Use empty string if None
            command=final_command,                # Possibly rewritten to absolute paths
            working_dir=working_dir_abs,          # Calculated absolute working directory
            parameters=tool_params,               # Loaded/parsed parameters
            version=entry.version,
            has_gui=entry.has_gui,
        )
    except (ValueError, TypeError) as info_err:
        # Errors raised by ToolInfo validation (e.g., from __post_init__)
        logger.error(
            f"Failed to create ToolInfo object for '{tool_id}' due to validation "
            f"error: {info_err}. Skipping tool.",
            exc_info=True
        )
        return None
    except Exception as e:
        logger.exception(
            f"An unexpected error occurred while creating ToolInfo for "
            f"'{tool_id}': {e}. Skipping tool."
        )
        return None


def discover_tools() -> Dict[str, ToolInfo]:
    """
    Discovers available tools based on the central tools registry.

    Reads the tool registry, processes each enabled entry, resolves paths
    and working directories for 'local' and 'git' tools, loads parameters,
    and constructs ToolInfo objects for available tools.

    Returns:
        A dictionary mapping tool IDs (str) to their corresponding
        ToolInfo objects for all enabled and available tools.
    """
    discovered_tools: Dict[str, ToolInfo] = {}
    logger.info("Starting tool discovery process based on the registry...")

    registry: List[ToolRegistryEntry] = load_registry()
    if not registry:
        logger.warning("Tool registry is empty or failed to load. No tools discovered.")
        return discovered_tools

    for entry in registry:
        tool_id = entry.id

        # Skip entries explicitly disabled in the registry.
        if not entry.enabled:
            logger.debug(f"Skipping tool '{tool_id}': Marked as disabled in registry.")
            continue
        logger.debug(f"Processing registry entry: '{entry.display_name}' (ID: {tool_id}, Type: {entry.type})")

        # Copy the registry command; resolution may rewrite elements in place.
        final_command: List[str] = list(entry.run_command)
        tool_params: List[ToolParameter] = []

        if entry.type == "local":
            if not final_command:  # Should be caught by registry validation, but double-check
                logger.warning(f"Local tool '{tool_id}': 'run_command' is empty. Skipping.")
                continue
            is_available, working_dir_abs = _resolve_local_tool(tool_id, final_command)
            # Local tools may define parameters inline in the registry entry.
            if is_available and entry.parameters is not None:
                if isinstance(entry.parameters, list):
                    logger.debug(f"Local tool '{tool_id}': Found inline parameter definitions.")
                    tool_params = _parse_parameters(entry.parameters)
                else:
                    logger.warning(f"Local tool '{tool_id}': 'parameters' field is defined but is not a list. Ignoring.")
        elif entry.type == "git":
            is_available, working_dir_abs, tool_params = _resolve_git_tool(entry, tool_id)
        else:
            logger.error(
                f"Skipping tool '{tool_id}': Encountered unknown tool type '{entry.type}' "
                f"in the registry. Supported types are 'local' and 'git'."
            )
            continue

        # Only tools that are available AND have a valid absolute working
        # directory are surfaced to callers.
        if is_available and working_dir_abs is not None:
            if not os.path.isabs(working_dir_abs):
                logger.error(f"Internal Error: Calculated working directory '{working_dir_abs}' for tool '{tool_id}' is not absolute. Skipping tool.")
                continue
            tool_info = _build_tool_info(entry, tool_id, final_command, working_dir_abs, tool_params)
            if tool_info is not None:
                discovered_tools[tool_id] = tool_info
                logger.info(
                    f"Successfully discovered and processed tool: '{tool_info.display_name}' "
                    f"(ID: {tool_id}, Type: {entry.type})"
                )
        else:
            logger.debug(
                f"Tool '{tool_id}' was processed but marked as unavailable "
                f"(is_available={is_available}, working_dir_abs='{working_dir_abs}'). "
                f"It will not be added to the list of discovered tools."
            )

    logger.info(
        f"Tool discovery process finished. Found {len(discovered_tools)} "
        f"enabled and available tools."
    )
    return discovered_tools