# projectutility/core/registry_manager.py
"""Load, validate, save and query the tool registry JSON file.

The registry is a JSON array of tool descriptions stored in
``tools_registry.json`` inside a ``config`` directory. The location of that
directory depends on whether the application runs as a frozen executable
(``sys.frozen`` is set) or as a plain script.
"""

import json
import os
import sys  # Import sys to check for frozen attribute
import logging
import dataclasses
from typing import List, Optional, Dict, Any

# --- Internal Imports ---
try:
    from .registry_models import ToolRegistryEntry
except ImportError:
    logging.getLogger(__name__).critical(
        "Failed to import .registry_models. Registry operations will fail.",
        exc_info=True,
    )

    # Minimal stand-in so that the rest of this module can still be imported
    # when the real model is unavailable.
    @dataclasses.dataclass
    class ToolRegistryEntry:
        id: str = "dummy_import_failed"
        display_name: str = "Dummy"
        type: str = "local"
        run_command: List[str] = dataclasses.field(default_factory=list)
        has_gui: bool = False


# --- Module Constants ---
logger = logging.getLogger(__name__)

# --- Determine if Running as Frozen Executable ---
IS_FROZEN = getattr(sys, "frozen", False)

# --- Calculate Paths ---
# This logic determines where tools_registry.json and other config files reside.
# It mirrors the logic in __main__.py for CONFIG_DIR_MAIN for consistency.
if IS_FROZEN:
    # In frozen mode, config files managed by the app (like user-modifiable
    # registry) should be relative to the executable's directory.
    _config_base_path = os.path.dirname(sys.executable)
    CONFIG_DIR = os.path.join(_config_base_path, "config")
    logger.debug(
        f"Registry Manager (FROZEN): Base path for config: {_config_base_path}"
    )
else:
    # In script mode, paths are relative to this file's location within the
    # package structure.
    # Path to this file: .../ProjectUtility/projectutility/core/registry_manager.py
    _current_file_path = os.path.abspath(__file__)
    # Path to the 'core' directory: .../ProjectUtility/projectutility/core
    _core_dir = os.path.dirname(_current_file_path)
    # Path to the 'projectutility' package source root:
    # .../ProjectUtility/projectutility
    _app_source_root = os.path.dirname(_core_dir)
    CONFIG_DIR = os.path.join(
        _app_source_root, "config"
    )  # Config dir inside the package source
    logger.debug(
        f"Registry Manager (SCRIPT): Package source root for config: {_app_source_root}"
    )

REGISTRY_FILENAME = os.path.join(CONFIG_DIR, "tools_registry.json")
logger.debug(f"Registry Manager: Final CONFIG_DIR: {CONFIG_DIR}")
logger.debug(f"Registry Manager: Final REGISTRY_FILENAME: {REGISTRY_FILENAME}")


# --- Core Functions ---


def _collect_entry_errors(entry_dict: Dict[str, Any]) -> List[str]:
    """Return the validation error messages for one raw registry entry.

    The caller must already have verified that the keys 'id',
    'display_name', 'type' and 'run_command' are present.

    Args:
        entry_dict: One decoded JSON object from the registry file.

    Returns:
        A list of human-readable error strings; empty when the checked
        fields are all acceptable.
    """
    errors: List[str] = []

    if not isinstance(entry_dict["id"], str) or not entry_dict["id"]:
        errors.append("Invalid or empty 'id' (must be a non-empty string).")

    if (
        not isinstance(entry_dict["display_name"], str)
        or not entry_dict["display_name"]
    ):
        errors.append(
            "Invalid or empty 'display_name' (must be a non-empty string)."
        )

    if not isinstance(entry_dict["type"], str) or entry_dict["type"] not in [
        "git",
        "local",
    ]:
        errors.append(
            f"Invalid 'type': '{entry_dict['type']}'. Must be 'git' or 'local'."
        )

    if (
        not isinstance(entry_dict["run_command"], list)
        or not entry_dict["run_command"]
        or not all(
            isinstance(cmd_part, str) for cmd_part in entry_dict["run_command"]
        )
    ):
        errors.append(
            "Invalid 'run_command' (must be a non-empty list of strings)."
        )

    # Git-specific validation
    if entry_dict["type"] == "git":
        if not isinstance(entry_dict.get("git_url"), str) or not entry_dict.get(
            "git_url"
        ):
            errors.append(
                "Missing or invalid 'git_url' (required for type 'git')."
            )

    return errors


def load_registry() -> List[ToolRegistryEntry]:
    """
    Loads the tool registry from the JSON file defined by REGISTRY_FILENAME.

    Reads the file, parses the JSON, validates each entry against the
    ToolRegistryEntry model, and returns a list of valid tool entries.
    Entries that are not JSON objects, lack required keys, or fail the
    field checks are skipped with a logged warning; they never abort the
    load of the remaining entries.

    Returns:
        A list of validated ToolRegistryEntry objects. Returns an empty list
        if the file is not found, cannot be parsed, or contains no valid
        entries.
    """
    if not os.path.isfile(REGISTRY_FILENAME):
        logger.warning(
            f"Registry file not found at calculated path: {REGISTRY_FILENAME}. "
            f"This might be expected on first run if the file is user-generated "
            f"or if PyInstaller did not bundle a default one to this location."
        )
        # If you provide a default tools_registry.json with PyInstaller (e.g.,
        # via --add-data), ensure it's copied to the correct `CONFIG_DIR`
        # relative to the executable. Or, handle creating a default one here
        # if it doesn't exist.
        return []

    raw_data: Any = None
    try:
        with open(REGISTRY_FILENAME, "r", encoding="utf-8") as file_handle:
            raw_data = json.load(file_handle)
        logger.info(f"Successfully read registry file: {REGISTRY_FILENAME}")
    except json.JSONDecodeError as json_err:
        logger.error(
            f"Failed to parse JSON in registry file {REGISTRY_FILENAME}. "
            f"Error: {json_err}. Please check the file syntax.",
            exc_info=True,  # Include traceback for JSON errors
        )
        return []
    except IOError as io_err:
        logger.error(
            f"Failed to read registry file {REGISTRY_FILENAME}. " f"Error: {io_err}",
            exc_info=True,
        )
        return []
    except Exception as e:
        # Catch any other unexpected errors during file load
        logger.exception(
            f"An unexpected error occurred while loading registry file "
            f"{REGISTRY_FILENAME}: {e}"
        )
        return []

    # Validate the overall structure (should be a list)
    if not isinstance(raw_data, list):
        logger.error(
            f"Invalid format in registry file {REGISTRY_FILENAME}: "
            f"Expected a JSON list (array) at the root, but found type "
            f"'{type(raw_data).__name__}'. Cannot load tools."
        )
        return []

    # --- Validate Individual Entries ---
    valid_tools: List[ToolRegistryEntry] = []

    # Get expected fields from the dataclass model for validation
    model_fields = {f.name for f in dataclasses.fields(ToolRegistryEntry)}
    # A field is required by the model when it has neither a default nor a
    # default factory (both are then the public MISSING sentinel).
    required_keys = {
        f.name
        for f in dataclasses.fields(ToolRegistryEntry)
        if f.default is dataclasses.MISSING
        and f.default_factory is dataclasses.MISSING
    }
    # Manually add required keys if they have defaults but are conceptually required
    required_keys.update({"id", "display_name", "type", "run_command", "has_gui"})

    for index, entry_dict in enumerate(raw_data):
        # The type check must come before any dict method is used: a non-dict
        # element (string, number, null, ...) has no .get() and would
        # otherwise abort the whole load with AttributeError.
        if not isinstance(entry_dict, dict):
            logger.warning(
                f"Registry entry at index {index} is not a dictionary (found "
                f"'{type(entry_dict).__name__}'). Skipping."
            )
            continue

        entry_id = entry_dict.get(
            "id", f"Unknown_at_index_{index}"
        )  # Get ID for logging

        # Check for required keys first
        missing_keys = required_keys - entry_dict.keys()
        if missing_keys:
            logger.warning(
                f"Registry entry '{entry_id}' (index {index}) is missing "
                f"required keys: {missing_keys}. Skipping."
            )
            continue

        # --- Basic Type Validation (add more as needed) ---
        validation_errors = _collect_entry_errors(entry_dict)

        # 'has_gui' is part of required_keys, so an entry without the key was
        # already skipped above; only a present-but-non-bool value reaches
        # this fallback.
        if not isinstance(entry_dict.get("has_gui"), bool):
            logger.warning(
                f"Registry entry '{entry_id}': Missing or invalid 'has_gui'. Assuming false."
            )
            entry_dict["has_gui"] = False  # Provide default if missing/invalid

        if validation_errors:
            logger.warning(
                f"Registry entry '{entry_id}' (index {index}) failed validation: "
                f"{'; '.join(validation_errors)}. Skipping."
            )
            continue

        try:
            # Drop keys the model does not declare so the constructor cannot
            # fail on unexpected keyword arguments.
            filtered_dict = {k: v for k, v in entry_dict.items() if k in model_fields}
            tool_entry = ToolRegistryEntry(**filtered_dict)
            valid_tools.append(tool_entry)
        except (TypeError, ValueError) as creation_err:
            logger.error(
                f"Failed to create ToolRegistryEntry object for entry '{entry_id}' "
                f"(index {index}) due to type/value mismatch: {creation_err}. Skipping.",
                exc_info=True,
            )
        except Exception as e:
            logger.exception(
                f"Unexpected error creating ToolRegistryEntry for '{entry_id}' "
                f"(index {index}): {e}. Skipping."
            )

    logger.info(f"Loaded {len(valid_tools)} valid tool entries from the registry.")
    return valid_tools


def save_registry(tools: List[ToolRegistryEntry]) -> bool:
    """
    Saves the provided list of tool entries back to the JSON registry file.

    Overwrites the existing file by writing to a temporary file
    (REGISTRY_FILENAME + ".tmp") and then replacing the original with
    os.replace, so a failed write does not leave a half-written registry.

    Args:
        tools: A list of ToolRegistryEntry objects to be saved.

    Returns:
        True if the registry was saved successfully, False otherwise.
    """
    logger.info(
        f"Attempting to save {len(tools)} tool entries to registry file: "
        f"{REGISTRY_FILENAME}"
    )

    data_to_save: List[Dict[str, Any]]
    try:
        data_to_save = [dataclasses.asdict(tool) for tool in tools]
    except Exception as e:
        logger.exception(
            f"Failed to convert tool registry entries to dictionary format: {e}. "
            f"Cannot save registry."
        )
        return False

    # Ensure the configuration directory exists before writing
    try:
        os.makedirs(CONFIG_DIR, exist_ok=True)  # CONFIG_DIR is now frozen-aware
        logger.debug(f"Ensured config directory exists: {CONFIG_DIR}")
    except OSError as e:
        logger.error(
            f"Failed to create config directory '{CONFIG_DIR}'. "
            f"Cannot save registry file. Error: {e}",
            exc_info=True,
        )
        return False
    except Exception as e:
        logger.exception(
            f"Unexpected error ensuring config directory '{CONFIG_DIR}' exists: {e}. "
            f"Cannot save registry."
        )
        return False

    temp_filename = REGISTRY_FILENAME + ".tmp"
    try:
        with open(temp_filename, "w", encoding="utf-8") as temp_file:
            json.dump(data_to_save, temp_file, indent=4, ensure_ascii=False)
        logger.debug(
            f"Successfully wrote registry data to temporary file: {temp_filename}"
        )
        os.replace(temp_filename, REGISTRY_FILENAME)
        logger.info(f"Registry saved successfully to {REGISTRY_FILENAME}")
        return True
    except (TypeError, ValueError) as json_err:
        logger.error(
            f"Failed to serialize registry data to JSON format: {json_err}. "
            f"Registry NOT saved.",
            exc_info=True,
        )
    except (IOError, OSError) as file_err:
        logger.error(
            f"Failed to write or replace registry file {REGISTRY_FILENAME}: {file_err}. "
            f"Registry NOT saved.",
            exc_info=True,
        )
    except Exception as e:
        logger.exception(
            f"An unexpected error occurred while saving the registry file "
            f"{REGISTRY_FILENAME}: {e}. Registry NOT saved."
        )
    finally:
        # After a successful os.replace the temporary file no longer exists,
        # so this cleanup only acts when something went wrong.
        if os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
                logger.debug(f"Removed temporary file after error: {temp_filename}")
            except OSError as rm_err:
                logger.error(
                    f"Failed to remove temporary registry file '{temp_filename}' "
                    f"after a save error: {rm_err}"
                )
    return False


def get_tool_config(
    tool_id: str, registry_list: List[ToolRegistryEntry]
) -> Optional[ToolRegistryEntry]:
    """
    Finds and returns the configuration for a specific tool ID from a list
    of loaded registry entries.

    Args:
        tool_id: The unique identifier (string) of the tool to find.
        registry_list: The list of loaded ToolRegistryEntry objects
            (typically the result of calling load_registry()).

    Returns:
        The first ToolRegistryEntry whose ``id`` equals ``tool_id``, or None
        when ``tool_id`` is empty or no entry matches.
    """
    if not tool_id:
        logger.warning("get_tool_config called with an empty tool_id.")
        return None

    for tool_entry in registry_list:
        if tool_entry.id == tool_id:
            return tool_entry

    logger.debug(
        f"Configuration not found for tool ID: {tool_id} in the provided list."
    )
    return None