354 lines
13 KiB
Python
354 lines
13 KiB
Python
# projectutility/core/registry_manager.py
|
|
|
|
import json
|
|
import os
|
|
import sys # Import sys to check for frozen attribute
|
|
import logging
|
|
import dataclasses
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
# --- Internal Imports ---
|
|
try:
|
|
from .registry_models import ToolRegistryEntry
|
|
except ImportError:
|
|
logging.getLogger(__name__).critical(
|
|
"Failed to import .registry_models. Registry operations will fail.",
|
|
exc_info=True,
|
|
)
|
|
|
|
@dataclasses.dataclass
|
|
class ToolRegistryEntry:
|
|
id: str = "dummy_import_failed"
|
|
display_name: str = "Dummy"
|
|
type: str = "local"
|
|
run_command: List[str] = dataclasses.field(default_factory=list)
|
|
has_gui: bool = False
|
|
|
|
|
|
# --- Module Constants ---
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Determine if Running as Frozen Executable ---
|
|
IS_FROZEN = getattr(sys, "frozen", False)
|
|
|
|
# --- Calculate Paths ---
|
|
# This logic determines where tools_registry.json and other config files reside.
|
|
# It mirrors the logic in __main__.py for CONFIG_DIR_MAIN for consistency.
|
|
if IS_FROZEN:
|
|
# In frozen mode, config files managed by the app (like user-modifiable registry)
|
|
# should be relative to the executable's directory.
|
|
_config_base_path = os.path.dirname(sys.executable)
|
|
CONFIG_DIR = os.path.join(_config_base_path, "config")
|
|
logger.debug(
|
|
f"Registry Manager (FROZEN): Base path for config: {_config_base_path}"
|
|
)
|
|
else:
|
|
# In script mode, paths are relative to this file's location within the package structure.
|
|
# Path to this file: .../ProjectUtility/projectutility/core/registry_manager.py
|
|
_current_file_path = os.path.abspath(__file__)
|
|
# Path to the 'core' directory: .../ProjectUtility/projectutility/core
|
|
_core_dir = os.path.dirname(_current_file_path)
|
|
# Path to the 'projectutility' package source root: .../ProjectUtility/projectutility
|
|
_app_source_root = os.path.dirname(_core_dir)
|
|
CONFIG_DIR = os.path.join(
|
|
_app_source_root, "config"
|
|
) # Config dir inside the package source
|
|
logger.debug(
|
|
f"Registry Manager (SCRIPT): Package source root for config: {_app_source_root}"
|
|
)
|
|
|
|
REGISTRY_FILENAME = os.path.join(CONFIG_DIR, "tools_registry.json")
|
|
|
|
logger.debug(f"Registry Manager: Final CONFIG_DIR: {CONFIG_DIR}")
|
|
logger.debug(f"Registry Manager: Final REGISTRY_FILENAME: {REGISTRY_FILENAME}")
|
|
|
|
|
|
# --- Core Functions ---
|
|
|
|
|
|
def load_registry() -> List[ToolRegistryEntry]:
|
|
"""
|
|
Loads the tool registry from the JSON file defined by REGISTRY_FILENAME.
|
|
|
|
Reads the file, parses the JSON, validates each entry against the
|
|
ToolRegistryEntry model, and returns a list of valid tool entries.
|
|
|
|
Returns:
|
|
A list of validated ToolRegistryEntry objects. Returns an empty
|
|
list if the file is not found, cannot be parsed, or contains
|
|
no valid entries.
|
|
"""
|
|
if not os.path.isfile(REGISTRY_FILENAME):
|
|
logger.warning(
|
|
f"Registry file not found at calculated path: {REGISTRY_FILENAME}. "
|
|
f"This might be expected on first run if the file is user-generated "
|
|
f"or if PyInstaller did not bundle a default one to this location."
|
|
)
|
|
# If you provide a default tools_registry.json with PyInstaller (e.g., via --add-data),
|
|
# ensure it's copied to the correct `CONFIG_DIR` relative to the executable.
|
|
# Or, handle creating a default one here if it doesn't exist.
|
|
return []
|
|
|
|
raw_data: Any = None
|
|
try:
|
|
with open(REGISTRY_FILENAME, "r", encoding="utf-8") as file_handle:
|
|
raw_data = json.load(file_handle)
|
|
logger.info(f"Successfully read registry file: {REGISTRY_FILENAME}")
|
|
|
|
except json.JSONDecodeError as json_err:
|
|
logger.error(
|
|
f"Failed to parse JSON in registry file {REGISTRY_FILENAME}. "
|
|
f"Error: {json_err}. Please check the file syntax.",
|
|
exc_info=True, # Include traceback for JSON errors
|
|
)
|
|
return []
|
|
except IOError as io_err:
|
|
logger.error(
|
|
f"Failed to read registry file {REGISTRY_FILENAME}. " f"Error: {io_err}",
|
|
exc_info=True,
|
|
)
|
|
return []
|
|
except Exception as e:
|
|
# Catch any other unexpected errors during file load
|
|
logger.exception(
|
|
f"An unexpected error occurred while loading registry file "
|
|
f"{REGISTRY_FILENAME}: {e}"
|
|
)
|
|
return []
|
|
|
|
# Validate the overall structure (should be a list)
|
|
if not isinstance(raw_data, list):
|
|
logger.error(
|
|
f"Invalid format in registry file {REGISTRY_FILENAME}: "
|
|
f"Expected a JSON list (array) at the root, but found type "
|
|
f"'{type(raw_data).__name__}'. Cannot load tools."
|
|
)
|
|
return []
|
|
|
|
# --- Validate Individual Entries ---
|
|
valid_tools: List[ToolRegistryEntry] = []
|
|
# Get expected fields from the dataclass model for validation
|
|
model_fields = {f.name for f in dataclasses.fields(ToolRegistryEntry)}
|
|
required_keys = {
|
|
f.name
|
|
for f in dataclasses.fields(ToolRegistryEntry)
|
|
if isinstance(f.default, dataclasses._MISSING_TYPE)
|
|
and isinstance(f.default_factory, dataclasses._MISSING_TYPE)
|
|
}
|
|
# Manually add required keys if they have defaults but are conceptually required
|
|
required_keys.update({"id", "display_name", "type", "run_command", "has_gui"})
|
|
# logger.debug(f"Required keys for registry entry: {required_keys}") # Already logged in previous version
|
|
|
|
for index, entry_dict in enumerate(raw_data):
|
|
entry_id = entry_dict.get(
|
|
"id", f"Unknown_at_index_{index}"
|
|
) # Get ID for logging
|
|
|
|
if not isinstance(entry_dict, dict):
|
|
logger.warning(
|
|
f"Registry entry at index {index} is not a dictionary (found "
|
|
f"'{type(entry_dict).__name__}'). Skipping."
|
|
)
|
|
continue
|
|
|
|
# Check for required keys first
|
|
missing_keys = required_keys - entry_dict.keys()
|
|
if missing_keys:
|
|
logger.warning(
|
|
f"Registry entry '{entry_id}' (index {index}) is missing "
|
|
f"required keys: {missing_keys}. Skipping."
|
|
)
|
|
continue
|
|
|
|
# --- Basic Type Validation (add more as needed) ---
|
|
validation_errors = []
|
|
if not isinstance(entry_dict["id"], str) or not entry_dict["id"]:
|
|
validation_errors.append(
|
|
"Invalid or empty 'id' (must be a non-empty string)."
|
|
)
|
|
if (
|
|
not isinstance(entry_dict["display_name"], str)
|
|
or not entry_dict["display_name"]
|
|
):
|
|
validation_errors.append(
|
|
"Invalid or empty 'display_name' (must be a non-empty string)."
|
|
)
|
|
if not isinstance(entry_dict["type"], str) or entry_dict["type"] not in [
|
|
"git",
|
|
"local",
|
|
]:
|
|
validation_errors.append(
|
|
f"Invalid 'type': '{entry_dict['type']}'. Must be 'git' or 'local'."
|
|
)
|
|
if (
|
|
not isinstance(entry_dict["run_command"], list)
|
|
or not entry_dict["run_command"]
|
|
or not all(
|
|
isinstance(cmd_part, str) for cmd_part in entry_dict["run_command"]
|
|
)
|
|
):
|
|
validation_errors.append(
|
|
"Invalid 'run_command' (must be a non-empty list of strings)."
|
|
)
|
|
if not isinstance(entry_dict.get("has_gui"), bool):
|
|
logger.warning(
|
|
f"Registry entry '{entry_id}': Missing or invalid 'has_gui'. Assuming false."
|
|
)
|
|
entry_dict["has_gui"] = False # Provide default if missing/invalid
|
|
|
|
# Git-specific validation
|
|
if entry_dict["type"] == "git":
|
|
if not isinstance(entry_dict.get("git_url"), str) or not entry_dict.get(
|
|
"git_url"
|
|
):
|
|
validation_errors.append(
|
|
"Missing or invalid 'git_url' (required for type 'git')."
|
|
)
|
|
|
|
if validation_errors:
|
|
logger.warning(
|
|
f"Registry entry '{entry_id}' (index {index}) failed validation: "
|
|
f"{'; '.join(validation_errors)}. Skipping."
|
|
)
|
|
continue
|
|
|
|
try:
|
|
filtered_dict = {k: v for k, v in entry_dict.items() if k in model_fields}
|
|
tool_entry = ToolRegistryEntry(**filtered_dict)
|
|
valid_tools.append(tool_entry)
|
|
# logger.debug(f"Successfully validated and created entry for '{tool_entry.id}'.") # Already logged
|
|
|
|
except (TypeError, ValueError) as creation_err:
|
|
logger.error(
|
|
f"Failed to create ToolRegistryEntry object for entry '{entry_id}' "
|
|
f"(index {index}) due to type/value mismatch: {creation_err}. Skipping.",
|
|
exc_info=True,
|
|
)
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"Unexpected error creating ToolRegistryEntry for '{entry_id}' "
|
|
f"(index {index}): {e}. Skipping."
|
|
)
|
|
|
|
logger.info(f"Loaded {len(valid_tools)} valid tool entries from the registry.")
|
|
return valid_tools
|
|
|
|
|
|
def save_registry(tools: List[ToolRegistryEntry]) -> bool:
|
|
"""
|
|
Saves the provided list of tool entries back to the JSON registry file.
|
|
|
|
Overwrites the existing file using an atomic write operation (write to
|
|
a temporary file, then replace the original) to prevent data corruption.
|
|
|
|
Args:
|
|
tools: A list of ToolRegistryEntry objects to be saved.
|
|
|
|
Returns:
|
|
True if the registry was saved successfully, False otherwise.
|
|
"""
|
|
logger.info(
|
|
f"Attempting to save {len(tools)} tool entries to registry file: "
|
|
f"{REGISTRY_FILENAME}"
|
|
)
|
|
|
|
data_to_save: List[Dict[str, Any]]
|
|
try:
|
|
data_to_save = [dataclasses.asdict(tool) for tool in tools]
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"Failed to convert tool registry entries to dictionary format: {e}. "
|
|
f"Cannot save registry."
|
|
)
|
|
return False
|
|
|
|
# Ensure the configuration directory exists before writing
|
|
try:
|
|
os.makedirs(CONFIG_DIR, exist_ok=True) # CONFIG_DIR is now frozen-aware
|
|
logger.debug(f"Ensured config directory exists: {CONFIG_DIR}")
|
|
except OSError as e:
|
|
logger.error(
|
|
f"Failed to create config directory '{CONFIG_DIR}'. "
|
|
f"Cannot save registry file. Error: {e}",
|
|
exc_info=True,
|
|
)
|
|
return False
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"Unexpected error ensuring config directory '{CONFIG_DIR}' exists: {e}. "
|
|
f"Cannot save registry."
|
|
)
|
|
return False
|
|
|
|
temp_filename = REGISTRY_FILENAME + ".tmp"
|
|
try:
|
|
with open(temp_filename, "w", encoding="utf-8") as temp_file:
|
|
json.dump(data_to_save, temp_file, indent=4, ensure_ascii=False)
|
|
logger.debug(
|
|
f"Successfully wrote registry data to temporary file: {temp_filename}"
|
|
)
|
|
|
|
os.replace(temp_filename, REGISTRY_FILENAME)
|
|
logger.info(f"Registry saved successfully to {REGISTRY_FILENAME}")
|
|
return True
|
|
|
|
except (TypeError, ValueError) as json_err:
|
|
logger.error(
|
|
f"Failed to serialize registry data to JSON format: {json_err}. "
|
|
f"Registry NOT saved.",
|
|
exc_info=True,
|
|
)
|
|
except (IOError, OSError) as file_err:
|
|
logger.error(
|
|
f"Failed to write or replace registry file {REGISTRY_FILENAME}: {file_err}. "
|
|
f"Registry NOT saved.",
|
|
exc_info=True,
|
|
)
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"An unexpected error occurred while saving the registry file "
|
|
f"{REGISTRY_FILENAME}: {e}. Registry NOT saved."
|
|
)
|
|
finally:
|
|
if os.path.exists(temp_filename):
|
|
try:
|
|
os.remove(temp_filename)
|
|
logger.debug(f"Removed temporary file after error: {temp_filename}")
|
|
except OSError as rm_err:
|
|
logger.error(
|
|
f"Failed to remove temporary registry file '{temp_filename}' "
|
|
f"after a save error: {rm_err}"
|
|
)
|
|
return False
|
|
|
|
|
|
def get_tool_config(
|
|
tool_id: str, registry_list: List[ToolRegistryEntry]
|
|
) -> Optional[ToolRegistryEntry]:
|
|
"""
|
|
Finds and returns the configuration for a specific tool ID from a list
|
|
of loaded registry entries.
|
|
|
|
Args:
|
|
tool_id: The unique identifier (string) of the tool to find.
|
|
registry_list: The list of loaded ToolRegistryEntry objects (typically
|
|
the result of calling load_registry()).
|
|
|
|
Returns:
|
|
The matching ToolRegistryEntry object if found in the list,
|
|
otherwise None.
|
|
"""
|
|
if not tool_id:
|
|
logger.warning("get_tool_config called with an empty tool_id.")
|
|
return None
|
|
|
|
for tool_entry in registry_list:
|
|
if tool_entry.id == tool_id:
|
|
# logger.debug(f"Found configuration for tool ID: {tool_id}") # Already logged
|
|
return tool_entry
|
|
|
|
logger.debug(
|
|
f"Configuration not found for tool ID: {tool_id} in the provided list."
|
|
)
|
|
return None
|