# dependency_analyzer/core/package_manager.py
"""
Handles Python package management tasks.

This module includes functions for:
- Generating and reading requirements.txt files.
- Downloading packages for offline installation using pip.
- Creating installation scripts (.bat, .sh).
- Comparing project requirements with the currently installed packages.
- Updating packages in the local environment.
"""
import json
import logging
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from .analyzer import DependencyInfo  # Import type alias for clarity

# --- Logger Configuration ---
logger = logging.getLogger(__name__)

# --- Optional 'packaging' library import for robust version/specifier parsing ---
try:
    from packaging.requirements import Requirement
    from packaging.version import InvalidVersion, parse as parse_version

    PACKAGING_AVAILABLE = True
    logger.debug("Successfully imported the 'packaging' library for robust parsing.")
except ImportError:
    # Define dummy classes for type hinting and checks if 'packaging' is not available
    class Requirement:  # type: ignore
        def __init__(self, *args, **kwargs):
            raise NotImplementedError

    class InvalidVersion(Exception):  # type: ignore
        pass

    parse_version = None
    PACKAGING_AVAILABLE = False
    logger.warning(
        "'packaging' library not found. Version comparison will be less robust."
    )


def find_main_script(repo_path: Path) -> Optional[Path]:
    """
    Finds the main executable script (__main__.py) in standard project locations.

    Two locations are probed, in order:
    ``<repo_root>/<lowercased repo dir name>/__main__.py`` and
    ``<repo_root>/__main__.py``.

    Args:
        repo_path: The root directory of the repository.

    Returns:
        The Path object to the main script if found, otherwise None.
    """
    repo_name = repo_path.name
    script_subfolder_name = repo_name.lower()

    # Standard location: repo_root/repo_name/__main__.py
    path_in_subdir = repo_path / script_subfolder_name / "__main__.py"
    # Fallback location: repo_root/__main__.py
    path_in_root = repo_path / "__main__.py"

    logger.info(
        f"Searching for main script at '{path_in_subdir}' or '{path_in_root}'"
    )

    if path_in_subdir.is_file():
        logger.info(f"Main script found: {path_in_subdir}")
        return path_in_subdir
    if path_in_root.is_file():
        logger.info(f"Main script found: {path_in_root}")
        return path_in_root

    logger.warning("Main script (__main__.py) not found in standard locations.")
    return None


def _preview_locations(locations: object, limit: int = 3) -> str:
    """
    Builds a short, sorted, comma-separated preview of import locations.

    Args:
        locations: A set, frozenset, list or tuple of location strings.
            Any other value (including None) yields an empty preview.
        limit: Maximum number of locations spelled out before ", ..." is
            appended.

    Returns:
        The preview text, or an empty string when there is nothing to show.
    """
    if not isinstance(locations, (set, frozenset, list, tuple)):
        return ""
    ordered = sorted(set(locations))
    if not ordered:
        return ""
    preview = ", ".join(ordered[:limit])
    ellipsis = ", ..." if len(ordered) > limit else ""
    return f"{preview}{ellipsis}"


def generate_requirements_file(
    repo_path: Path,
    external_deps_info: DependencyInfo,
    std_lib_deps_info: DependencyInfo,
) -> Path:
    """
    Generates a detailed requirements.txt file in the repository's root.

    The file lists the standard-library modules as comments, followed by
    one pip-installable line per external dependency (pinned with ``==``
    when a version is present in the dependency info).

    Args:
        repo_path: The root directory of the repository.
        external_deps_info: Dictionary of detected external dependencies.
        std_lib_deps_info: Dictionary of detected standard library modules.

    Returns:
        The path to the generated requirements.txt file.

    Raises:
        IOError: If the file cannot be written (logged, then re-raised).
    """
    req_file_path = repo_path / "requirements.txt"
    py_ver = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    logger.info(f"Generating '{req_file_path}'...")

    try:
        with open(req_file_path, "w", encoding="utf-8") as f:
            f.write(f"# Requirements generated by DependencyAnalyzer for {repo_path.name}\n")
            f.write(f"# Python Version (analysis env): {py_ver}\n\n")

            f.write(f"# --- Standard Library Modules Used (part of Python {py_ver}) ---\n")
            if not std_lib_deps_info:
                f.write("# (No stdlib imports detected)\n")
            else:
                for name in sorted(std_lib_deps_info.keys()):
                    # Show only the first few locations for brevity.
                    preview = _preview_locations(
                        std_lib_deps_info[name].get("locations", set())
                    )
                    if preview:
                        f.write(f"# {name} (Used in: {preview})\n")
                    else:
                        f.write(f"# {name}\n")

            f.write("\n# --- External Dependencies (for pip install) ---\n")
            if not external_deps_info:
                f.write("# (No external dependencies detected)\n")
            else:
                for pypi_name in sorted(external_deps_info.keys()):
                    info = external_deps_info[pypi_name]
                    ver = info.get("version")
                    orig_imp = info.get("original_import_name")

                    # Construct a requirement string, e.g., "package==1.2.3" or "package"
                    req_line = f"{pypi_name}=={ver}" if ver else pypi_name

                    comment_lines = []
                    preview = _preview_locations(info.get("locations", set()))
                    if preview:
                        imp_as = f"(imported as '{orig_imp}') " if orig_imp else ""
                        comment_lines.append(f"# Found {imp_as}in: {preview}")
                    if not ver:
                        comment_lines.append("# Version not detected in analysis environment.")

                    if comment_lines:
                        f.write("\n".join(comment_lines) + "\n")
                    f.write(f"{req_line}\n\n")

        logger.info(f"Successfully generated '{req_file_path}'.")
    except IOError as e:
        logger.exception(f"Error writing to '{req_file_path}': {e}")
        raise
    return req_file_path


def copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
    """
    Copies the main requirements.txt file to the offline packages directory.

    Args:
        repo_path: The repository root; ``requirements.txt`` is read from here.
        packages_dir: Destination directory for the copy.

    Returns:
        True if the file was copied, False if the source is missing or the
        copy failed (both cases are logged).
    """
    source_req = repo_path / "requirements.txt"
    dest_req = packages_dir / "requirements.txt"

    if not source_req.is_file():
        logger.warning(f"Source '{source_req}' not found, cannot copy.")
        return False
    try:
        shutil.copy2(source_req, dest_req)
        logger.info(f"Copied '{source_req.name}' to '{packages_dir}'.")
        return True
    except (IOError, shutil.Error) as e:
        logger.exception(f"Failed to copy '{source_req.name}': {e}")
        return False


def download_packages(
    repo_path: Path, requirements_file_path: Path
) -> Tuple[bool, str]:
    """
    Downloads all required packages into a local '_req_packages' directory.

    Runs ``pip download`` with the current interpreter and a 10-minute
    timeout. The repository's requirements.txt is also copied next to the
    downloaded packages (best effort; a failed copy does not abort).

    Args:
        repo_path: The root directory of the repository.
        requirements_file_path: Path to the requirements.txt file to use.

    Returns:
        A tuple (success_flag, summary_message).
    """
    packages_dir = repo_path / "_req_packages"
    packages_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Starting package download to '{packages_dir}'...")

    if not requirements_file_path.is_file():
        msg = f"Source requirements file '{requirements_file_path}' not found."
        logger.error(msg)
        return False, msg

    copy_requirements_to_packages_dir(repo_path, packages_dir)

    cmd = [
        sys.executable,
        "-m",
        "pip",
        "download",
        "-r",
        str(requirements_file_path.resolve()),
        "-d",
        str(packages_dir.resolve()),
        "--no-cache-dir",
        "--disable-pip-version-check",
    ]
    logger.debug(f"Running command: {' '.join(cmd)}")

    try:
        # Using subprocess.run for a simpler blocking call
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
            timeout=600,  # 10-minute timeout
            check=True,  # Raises CalledProcessError on non-zero exit codes
            creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
        )
        if proc.stderr and proc.stderr.strip():
            logger.warning(f"Pip download reported warnings:\n{proc.stderr.strip()}")

        summary = f"All packages from '{requirements_file_path.name}' downloaded successfully."
        logger.info(summary)
        return True, summary
    except FileNotFoundError:
        msg = f"Command not found: '{sys.executable}'. Is Python/pip in PATH?"
        logger.error(msg)
        return False, msg
    except subprocess.TimeoutExpired:
        msg = "Package download timed out after 10 minutes."
        logger.error(msg)
        return False, msg
    except subprocess.CalledProcessError as e:
        error_details = f"Pip error (code {e.returncode}):\n{(e.stderr or '').strip()}"
        logger.error(error_details)
        return False, f"Failed to download packages. {error_details}"
    except Exception as e:
        logger.exception("An unexpected error occurred during package download.")
        return False, f"An unexpected error occurred: {e}"


def create_install_scripts(
    repo_path: Path, requirements_file_path: Path
) -> Tuple[bool, str]:
    """
    Creates install_packages.bat and install_packages.sh scripts.

    Both scripts are written into ``<repo_path>/_req_packages`` and install
    from that folder with ``pip install --no-index --find-links``. The
    repository's requirements.txt is copied alongside them first.

    Args:
        repo_path: The root directory of the repository.
        requirements_file_path: Requirements file that must exist before the
            scripts are generated.

    Returns:
        A tuple (success_flag, summary_message).
    """
    packages_dir = repo_path / "_req_packages"
    packages_dir.mkdir(parents=True, exist_ok=True)

    if not requirements_file_path.is_file():
        msg = f"Source requirements file '{requirements_file_path}' not found."
        return False, msg

    if not copy_requirements_to_packages_dir(repo_path, packages_dir):
        return False, "Failed to copy requirements.txt to packages directory."

    req_file_rel_name = "requirements.txt"

    # Windows Batch Script
    # NOTE: the pip detection is written as flat goto-based flow on purpose.
    # Inside a parenthesized block cmd.exe expands %errorlevel% when the
    # block is parsed, so a nested check after 'py -m pip' would see the
    # stale value from 'python -m pip' and the fallback could never succeed.
    bat_script_content = f"""@echo off
cls
echo ------------------------------------
echo Package Installer for {repo_path.name}
echo ------------------------------------
echo.
echo Checking for a valid Python pip module...
set PYTHON_CMD=python
python -m pip --version >nul 2>&1
if %errorlevel% equ 0 goto :pip_found
echo WARNING: 'python -m pip' failed. Trying 'py -m pip'...
set PYTHON_CMD=py
py -m pip --version >nul 2>&1
if %errorlevel% neq 0 goto :pip_error

:pip_found
echo Python pip found.
goto :install

:pip_error
echo ERROR: Python pip module not found.
echo Please ensure Python is installed and 'pip' is available in your PATH.
echo See: https://pip.pypa.io/en/stable/installation/
echo.
pause
exit /b 1

:install
echo.
echo Installing packages from local folder...
echo Source Directory: %~dp0
echo.
%PYTHON_CMD% -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file_rel_name}" --disable-pip-version-check
if %errorlevel% neq 0 goto :install_error

echo.
echo --------------------------
echo Installation Successful!
echo --------------------------
echo.
pause
exit /b 0

:install_error
echo.
echo --------------------------
echo ERROR: Installation Failed.
echo --------------------------
echo Please check the messages above for details.
pause
exit /b 1
"""

    # Linux/macOS Shell Script
    sh_script_content = f"""#!/bin/sh
# Shell script for installing packages on Linux/macOS
echo "------------------------------------"
echo "Package Installer for {repo_path.name}"
echo "------------------------------------"
echo ""
echo "Checking for a valid Python pip module..."

# Find a valid python command
if command -v python3 >/dev/null 2>&1 && python3 -m pip --version >/dev/null 2>&1; then
    PYTHON_CMD="python3"
elif command -v python >/dev/null 2>&1 && python -m pip --version >/dev/null 2>&1; then
    PYTHON_CMD="python"
else
    echo "ERROR: Could not find a working Python pip installation for 'python' or 'python3'."
    echo "Please ensure Python and pip are correctly installed."
    echo "See: https://pip.pypa.io/en/stable/installation/"
    exit 1
fi
echo "Found pip using: '$PYTHON_CMD'"

# Get the directory where the script is located
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
REQ_FILE="$SCRIPT_DIR/{req_file_rel_name}"

echo ""
echo "Installing packages from: '$SCRIPT_DIR'"
echo "Using requirements file: '$REQ_FILE'"
echo ""

"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$REQ_FILE" --disable-pip-version-check
INSTALL_STATUS=$?

echo ""
if [ $INSTALL_STATUS -ne 0 ]; then
    echo "--------------------------"
    echo "ERROR: Installation Failed."
    echo "--------------------------"
    echo "Please check the error messages above."
    exit 1
else
    echo "--------------------------"
    echo "Installation Successful!"
    echo "--------------------------"
    exit 0
fi
"""

    try:
        bat_path = packages_dir / "install_packages.bat"
        sh_path = packages_dir / "install_packages.sh"

        # Explicit newline styles so each script is valid on its target OS
        # regardless of the platform that generated it.
        with open(bat_path, "w", encoding="utf-8", newline="\r\n") as f:
            f.write(bat_script_content)
        logger.info(f"Created Windows install script: {bat_path}")

        with open(sh_path, "w", encoding="utf-8", newline="\n") as f:
            f.write(sh_script_content)
        os.chmod(sh_path, 0o755)  # Make the script executable on Unix-like systems
        logger.info(f"Created Linux/macOS install script: {sh_path} (executable)")

        return True, f"Installation scripts created in '{packages_dir}'."
    except IOError as e:
        logger.exception("Error creating installation scripts.")
        return False, f"Error creating scripts: {e}"


def get_installed_packages() -> Dict[str, str]:
    """
    Retrieves all packages installed in the current Python environment.

    Uses ``pip list --format=json`` with the running interpreter. Any
    failure is logged and results in an empty (or partial) dictionary
    rather than an exception.

    Returns:
        A dictionary mapping normalized package names (lowercase, with
        underscores replaced by hyphens) to their versions.
    """
    installed: Dict[str, str] = {}
    cmd = [
        sys.executable,
        "-m",
        "pip",
        "list",
        "--format=json",
        "--disable-pip-version-check",
    ]
    logger.debug(f"Getting installed packages using command: {' '.join(cmd)}")

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            encoding="utf-8",
            errors="ignore",
            creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
        )
        packages = json.loads(proc.stdout)
        for pkg in packages:
            # Normalize names to lowercase and hyphens for reliable matching
            normalized_name = pkg["name"].lower().replace("_", "-")
            installed[normalized_name] = pkg["version"]
        logger.info(f"Found {len(installed)} installed packages in the environment.")
    except FileNotFoundError:
        logger.error(f"Command not found: '{cmd[0]}'. Is Python/pip in PATH?")
    except subprocess.CalledProcessError as e:
        logger.error(f"'pip list' command failed (code {e.returncode}): {e.stderr}")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse 'pip list' JSON output: {e}")
    except Exception:
        logger.exception("Unexpected error getting installed packages.")
    return installed


def _clean_requirement_line(line: str) -> Optional[str]:
    """
    Reduces one raw requirements-file line to its requirement spec.

    Trailing inline comments (whitespace followed by ``#``) are removed so
    that the spec can be parsed on its own.

    Args:
        line: A raw line read from a requirements file.

    Returns:
        The cleaned spec, or None for blank lines, comment lines and
        option lines (those starting with ``-``).
    """
    spec = re.split(r"\s+#", line.strip(), maxsplit=1)[0].strip()
    if not spec or spec.startswith("#") or spec.startswith("-"):
        return None
    return spec


def _extract_pkg_name_from_spec(spec: str) -> Optional[str]:
    """
    Helper to extract a package name from a requirement spec string.

    Args:
        spec: A single requirement, e.g. ``"requests>=2.0"``.

    Returns:
        The package name, or None if no name-like prefix can be found.
    """
    if PACKAGING_AVAILABLE:
        try:
            return Requirement(spec).name
        except Exception:
            # Fallback to regex if 'packaging' fails on a malformed line
            pass
    # Basic regex fallback for when 'packaging' is not available or fails
    match = re.match(r"([a-zA-Z0-9._-]+)", spec)
    return match.group(1) if match else None


def compare_requirements_with_installed(
    requirements_file: Path, installed_packages: Dict[str, str]
) -> List[Dict[str, str]]:
    """
    Compares a requirements file against the dictionary of installed packages.

    Args:
        requirements_file: Path to the requirements file to check.
        installed_packages: Mapping of normalized package name to installed
            version, as produced by ``get_installed_packages``.

    Returns:
        One dictionary per requirement with the keys ``package``,
        ``required``, ``installed`` and ``status``. ``status`` is one of
        "OK", "Version Mismatch", "Invalid Installed Version",
        "Installed (Version not specified)" or "Not Installed". An empty
        list is returned when the file does not exist.
    """
    results: List[Dict[str, str]] = []
    if not requirements_file.is_file():
        logger.warning(f"Comparison failed: '{requirements_file}' not found.")
        return results

    logger.info(f"Comparing '{requirements_file.name}' with installed packages...")
    with open(requirements_file, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            spec = _clean_requirement_line(line)
            if spec is None:
                continue

            pkg_name = _extract_pkg_name_from_spec(spec)
            if not pkg_name:
                logger.warning(f"L{line_num}: Could not extract package name from '{spec}'. Skipping.")
                continue

            required_spec = "any"
            req_obj = None
            # Exact version taken from a plain 'name==version' pin; only used
            # when the line could not be parsed with 'packaging'.
            pinned_version: Optional[str] = None

            if PACKAGING_AVAILABLE:
                try:
                    req_obj = Requirement(spec)
                    # Use str() to handle complex specifiers like '>=1.0,<2.0'
                    required_spec = str(req_obj.specifier) if req_obj.specifier else "any"
                except Exception as e:
                    logger.warning(f"L{line_num}: Could not parse '{spec}' with 'packaging': {e}")

            if req_obj is None and "==" in spec:
                tail = spec.split("==", 1)[1].strip()
                # Cut off environment markers or additional clauses.
                pinned_version = re.split(r"[;,\s]", tail, maxsplit=1)[0] or None
                if pinned_version:
                    required_spec = pinned_version

            norm_name = pkg_name.lower().replace("_", "-")
            installed_version = installed_packages.get(norm_name)

            if not installed_version:
                status = "Not Installed"
            elif req_obj is not None and req_obj.specifier:
                try:
                    # prereleases=True: an installed pre-release that falls
                    # inside the range does satisfy the requirement.
                    satisfied = req_obj.specifier.contains(
                        parse_version(installed_version), prereleases=True
                    )
                    status = "OK" if satisfied else "Version Mismatch"
                except InvalidVersion:
                    status = "Invalid Installed Version"
            elif pinned_version:
                # Basic exact-match check when 'packaging' could not be used
                status = "OK" if installed_version == pinned_version else "Version Mismatch"
            else:
                status = "Installed (Version not specified)"

            results.append({
                "package": pkg_name,
                "required": required_spec,
                "installed": installed_version or "Not Installed",
                "status": status,
            })

    logger.info(f"Comparison finished with {len(results)} entries.")
    return results


def update_system_packages(
    packages_to_update: List[str], repo_path: Path
) -> Tuple[bool, str]:
    """
    Updates or installs a list of packages using pip install --upgrade.

    It reads the full specifier from the project's requirements file.
    Packages that do not appear there are upgraded by bare name.

    Args:
        packages_to_update: Names of the packages to install or upgrade.
        repo_path: Repository root containing ``requirements.txt``.

    Returns:
        A tuple (success_flag, summary_message).
    """
    if not packages_to_update:
        return True, "No packages were specified for update."

    req_file_path = repo_path / "requirements.txt"
    if not req_file_path.is_file():
        return False, f"Cannot update: '{req_file_path}' not found."

    # Create a map of {normalized_name: original_spec_line} from requirements.txt
    req_map: Dict[str, str] = {}
    try:
        with open(req_file_path, "r", encoding="utf-8") as f:
            for line in f:
                spec = _clean_requirement_line(line)
                if spec is None:
                    continue
                pkg_name = _extract_pkg_name_from_spec(spec)
                if pkg_name:
                    req_map[pkg_name.lower().replace("_", "-")] = spec
    except IOError as e:
        return False, f"Error reading '{req_file_path}': {e}"

    # Build a list of specs to pass to pip, based on the user's selection
    final_specs_to_update: List[str] = []
    for pkg_name in packages_to_update:
        norm_name = pkg_name.lower().replace("_", "-")
        if norm_name in req_map:
            final_specs_to_update.append(req_map[norm_name])
        else:
            # Fallback: if not in requirements, just use the name
            logger.warning(
                f"'{pkg_name}' not found in requirements.txt. Upgrading by name only."
            )
            final_specs_to_update.append(pkg_name)

    # Use a temporary file to handle complex specifiers safely
    temp_req_path = repo_path / "_temp_update_reqs.txt"
    cmd = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--upgrade",
        "-r",
        str(temp_req_path.resolve()),
        "--disable-pip-version-check",
    ]

    try:
        with open(temp_req_path, "w", encoding="utf-8") as f:
            f.write("\n".join(final_specs_to_update))

        logger.info(f"Attempting to update {len(final_specs_to_update)} package(s).")
        logger.debug(f"Running command: {' '.join(cmd)}")

        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="ignore",
            timeout=600,
            check=True,
            creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
        )
        summary = f"Update completed for: {', '.join(packages_to_update)}."
        logger.info(summary)
        if proc.stderr and proc.stderr.strip():
            logger.warning(f"Update process reported warnings:\n{proc.stderr.strip()}")
        return True, summary
    except subprocess.TimeoutExpired:
        msg = "Package update timed out after 10 minutes."
        logger.error(msg)
        return False, msg
    except subprocess.CalledProcessError as e:
        error_msg = f"Update failed. Pip error (code {e.returncode}):\n{(e.stderr or '').strip()}"
        logger.error(error_msg)
        return False, error_msg
    except Exception as e:
        logger.exception("An unexpected error occurred during package update.")
        return False, f"An unexpected error occurred: {e}"
    finally:
        # Clean up the temporary file
        if temp_req_path.exists():
            try:
                os.remove(temp_req_path)
            except OSError as e:
                logger.warning(f"Could not remove temporary file '{temp_req_path}': {e}")