SXXXXXXX_DependencyAnalyzer/dependencyanalyzer/core/package_manager.py
2025-11-10 14:18:35 +01:00

599 lines
22 KiB
Python

# dependency_analyzer/core/package_manager.py
"""
Handles Python package management tasks.
This module includes functions for:
- Generating and reading requirements.txt files.
- Downloading packages for offline installation using pip.
- Creating installation scripts (.bat, .sh).
- Comparing project requirements with the currently installed packages.
- Updating packages in the local environment.
"""
import json
import logging
import os
import re
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from .analyzer import DependencyInfo # Import type alias for clarity
# --- Logger Configuration ---
logger = logging.getLogger(__name__)
# --- Optional 'packaging' library import for robust version/specifier parsing ---
try:
from packaging.requirements import Requirement
from packaging.version import InvalidVersion, parse as parse_version
PACKAGING_AVAILABLE = True
logger.debug("Successfully imported the 'packaging' library for robust parsing.")
except ImportError:
# Define dummy classes for type hinting and checks if 'packaging' is not available
class Requirement: # type: ignore
def __init__(self, *args, **kwargs):
raise NotImplementedError
class InvalidVersion(Exception): # type: ignore
pass
parse_version = None
PACKAGING_AVAILABLE = False
logger.warning(
"'packaging' library not found. Version comparison will be less robust."
)
def find_main_script(repo_path: Path) -> Optional[Path]:
"""
Finds the main executable script (__main__.py) in standard project locations.
Args:
repo_path: The root directory of the repository.
Returns:
The Path object to the main script if found, otherwise None.
"""
repo_name = repo_path.name
script_subfolder_name = repo_name.lower()
# Standard location: repo_root/repo_name/__main__.py
path_in_subdir = repo_path / script_subfolder_name / "__main__.py"
# Fallback location: repo_root/__main__.py
path_in_root = repo_path / "__main__.py"
logger.info(
f"Searching for main script at '{path_in_subdir}' or '{path_in_root}'"
)
if path_in_subdir.is_file():
logger.info(f"Main script found: {path_in_subdir}")
return path_in_subdir
if path_in_root.is_file():
logger.info(f"Main script found: {path_in_root}")
return path_in_root
logger.warning("Main script (__main__.py) not found in standard locations.")
return None
def generate_requirements_file(
repo_path: Path,
external_deps_info: DependencyInfo,
std_lib_deps_info: DependencyInfo,
) -> Path:
"""
Generates a detailed requirements.txt file in the repository's root.
Args:
repo_path: The root directory of the repository.
external_deps_info: Dictionary of detected external dependencies.
std_lib_deps_info: Dictionary of detected standard library modules.
Returns:
The path to the generated requirements.txt file.
"""
req_file_path = repo_path / "requirements.txt"
py_ver = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
logger.info(f"Generating '{req_file_path}'...")
try:
with open(req_file_path, "w", encoding="utf-8") as f:
f.write(f"# Requirements generated by DependencyAnalyzer for {repo_path.name}\n")
f.write(f"# Python Version (analysis env): {py_ver}\n\n")
f.write(f"# --- Standard Library Modules Used (part of Python {py_ver}) ---\n")
if not std_lib_deps_info:
f.write("# (No stdlib imports detected)\n")
else:
for name in sorted(std_lib_deps_info.keys()):
locs = std_lib_deps_info[name].get("locations", set())
# Ensure locs is a list for consistent sorting
loc_list = sorted(list(locs)) if isinstance(locs, set) else []
loc_str = ""
if loc_list:
# Show first 3 locations for brevity
preview = ", ".join(loc_list[:3])
ellipsis = ", ..." if len(loc_list) > 3 else ""
loc_str = f"Used in: {preview}{ellipsis}"
f.write(f"# {name} ({loc_str})\n".replace(" ()", ""))
f.write("\n# --- External Dependencies (for pip install) ---\n")
if not external_deps_info:
f.write("# (No external dependencies detected)\n")
else:
for pypi_name in sorted(external_deps_info.keys()):
info = external_deps_info[pypi_name]
ver = info.get("version")
orig_imp = info.get("original_import_name")
# Construct a requirement string, e.g., "package==1.2.3" or "package"
req_line = f"{pypi_name}=={ver}" if ver else pypi_name
locs = info.get("locations", set())
loc_list = sorted(list(locs)) if isinstance(locs, set) else []
comment_lines = []
if loc_list:
imp_as = f"(imported as '{orig_imp}') " if orig_imp else ""
preview = ", ".join(loc_list[:3])
ellipsis = ", ..." if len(loc_list) > 3 else ""
comment_lines.append(f"# Found {imp_as}in: {preview}{ellipsis}")
if not ver:
comment_lines.append("# Version not detected in analysis environment.")
if comment_lines:
f.write("\n".join(comment_lines) + "\n")
f.write(f"{req_line}\n\n")
logger.info(f"Successfully generated '{req_file_path}'.")
except IOError as e:
logger.exception(f"Error writing to '{req_file_path}': {e}")
raise
return req_file_path
def copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
"""
Copies the main requirements.txt file to the offline packages directory.
"""
source_req = repo_path / "requirements.txt"
dest_req = packages_dir / "requirements.txt"
if not source_req.is_file():
logger.warning(f"Source '{source_req}' not found, cannot copy.")
return False
try:
shutil.copy2(source_req, dest_req)
logger.info(f"Copied '{source_req.name}' to '{packages_dir}'.")
return True
except (IOError, shutil.Error) as e:
logger.exception(f"Failed to copy '{source_req.name}': {e}")
return False
def download_packages(
repo_path: Path, requirements_file_path: Path
) -> Tuple[bool, str]:
"""
Downloads all required packages into a local '_req_packages' directory.
Args:
repo_path: The root directory of the repository.
requirements_file_path: Path to the requirements.txt file to use.
Returns:
A tuple (success_flag, summary_message).
"""
packages_dir = repo_path / "_req_packages"
packages_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Starting package download to '{packages_dir}'...")
if not requirements_file_path.is_file():
msg = f"Source requirements file '{requirements_file_path}' not found."
logger.error(msg)
return False, msg
copy_requirements_to_packages_dir(repo_path, packages_dir)
cmd = [
sys.executable, "-m", "pip", "download",
"-r", str(requirements_file_path.resolve()),
"-d", str(packages_dir.resolve()),
"--no-cache-dir",
"--disable-pip-version-check",
]
logger.debug(f"Running command: {' '.join(cmd)}")
try:
# Using subprocess.run for a simpler blocking call
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding="utf-8",
errors="ignore",
timeout=600, # 10-minute timeout
check=True, # Raises CalledProcessError on non-zero exit codes
creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
)
if proc.stderr and proc.stderr.strip():
logger.warning(f"Pip download reported warnings:\n{proc.stderr.strip()}")
summary = f"All packages from '{requirements_file_path.name}' downloaded successfully."
logger.info(summary)
return True, summary
except FileNotFoundError:
msg = f"Command not found: '{sys.executable}'. Is Python/pip in PATH?"
logger.error(msg)
return False, msg
except subprocess.TimeoutExpired:
msg = "Package download timed out after 10 minutes."
logger.error(msg)
return False, msg
except subprocess.CalledProcessError as e:
error_details = f"Pip error (code {e.returncode}):\n{e.stderr.strip()}"
logger.error(error_details)
return False, f"Failed to download packages. {error_details}"
except Exception as e:
logger.exception("An unexpected error occurred during package download.")
return False, f"An unexpected error occurred: {e}"
def create_install_scripts(
repo_path: Path, requirements_file_path: Path
) -> Tuple[bool, str]:
"""
Creates install_packages.bat and install_packages.sh scripts.
"""
packages_dir = repo_path / "_req_packages"
packages_dir.mkdir(parents=True, exist_ok=True)
if not requirements_file_path.is_file():
msg = f"Source requirements file '{requirements_file_path}' not found."
return False, msg
if not copy_requirements_to_packages_dir(repo_path, packages_dir):
return False, "Failed to copy requirements.txt to packages directory."
req_file_rel_name = "requirements.txt"
# Windows Batch Script
bat_script_content = f"""@echo off
cls
echo ------------------------------------
echo Package Installer for {repo_path.name}
echo ------------------------------------
echo.
echo Checking for a valid Python pip module...
python -m pip --version >nul 2>&1
if %errorlevel% neq 0 (
echo WARNING: 'python -m pip' failed. Trying 'py -m pip'...
py -m pip --version >nul 2>&1
if %errorlevel% neq 0 (
goto :pip_error
) else (
set PYTHON_CMD=py
)
) else (
set PYTHON_CMD=python
)
echo Python pip found.
goto :install
:pip_error
echo ERROR: Python pip module not found.
echo Please ensure Python is installed and 'pip' is available in your PATH.
echo See: https://pip.pypa.io/en/stable/installation/
echo.
pause
exit /b 1
:install
echo.
echo Installing packages from local folder...
echo Source Directory: %~dp0
echo.
%PYTHON_CMD% -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file_rel_name}" --disable-pip-version-check
if %errorlevel% neq 0 (
goto :install_error
)
echo.
echo --------------------------
echo Installation Successful!
echo --------------------------
echo.
pause
exit /b 0
:install_error
echo.
echo --------------------------
echo ERROR: Installation Failed.
echo --------------------------
echo Please check the messages above for details.
pause
exit /b 1
"""
# Linux/macOS Shell Script
sh_script_content = f"""#!/bin/sh
# Shell script for installing packages on Linux/macOS
echo "------------------------------------"
echo "Package Installer for {repo_path.name}"
echo "------------------------------------"
echo ""
echo "Checking for a valid Python pip module..."
# Find a valid python command
if command -v python3 >/dev/null 2>&1 && python3 -m pip --version >/dev/null 2>&1; then
PYTHON_CMD="python3"
elif command -v python >/dev/null 2>&1 && python -m pip --version >/dev/null 2>&1; then
PYTHON_CMD="python"
else
echo "ERROR: Could not find a working Python pip installation for 'python' or 'python3'."
echo "Please ensure Python and pip are correctly installed."
echo "See: https://pip.pypa.io/en/stable/installation/"
exit 1
fi
echo "Found pip using: '$PYTHON_CMD'"
# Get the directory where the script is located
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
REQ_FILE="$SCRIPT_DIR/{req_file_rel_name}"
echo ""
echo "Installing packages from: '$SCRIPT_DIR'"
echo "Using requirements file: '$REQ_FILE'"
echo ""
"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$REQ_FILE" --disable-pip-version-check
INSTALL_STATUS=$?
echo ""
if [ $INSTALL_STATUS -ne 0 ]; then
echo "--------------------------"
echo "ERROR: Installation Failed."
echo "--------------------------"
echo "Please check the error messages above."
exit 1
else
echo "--------------------------"
echo "Installation Successful!"
echo "--------------------------"
exit 0
fi
"""
try:
bat_path = packages_dir / "install_packages.bat"
sh_path = packages_dir / "install_packages.sh"
with open(bat_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write(bat_script_content)
logger.info(f"Created Windows install script: {bat_path}")
with open(sh_path, "w", encoding="utf-8", newline="\n") as f:
f.write(sh_script_content)
os.chmod(sh_path, 0o755) # Make the script executable on Unix-like systems
logger.info(f"Created Linux/macOS install script: {sh_path} (executable)")
return True, f"Installation scripts created in '{packages_dir}'."
except IOError as e:
logger.exception("Error creating installation scripts.")
return False, f"Error creating scripts: {e}"
def get_installed_packages() -> Dict[str, str]:
"""
Retrieves all packages installed in the current Python environment.
Returns:
A dictionary mapping normalized package names to their versions.
"""
installed: Dict[str, str] = {}
cmd = [
sys.executable, "-m", "pip", "list",
"--format=json", "--disable-pip-version-check",
]
logger.debug(f"Getting installed packages using command: {' '.join(cmd)}")
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
encoding="utf-8",
errors="ignore",
creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
)
packages = json.loads(proc.stdout)
for pkg in packages:
# Normalize names to lowercase and hyphens for reliable matching
normalized_name = pkg["name"].lower().replace("_", "-")
installed[normalized_name] = pkg["version"]
logger.info(f"Found {len(installed)} installed packages in the environment.")
except FileNotFoundError:
logger.error(f"Command not found: '{cmd[0]}'. Is Python/pip in PATH?")
except subprocess.CalledProcessError as e:
logger.error(f"'pip list' command failed (code {e.returncode}): {e.stderr}")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse 'pip list' JSON output: {e}")
except Exception as e:
logger.exception("Unexpected error getting installed packages.")
return installed
def _extract_pkg_name_from_spec(spec: str) -> Optional[str]:
"""
Helper to extract a package name from a requirement spec string.
"""
if PACKAGING_AVAILABLE:
try:
return Requirement(spec).name
except Exception:
# Fallback to regex if 'packaging' fails on a malformed line
pass
# Basic regex fallback for when 'packaging' is not available or fails
match = re.match(r"([a-zA-Z0-9._-]+)", spec)
return match.group(1) if match else None
def compare_requirements_with_installed(
requirements_file: Path, installed_packages: Dict[str, str]
) -> List[Dict[str, str]]:
"""
Compares a requirements file against the dictionary of installed packages.
"""
results: List[Dict[str, str]] = []
if not requirements_file.is_file():
logger.warning(f"Comparison failed: '{requirements_file}' not found.")
return results
logger.info(f"Comparing '{requirements_file.name}' with installed packages...")
with open(requirements_file, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
spec = line.strip()
if not spec or spec.startswith("#") or spec.startswith("-"):
continue
pkg_name = _extract_pkg_name_from_spec(spec)
if not pkg_name:
logger.warning(f"L{line_num}: Could not extract package name from '{spec}'. Skipping.")
continue
required_spec = "any"
req_obj = None
if PACKAGING_AVAILABLE:
try:
req_obj = Requirement(spec)
# Use str() to handle complex specifiers like '>=1.0,<2.0'
required_spec = str(req_obj.specifier) if req_obj.specifier else "any"
except Exception as e:
logger.warning(f"L{line_num}: Could not parse '{spec}' with 'packaging': {e}")
elif "==" in spec:
required_spec = spec.split("==", 1)[1].strip()
norm_name = pkg_name.lower().replace("_", "-")
installed_version = installed_packages.get(norm_name)
status = ""
if installed_version:
if PACKAGING_AVAILABLE and req_obj and req_obj.specifier:
try:
if parse_version(installed_version) in req_obj.specifier:
status = "OK"
else:
status = "Version Mismatch"
except InvalidVersion:
status = "Invalid Installed Version"
elif "==" in required_spec: # Basic check if packaging not available
status = "OK" if installed_version == required_spec else "Version Mismatch"
else:
status = "Installed (Version not specified)"
else:
status = "Not Installed"
results.append({
"package": pkg_name,
"required": required_spec,
"installed": installed_version or "Not Installed",
"status": status,
})
logger.info(f"Comparison finished with {len(results)} entries.")
return results
def update_system_packages(
packages_to_update: List[str], repo_path: Path
) -> Tuple[bool, str]:
"""
Updates or installs a list of packages using pip install --upgrade.
It reads the full specifier from the project's requirements file.
"""
if not packages_to_update:
return True, "No packages were specified for update."
req_file_path = repo_path / "requirements.txt"
if not req_file_path.is_file():
return False, f"Cannot update: '{req_file_path}' not found."
# Create a map of {normalized_name: original_spec_line} from requirements.txt
req_map: Dict[str, str] = {}
try:
with open(req_file_path, "r", encoding="utf-8") as f:
for line in f:
spec = line.strip()
if not spec or spec.startswith("#") or spec.startswith("-"):
continue
pkg_name = _extract_pkg_name_from_spec(spec)
if pkg_name:
req_map[pkg_name.lower().replace("_", "-")] = spec
except IOError as e:
return False, f"Error reading '{req_file_path}': {e}"
# Build a list of specs to pass to pip, based on the user's selection
final_specs_to_update: List[str] = []
for pkg_name in packages_to_update:
norm_name = pkg_name.lower().replace("_", "-")
if norm_name in req_map:
final_specs_to_update.append(req_map[norm_name])
else:
# Fallback: if not in requirements, just use the name
logger.warning(
f"'{pkg_name}' not found in requirements.txt. Upgrading by name only."
)
final_specs_to_update.append(pkg_name)
if not final_specs_to_update:
return True, "No matching packages found in requirements.txt to update."
# Use a temporary file to handle complex specifiers safely
temp_req_path = repo_path / "_temp_update_reqs.txt"
cmd = [
sys.executable, "-m", "pip", "install", "--upgrade",
"-r", str(temp_req_path.resolve()),
"--disable-pip-version-check",
]
try:
with open(temp_req_path, "w", encoding="utf-8") as f:
f.write("\n".join(final_specs_to_update))
logger.info(f"Attempting to update {len(final_specs_to_update)} package(s).")
logger.debug(f"Running command: {' '.join(cmd)}")
proc = subprocess.run(
cmd,
capture_output=True, text=True, encoding="utf-8", errors="ignore",
timeout=600, check=True,
creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
)
summary = f"Update completed for: {', '.join(packages_to_update)}."
logger.info(summary)
if proc.stderr and proc.stderr.strip():
logger.warning(f"Update process reported warnings:\n{proc.stderr.strip()}")
return True, summary
except subprocess.CalledProcessError as e:
error_msg = f"Update failed. Pip error (code {e.returncode}):\n{e.stderr.strip()}"
logger.error(error_msg)
return False, error_msg
except Exception as e:
logger.exception("An unexpected error occurred during package update.")
return False, f"An unexpected error occurred: {e}"
finally:
# Clean up the temporary file
if temp_req_path.exists():
try:
os.remove(temp_req_path)
except OSError as e:
logger.warning(f"Could not remove temporary file '{temp_req_path}': {e}")