336 lines
14 KiB
Python
336 lines
14 KiB
Python
# projectutility/core/registry_manager.py
|
|
|
|
import json
|
|
import os
|
|
import logging
|
|
import dataclasses
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
# --- Internal Imports ---
|
|
# Use relative import for modules within the same package ('core')
|
|
try:
|
|
from .registry_models import ToolRegistryEntry
|
|
except ImportError:
|
|
# This path might occur if the module is run directly or packaging is wrong.
|
|
# Add a fallback or clearer error for robustness if needed.
|
|
logging.getLogger(__name__).critical(
|
|
"Failed to import .registry_models. Registry operations will fail.",
|
|
exc_info=True
|
|
)
|
|
# Define a dummy dataclass to prevent NameErrors later if import fails
|
|
@dataclasses.dataclass
|
|
class ToolRegistryEntry:
|
|
id: str = "dummy_import_failed"
|
|
display_name: str = "Dummy"
|
|
type: str = "local"
|
|
run_command: List[str] = dataclasses.field(default_factory=list)
|
|
has_gui: bool = False
|
|
|
|
# --- Module Constants ---
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Calculate paths relative to this file's location
|
|
try:
|
|
# Path to this file: .../ProjectUtility/projectutility/core/registry_manager.py
|
|
_current_file_path = os.path.abspath(__file__)
|
|
# Path to the 'core' directory: .../ProjectUtility/projectutility/core
|
|
_core_dir = os.path.dirname(_current_file_path)
|
|
# Path to the 'projectutility' package source root: .../ProjectUtility/projectutility
|
|
APP_SOURCE_ROOT = os.path.dirname(_core_dir)
|
|
# Path to the 'config' directory inside the package source root
|
|
CONFIG_DIR = os.path.join(APP_SOURCE_ROOT, "config")
|
|
# Full path to the registry file
|
|
REGISTRY_FILENAME = os.path.join(CONFIG_DIR, "tools_registry.json")
|
|
|
|
logger.debug(f"Registry Manager: Calculated APP_SOURCE_ROOT: {APP_SOURCE_ROOT}")
|
|
logger.debug(f"Registry Manager: Calculated CONFIG_DIR: {CONFIG_DIR}")
|
|
logger.debug(f"Registry Manager: Registry Filename: {REGISTRY_FILENAME}")
|
|
|
|
except Exception as e:
|
|
logger.critical(
|
|
f"Failed to calculate configuration paths in registry_manager: {e}",
|
|
exc_info=True
|
|
)
|
|
# Fallback paths - Application might not work correctly
|
|
CONFIG_DIR = "config"
|
|
REGISTRY_FILENAME = os.path.join(CONFIG_DIR, "tools_registry.json")
|
|
logger.warning(f"Using fallback config path: {CONFIG_DIR}")
|
|
|
|
|
|
# --- Core Functions ---
|
|
|
|
def load_registry() -> List[ToolRegistryEntry]:
|
|
"""
|
|
Loads the tool registry from the JSON file defined by REGISTRY_FILENAME.
|
|
|
|
Reads the file, parses the JSON, validates each entry against the
|
|
ToolRegistryEntry model, and returns a list of valid tool entries.
|
|
|
|
Returns:
|
|
A list of validated ToolRegistryEntry objects. Returns an empty
|
|
list if the file is not found, cannot be parsed, or contains
|
|
no valid entries.
|
|
"""
|
|
if not os.path.isfile(REGISTRY_FILENAME):
|
|
logger.warning(
|
|
f"Registry file not found at calculated path: {REGISTRY_FILENAME}. "
|
|
f"No tools will be loaded from the registry."
|
|
)
|
|
return []
|
|
|
|
raw_data: Any = None
|
|
try:
|
|
with open(REGISTRY_FILENAME, "r", encoding="utf-8") as file_handle:
|
|
raw_data = json.load(file_handle)
|
|
logger.info(f"Successfully read registry file: {REGISTRY_FILENAME}")
|
|
|
|
except json.JSONDecodeError as json_err:
|
|
logger.error(
|
|
f"Failed to parse JSON in registry file {REGISTRY_FILENAME}. "
|
|
f"Error: {json_err}. Please check the file syntax.",
|
|
exc_info=True # Include traceback for JSON errors
|
|
)
|
|
return []
|
|
except IOError as io_err:
|
|
logger.error(
|
|
f"Failed to read registry file {REGISTRY_FILENAME}. "
|
|
f"Error: {io_err}",
|
|
exc_info=True
|
|
)
|
|
return []
|
|
except Exception as e:
|
|
# Catch any other unexpected errors during file load
|
|
logger.exception(
|
|
f"An unexpected error occurred while loading registry file "
|
|
f"{REGISTRY_FILENAME}: {e}"
|
|
)
|
|
return []
|
|
|
|
# Validate the overall structure (should be a list)
|
|
if not isinstance(raw_data, list):
|
|
logger.error(
|
|
f"Invalid format in registry file {REGISTRY_FILENAME}: "
|
|
f"Expected a JSON list (array) at the root, but found type "
|
|
f"'{type(raw_data).__name__}'. Cannot load tools."
|
|
)
|
|
return []
|
|
|
|
# --- Validate Individual Entries ---
|
|
valid_tools: List[ToolRegistryEntry] = []
|
|
# Get expected fields from the dataclass model for validation
|
|
model_fields = {f.name for f in dataclasses.fields(ToolRegistryEntry)}
|
|
required_keys = {
|
|
f.name for f in dataclasses.fields(ToolRegistryEntry)
|
|
if isinstance(f.default, dataclasses._MISSING_TYPE) and \
|
|
isinstance(f.default_factory, dataclasses._MISSING_TYPE)
|
|
}
|
|
# Manually add required keys if they have defaults but are conceptually required
|
|
# (e.g., run_command, has_gui might have defaults but are essential)
|
|
required_keys.update({"id", "display_name", "type", "run_command", "has_gui"})
|
|
logger.debug(f"Required keys for registry entry: {required_keys}")
|
|
|
|
for index, entry_dict in enumerate(raw_data):
|
|
entry_id = entry_dict.get('id', f'Unknown_at_index_{index}') # Get ID for logging
|
|
|
|
if not isinstance(entry_dict, dict):
|
|
logger.warning(
|
|
f"Registry entry at index {index} is not a dictionary (found "
|
|
f"'{type(entry_dict).__name__}'). Skipping."
|
|
)
|
|
continue
|
|
|
|
# Check for required keys first
|
|
missing_keys = required_keys - entry_dict.keys()
|
|
if missing_keys:
|
|
logger.warning(
|
|
f"Registry entry '{entry_id}' (index {index}) is missing "
|
|
f"required keys: {missing_keys}. Skipping."
|
|
)
|
|
continue
|
|
|
|
# --- Basic Type Validation (add more as needed) ---
|
|
validation_errors = []
|
|
if not isinstance(entry_dict["id"], str) or not entry_dict["id"]:
|
|
validation_errors.append("Invalid or empty 'id' (must be a non-empty string).")
|
|
if not isinstance(entry_dict["display_name"], str) or not entry_dict["display_name"]:
|
|
validation_errors.append("Invalid or empty 'display_name' (must be a non-empty string).")
|
|
if not isinstance(entry_dict["type"], str) or entry_dict["type"] not in ["git", "local"]:
|
|
validation_errors.append(f"Invalid 'type': '{entry_dict['type']}'. Must be 'git' or 'local'.")
|
|
if (not isinstance(entry_dict["run_command"], list) or
|
|
not entry_dict["run_command"] or
|
|
not all(isinstance(cmd_part, str) for cmd_part in entry_dict["run_command"])):
|
|
validation_errors.append("Invalid 'run_command' (must be a non-empty list of strings).")
|
|
if not isinstance(entry_dict.get("has_gui"), bool):
|
|
logger.warning(f"Registry entry '{entry_id}': Missing or invalid 'has_gui'. Assuming false.")
|
|
entry_dict["has_gui"] = False # Provide default if missing/invalid
|
|
|
|
# Git-specific validation
|
|
if entry_dict["type"] == "git":
|
|
if not isinstance(entry_dict.get("git_url"), str) or not entry_dict.get("git_url"):
|
|
validation_errors.append("Missing or invalid 'git_url' (required for type 'git').")
|
|
# git_ref defaults in model, no need to validate presence here unless required non-default
|
|
|
|
if validation_errors:
|
|
logger.warning(
|
|
f"Registry entry '{entry_id}' (index {index}) failed validation: "
|
|
f"{'; '.join(validation_errors)}. Skipping."
|
|
)
|
|
continue
|
|
|
|
# --- Attempt to create the dataclass instance ---
|
|
try:
|
|
# Filter the dictionary to only include keys defined in the dataclass model
|
|
# This prevents TypeError if extra keys are present in the JSON
|
|
filtered_dict = {k: v for k, v in entry_dict.items() if k in model_fields}
|
|
|
|
# Create the ToolRegistryEntry object
|
|
tool_entry = ToolRegistryEntry(**filtered_dict)
|
|
valid_tools.append(tool_entry)
|
|
logger.debug(f"Successfully validated and created entry for '{tool_entry.id}'.")
|
|
|
|
except (TypeError, ValueError) as creation_err:
|
|
# Catch errors during dataclass instantiation (e.g., wrong type after filtering)
|
|
logger.error(
|
|
f"Failed to create ToolRegistryEntry object for entry '{entry_id}' "
|
|
f"(index {index}) due to type/value mismatch: {creation_err}. Skipping.",
|
|
exc_info=True,
|
|
)
|
|
except Exception as e:
|
|
# Catch any other unexpected errors during object creation
|
|
logger.exception(
|
|
f"Unexpected error creating ToolRegistryEntry for '{entry_id}' "
|
|
f"(index {index}): {e}. Skipping."
|
|
)
|
|
|
|
logger.info(f"Loaded {len(valid_tools)} valid tool entries from the registry.")
|
|
return valid_tools
|
|
|
|
|
|
def save_registry(tools: List[ToolRegistryEntry]) -> bool:
|
|
"""
|
|
Saves the provided list of tool entries back to the JSON registry file.
|
|
|
|
Overwrites the existing file using an atomic write operation (write to
|
|
a temporary file, then replace the original) to prevent data corruption.
|
|
|
|
Args:
|
|
tools: A list of ToolRegistryEntry objects to be saved.
|
|
|
|
Returns:
|
|
True if the registry was saved successfully, False otherwise.
|
|
"""
|
|
logger.info(
|
|
f"Attempting to save {len(tools)} tool entries to registry file: "
|
|
f"{REGISTRY_FILENAME}"
|
|
)
|
|
|
|
# Convert list of dataclass objects to a list of dictionaries
|
|
data_to_save: List[Dict[str, Any]]
|
|
try:
|
|
data_to_save = [dataclasses.asdict(tool) for tool in tools]
|
|
except Exception as e:
|
|
# This should be unlikely if 'tools' contains valid dataclass instances
|
|
logger.exception(
|
|
f"Failed to convert tool registry entries to dictionary format: {e}. "
|
|
f"Cannot save registry."
|
|
)
|
|
return False
|
|
|
|
# Ensure the configuration directory exists before writing
|
|
try:
|
|
# exist_ok=True prevents an error if the directory already exists
|
|
os.makedirs(CONFIG_DIR, exist_ok=True)
|
|
logger.debug(f"Ensured config directory exists: {CONFIG_DIR}")
|
|
except OSError as e:
|
|
logger.error(
|
|
f"Failed to create config directory '{CONFIG_DIR}'. "
|
|
f"Cannot save registry file. Error: {e}",
|
|
exc_info=True
|
|
)
|
|
return False
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"Unexpected error ensuring config directory '{CONFIG_DIR}' exists: {e}. "
|
|
f"Cannot save registry."
|
|
)
|
|
return False
|
|
|
|
|
|
# --- Atomic Write Operation ---
|
|
# Write to a temporary file first in the same directory
|
|
temp_filename = REGISTRY_FILENAME + ".tmp"
|
|
try:
|
|
with open(temp_filename, "w", encoding="utf-8") as temp_file:
|
|
# Use indent for readability, ensure_ascii=False for proper UTF-8 chars
|
|
json.dump(data_to_save, temp_file, indent=4, ensure_ascii=False)
|
|
logger.debug(f"Successfully wrote registry data to temporary file: {temp_filename}")
|
|
|
|
# If write succeeded, replace the original file with the temporary file
|
|
# os.replace is atomic on most modern systems
|
|
os.replace(temp_filename, REGISTRY_FILENAME)
|
|
logger.info(f"Registry saved successfully to {REGISTRY_FILENAME}")
|
|
return True
|
|
|
|
except (TypeError, ValueError) as json_err:
|
|
# Error during JSON serialization
|
|
logger.error(
|
|
f"Failed to serialize registry data to JSON format: {json_err}. "
|
|
f"Registry NOT saved.",
|
|
exc_info=True,
|
|
)
|
|
except (IOError, OSError) as file_err:
|
|
# Error during file writing or replacing
|
|
logger.error(
|
|
f"Failed to write or replace registry file {REGISTRY_FILENAME}: {file_err}. "
|
|
f"Registry NOT saved.",
|
|
exc_info=True,
|
|
)
|
|
except Exception as e:
|
|
# Catch any other unexpected errors during save
|
|
logger.exception(
|
|
f"An unexpected error occurred while saving the registry file "
|
|
f"{REGISTRY_FILENAME}: {e}. Registry NOT saved."
|
|
)
|
|
|
|
# --- Cleanup temporary file on failure ---
|
|
finally:
|
|
if os.path.exists(temp_filename):
|
|
try:
|
|
os.remove(temp_filename)
|
|
logger.debug(f"Removed temporary file after error: {temp_filename}")
|
|
except OSError as rm_err:
|
|
logger.error(
|
|
f"Failed to remove temporary registry file '{temp_filename}' "
|
|
f"after a save error: {rm_err}"
|
|
)
|
|
|
|
return False # Return False if any error occurred
|
|
|
|
|
|
def get_tool_config(
|
|
tool_id: str, registry_list: List[ToolRegistryEntry]
|
|
) -> Optional[ToolRegistryEntry]:
|
|
"""
|
|
Finds and returns the configuration for a specific tool ID from a list
|
|
of loaded registry entries.
|
|
|
|
Args:
|
|
tool_id: The unique identifier (string) of the tool to find.
|
|
registry_list: The list of loaded ToolRegistryEntry objects (typically
|
|
the result of calling load_registry()).
|
|
|
|
Returns:
|
|
The matching ToolRegistryEntry object if found in the list,
|
|
otherwise None.
|
|
"""
|
|
if not tool_id: # Handle empty tool_id case
|
|
logger.warning("get_tool_config called with an empty tool_id.")
|
|
return None
|
|
|
|
for tool_entry in registry_list:
|
|
if tool_entry.id == tool_id:
|
|
logger.debug(f"Found configuration for tool ID: {tool_id}")
|
|
return tool_entry
|
|
|
|
logger.debug(f"Configuration not found for tool ID: {tool_id} in the provided list.")
|
|
return None # Return None if no match was found |