453 lines
18 KiB
Python
453 lines
18 KiB
Python
# projectutility/core/tool_discovery.py
|
|
|
|
import os
|
|
import sys # Import sys
|
|
import json
|
|
import logging
|
|
import shutil
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
# --- Internal Imports ---
|
|
# Import the real data models and the registry loader. When any of them is
# unavailable, log the failure and install minimal stand-ins so this module
# can still be imported; with the stand-ins, load_registry() yields no entries.
try:
    from .models import ToolInfo, ToolParameter
    from .registry_models import ToolRegistryEntry
    from .registry_manager import load_registry
except ImportError as import_error:
    logging.getLogger(__name__).critical(
        f"Failed to import core modules (models/registry). Discovery cannot proceed: {import_error}",
        exc_info=True,
    )

    from collections import namedtuple
    from dataclasses import dataclass

    # Stand-in records exposing the same field names that the rest of this
    # module passes as keyword arguments.
    ToolParameter = namedtuple(
        "ToolParameter",
        (
            "name",
            "label",
            "type",
            "required",
            "default",
            "description",
            "options",
        ),
    )
    ToolInfo = namedtuple(
        "ToolInfo",
        (
            "id",
            "display_name",
            "description",
            "command",
            "working_dir",
            "parameters",
            "version",
            "has_gui",
        ),
    )

    # Field-less placeholder; it only keeps type annotations resolvable.
    @dataclass
    class ToolRegistryEntry:
        pass

    def load_registry():
        # Fallback loader: there is no registry to read, so report no entries.
        return []
|
|
|
|
|
|
# --- Module Constants ---
logger = logging.getLogger(__name__)

# --- Execution Mode ---
# Truthy when the interpreter exposes a ``frozen`` attribute on ``sys``
# (the comments in this module associate that with a PyInstaller bundle).
IS_FROZEN = getattr(sys, "frozen", False)

# --- Base Paths ---
# APP_ROOT_DIR anchors MANAGED_TOOLS_DIR in both modes and TOOLS_DIR in script mode:
#   * script mode -> three directory levels above this file (the repository root)
#   * frozen mode -> the directory that contains the executable
_app_root_dir_calculated: str
if not IS_FROZEN:
    _current_file_path = os.path.abspath(__file__)  # .../core/tool_discovery.py
    _core_dir = os.path.dirname(_current_file_path)  # .../core
    _app_source_root = os.path.dirname(_core_dir)  # .../projectutility (package)
    _app_root_dir_calculated = os.path.dirname(_app_source_root)  # repository root
    logger.debug(
        f"Tool Discovery (SCRIPT): Repository root: {_app_root_dir_calculated}"
    )
else:
    _app_root_dir_calculated = os.path.dirname(sys.executable)
    logger.debug(
        f"Tool Discovery (FROZEN): Executable directory (used as App Root): {_app_root_dir_calculated}"
    )

APP_ROOT_DIR = _app_root_dir_calculated

# Git-managed tools always live next to the application root.
MANAGED_TOOLS_DIR = os.path.join(APP_ROOT_DIR, "managed_tools")

# 'local' tools:
#   * script mode -> <APP_ROOT_DIR>/tools
#   * frozen mode -> <sys._MEIPASS>/tools; the bundle has to ship them there,
#     e.g. PyInstaller: --add-data "path/to/your/tools:tools"
if not IS_FROZEN:
    TOOLS_DIR = os.path.join(APP_ROOT_DIR, "tools")
    logger.debug(f"Tool Discovery (SCRIPT): Local tools directory: {TOOLS_DIR}")
elif hasattr(sys, "_MEIPASS"):
    TOOLS_DIR = os.path.join(sys._MEIPASS, "tools")
    logger.debug(
        f"Tool Discovery (FROZEN): Local tools directory expected in _MEIPASS: {TOOLS_DIR}"
    )
else:
    # Frozen but without _MEIPASS: not expected for a normal frozen app.
    # Fall back to a directory beside the executable and report the problem.
    TOOLS_DIR = os.path.join(APP_ROOT_DIR, "tools_frozen_fallback")
    logger.error(
        "Tool Discovery (FROZEN): sys._MEIPASS not found! Local tools path may be incorrect."
    )

# Summarise the final layout once, in a fixed order.
for _summary_line in (
    f"Tool Discovery: Final APP_ROOT_DIR (Repo Root or Exe Dir): {APP_ROOT_DIR}",
    f"Tool Discovery: Final Local tools directory (TOOLS_DIR): {TOOLS_DIR}",
    f"Tool Discovery: Final Managed tools directory (MANAGED_TOOLS_DIR): {MANAGED_TOOLS_DIR}",
):
    logger.debug(_summary_line)
|
|
|
|
|
|
# --- Helper Functions ---
|
|
|
|
|
|
def _parse_parameters(param_list: List[Dict[str, Any]]) -> List[ToolParameter]:
    """
    Convert raw parameter dictionaries (as read from JSON) into ToolParameter objects.

    Entries that are not dictionaries, that lack one of the mandatory keys
    ("name", "label", "type", "required"), or that fail conversion are logged
    and skipped; the remaining entries keep their original order.

    Args:
        param_list: A list in which each item is a dictionary describing one parameter.

    Returns:
        The ToolParameter objects that could be built. The list is empty when
        `param_list` is not a list or when no entry is usable.
    """
    if not isinstance(param_list, list):
        logger.warning(
            "Invalid 'parameters' format provided to _parse_parameters: "
            f"Expected a list, got {type(param_list)}. Cannot parse parameters."
        )
        return []

    required_keys = {"name", "label", "type", "required"}
    results: List[ToolParameter] = []

    for position, raw in enumerate(param_list):
        # Guard: every entry has to be a JSON object.
        if not isinstance(raw, dict):
            logger.warning(
                f"Skipping invalid parameter entry at index {position}: "
                f"Expected a dictionary, got {type(raw).__name__}."
            )
            continue

        # Guard: all mandatory keys have to be present.
        absent = required_keys - raw.keys()
        if absent:
            logger.warning(
                f"Skipping parameter definition at index {position} due to missing "
                f"required keys: {absent}. Parameter dict: {raw}"
            )
            continue

        try:
            # Mandatory values are coerced; optional ones are passed through
            # unchanged and default to None when absent.
            fields = {
                "name": str(raw["name"]),
                "label": str(raw["label"]),
                "type": str(raw["type"]).lower(),
                "required": bool(raw["required"]),
                "default": raw.get("default"),
                "description": raw.get("description"),
                "options": raw.get("options"),
            }
            parameter = ToolParameter(**fields)
        except (TypeError, KeyError, ValueError) as parse_error:
            logger.warning(
                f"Skipping parameter definition at index {position} due to parsing "
                f"error: {parse_error}. Parameter dict: {raw}"
            )
        except Exception:
            logger.exception(
                f"Unexpected error parsing parameter definition at index {position}: "
                f"{raw}"
            )
        else:
            results.append(parameter)

    return results
|
|
|
|
|
|
def _load_parameters_from_file(param_file_path: str) -> List[ToolParameter]:
    """
    Read a JSON parameter-definition file and parse its contents.

    The file must hold a JSON object whose "parameters" key maps to a list of
    parameter definition dictionaries; that list is handed to
    `_parse_parameters`.

    Args:
        param_file_path: Path of the JSON file holding the parameter definitions.
            The caller is responsible for resolving it correctly for the
            current execution mode (script or frozen).

    Returns:
        The parsed ToolParameter objects. An empty list is returned when the
        file does not exist, cannot be read or decoded, or does not have the
        expected structure; every such case is logged.
    """
    if not os.path.isfile(param_file_path):
        logger.warning(f"Parameter definition file not found: {param_file_path}")
        return []

    try:
        with open(param_file_path, "r", encoding="utf-8") as stream:
            payload = json.load(stream)

        # Validate the document shape; the first problem found wins.
        format_problem: Optional[str] = None
        if not isinstance(payload, dict):
            root_type = type(payload).__name__
            format_problem = (
                "Expected a JSON object (dictionary) at the root, "
                f"found {root_type}."
            )
        elif "parameters" not in payload:
            format_problem = "Missing required 'parameters' key in the root object."
        elif not isinstance(payload["parameters"], list):
            value_type = type(payload["parameters"]).__name__
            format_problem = (
                "The 'parameters' key should map to a JSON list (array), "
                f"found {value_type}."
            )

        if format_problem is not None:
            logger.warning(
                f"Invalid format in parameter file {param_file_path}: "
                + format_problem
            )
            return []

        logger.info(
            f"Successfully loaded parameter definitions from: {param_file_path}"
        )
        return _parse_parameters(payload["parameters"])

    except json.JSONDecodeError as decode_error:
        logger.error(
            f"Failed to parse JSON in parameter file {param_file_path}: {decode_error}",
            exc_info=True,
        )
        return []
    except OSError as read_error:
        logger.error(
            f"Failed to read parameter file {param_file_path}: {read_error}",
            exc_info=True,
        )
        return []
    except Exception as unexpected:
        logger.exception(
            "An unexpected error occurred while loading or parsing "
            f"parameter file {param_file_path}: {unexpected}"
        )
        return []
|
|
|
|
|
|
# --- Main Discovery Function ---
# Path resolution inside this function relies on the module-level TOOLS_DIR,
# MANAGED_TOOLS_DIR and APP_ROOT_DIR, which already account for frozen mode.
|
|
|
|
|
|
def _resolve_local_command(tool_id: str, command: List[str]) -> Optional[str]:
    """
    Resolve the command of a 'local' tool and determine its working directory.

    Resolution order:
      1. No (non-empty) second element and ``command[0]`` is found on PATH:
         the command is left untouched and APP_ROOT_DIR is the working directory.
      2. A non-empty second element: it is treated as a script path relative
         to TOOLS_DIR.
      3. Otherwise ``command[0]`` itself is treated as a script path relative
         to TOOLS_DIR.
    In cases 2 and 3 the matching element of `command` is replaced in place by
    the absolute script path, and the script's directory becomes the working
    directory.

    Args:
        tool_id: Tool identifier, used only in log messages.
        command: Non-empty command list; may be modified in place.

    Returns:
        The absolute working directory, or None when the tool is unavailable
        (the script file does not exist).
    """
    executable_or_script = command[0]
    script_argument = command[1] if len(command) > 1 else None

    if not script_argument and shutil.which(executable_or_script):
        working_dir = APP_ROOT_DIR  # exe dir (frozen) or repo root (script)
        logger.debug(
            f"Local tool '{tool_id}': Command '{executable_or_script}' found in PATH. "
            f"Set working directory to App Root: '{working_dir}'"
        )
        return working_dir

    # From here on a script file below TOOLS_DIR is expected; which element
    # of the command names it depends on whether a script argument was given.
    if script_argument:
        script_index, relative_path = 1, script_argument
    else:
        script_index, relative_path = 0, executable_or_script

    script_path_abs = os.path.abspath(os.path.join(TOOLS_DIR, relative_path))

    if not os.path.isfile(script_path_abs):
        if script_argument:
            logger.warning(
                f"Local tool '{tool_id}': Script argument '{script_argument}' "
                f"from run_command[1] not found at resolved path: "
                f"'{script_path_abs}'. Tool marked as unavailable."
            )
        else:
            logger.warning(
                f"Local tool '{tool_id}': Command '{executable_or_script}' "
                f"not in PATH and not found as relative script in TOOLS_DIR ('{TOOLS_DIR}'). "
                f"Tool marked as unavailable."
            )
        return None

    working_dir = os.path.dirname(script_path_abs)
    command[script_index] = script_path_abs

    if script_argument:
        found_what = (
            f"Local tool '{tool_id}': Found script argument '{script_argument}' "
            f"relative to TOOLS_DIR ('{TOOLS_DIR}').\n"
        )
    else:
        found_what = (
            f"Local tool '{tool_id}': Found command '{executable_or_script}' "
            f"as a script relative to TOOLS_DIR ('{TOOLS_DIR}').\n"
        )
    logger.debug(
        found_what
        + f" Absolute Script Path: {script_path_abs}\n"
        + f" Working Directory: {working_dir}\n"
        + f" Updated Command: {command}"
    )
    return working_dir


def discover_tools() -> Dict[str, ToolInfo]:
    """
    Discover the available tools listed in the central tools registry.

    Each enabled registry entry is resolved according to its type:

    * 'local': the command is resolved via PATH or as a script below
      TOOLS_DIR (see `_resolve_local_command`); inline `parameters` are
      parsed when they are a list.
    * 'git': the tool is available when its directory exists below
      MANAGED_TOOLS_DIR; parameters are loaded from the entry's
      `parameters_definition_file`, relative to that directory, if set.

    Entries that are disabled, of an unknown type, or unavailable are logged
    and left out. The enabled check runs before the entry's `run_command` is
    touched, and a missing (`None`) `run_command` is treated as an empty
    command, so such entries cannot abort discovery of the remaining tools.

    `TOOLS_DIR`, `MANAGED_TOOLS_DIR` and `APP_ROOT_DIR` are module-level
    constants that already account for the execution mode (script vs. frozen).

    Returns:
        A dictionary mapping tool IDs (str) to the ToolInfo objects of all
        enabled and available tools. If several entries share an ID, the
        last one wins.
    """
    discovered_tools: Dict[str, ToolInfo] = {}
    logger.info("Starting tool discovery process based on the registry...")

    registry: List[ToolRegistryEntry] = load_registry()
    if not registry:
        logger.warning("Tool registry is empty or failed to load. No tools discovered.")
        return discovered_tools

    for entry in registry:
        tool_id = entry.id

        # Check this first so that a disabled entry is never inspected further.
        if not entry.enabled:
            logger.debug(f"Skipping tool '{tool_id}': Marked as disabled in registry.")
            continue

        logger.debug(
            f"Processing registry entry: '{entry.display_name}' (ID: {tool_id}, Type: {entry.type})"
        )

        tool_params: List[ToolParameter] = []
        working_dir_abs: Optional[str] = None
        is_available: bool = False
        # Work on a copy: local resolution rewrites elements of the command.
        final_command: List[str] = list(entry.run_command or ())

        if entry.type == "local":
            if not final_command:
                logger.warning(
                    f"Local tool '{tool_id}': 'run_command' is empty. Skipping."
                )
                continue

            working_dir_abs = _resolve_local_command(tool_id, final_command)
            is_available = working_dir_abs is not None

            if is_available and entry.parameters is not None:
                if isinstance(entry.parameters, list):
                    logger.debug(
                        f"Local tool '{tool_id}': Found inline parameter definitions."
                    )
                    tool_params = _parse_parameters(entry.parameters)
                else:
                    logger.warning(
                        f"Local tool '{tool_id}': 'parameters' field is defined but not a list. Ignoring."
                    )

        elif entry.type == "git":
            # The checkout lives in MANAGED_TOOLS_DIR/<local_dir_name or tool id>.
            local_dir = entry.local_dir_name if entry.local_dir_name else tool_id
            local_repo_path_abs = os.path.abspath(
                os.path.join(MANAGED_TOOLS_DIR, local_dir)
            )
            working_dir_abs = local_repo_path_abs

            if os.path.isdir(local_repo_path_abs):
                is_available = True
                logger.debug(
                    f"Git tool '{tool_id}': Repository directory exists at: {local_repo_path_abs}. "
                    f"Working directory set."
                )
                if entry.parameters_definition_file:
                    # The parameter file path is relative to the repository root.
                    param_file_abs_path = os.path.join(
                        local_repo_path_abs, entry.parameters_definition_file
                    )
                    logger.debug(
                        f"Git tool '{tool_id}': Attempting to load parameters from file: {param_file_abs_path}"
                    )
                    tool_params = _load_parameters_from_file(param_file_abs_path)
                else:
                    logger.debug(
                        f"Git tool '{tool_id}': No parameter definition file specified."
                    )
            else:
                logger.warning(
                    f"Git tool '{tool_id}': Expected repository directory not found at "
                    f"'{local_repo_path_abs}'. Tool needs cloning/update. Marked as unavailable."
                )

        else:
            logger.error(
                f"Skipping tool '{tool_id}': Unknown tool type '{entry.type}'. Supported: 'local', 'git'."
            )
            continue

        if not is_available or working_dir_abs is None:
            logger.debug(
                f"Tool '{tool_id}' processed but marked unavailable "
                f"(is_available={is_available}, working_dir_abs='{working_dir_abs}'). "
                f"Not added to discovered tools."
            )
            continue

        # Sanity check: both branches above build absolute paths.
        if not os.path.isabs(working_dir_abs):
            logger.error(
                f"Internal Error: Working directory '{working_dir_abs}' for tool '{tool_id}' is not absolute. Skipping."
            )
            continue

        try:
            tool_info = ToolInfo(
                id=tool_id,
                display_name=entry.display_name,
                description=entry.description or "",
                command=final_command,
                working_dir=working_dir_abs,
                parameters=tool_params,
                version=entry.version,
                has_gui=entry.has_gui,
            )
            discovered_tools[tool_id] = tool_info
            logger.info(
                f"Successfully discovered and processed tool: '{tool_info.display_name}' "
                f"(ID: {tool_id}, Type: {entry.type})"
            )
        except (ValueError, TypeError) as info_err:
            logger.error(
                f"Failed to create ToolInfo for '{tool_id}': {info_err}. Skipping.",
                exc_info=True,
            )
        except Exception as e:
            logger.exception(
                f"Unexpected error creating ToolInfo for '{tool_id}': {e}. Skipping."
            )

    logger.info(
        f"Tool discovery finished. Found {len(discovered_tools)} enabled/available tools."
    )
    return discovered_tools
|