599 lines
22 KiB
Python
599 lines
22 KiB
Python
# dependency_analyzer/core/package_manager.py
|
|
"""
|
|
Handles Python package management tasks.
|
|
|
|
This module includes functions for:
|
|
- Generating and reading requirements.txt files.
|
|
- Downloading packages for offline installation using pip.
|
|
- Creating installation scripts (.bat, .sh).
|
|
- Comparing project requirements with the currently installed packages.
|
|
- Updating packages in the local environment.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
from .analyzer import DependencyInfo # Import type alias for clarity
|
|
|
|
# --- Logger Configuration ---
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Optional 'packaging' library import for robust version/specifier parsing ---
|
|
try:
|
|
from packaging.requirements import Requirement
|
|
from packaging.version import InvalidVersion, parse as parse_version
|
|
|
|
PACKAGING_AVAILABLE = True
|
|
logger.debug("Successfully imported the 'packaging' library for robust parsing.")
|
|
except ImportError:
|
|
# Define dummy classes for type hinting and checks if 'packaging' is not available
|
|
class Requirement: # type: ignore
|
|
def __init__(self, *args, **kwargs):
|
|
raise NotImplementedError
|
|
class InvalidVersion(Exception): # type: ignore
|
|
pass
|
|
parse_version = None
|
|
PACKAGING_AVAILABLE = False
|
|
logger.warning(
|
|
"'packaging' library not found. Version comparison will be less robust."
|
|
)
|
|
|
|
|
|
def find_main_script(repo_path: Path) -> Optional[Path]:
|
|
"""
|
|
Finds the main executable script (__main__.py) in standard project locations.
|
|
|
|
Args:
|
|
repo_path: The root directory of the repository.
|
|
|
|
Returns:
|
|
The Path object to the main script if found, otherwise None.
|
|
"""
|
|
repo_name = repo_path.name
|
|
script_subfolder_name = repo_name.lower()
|
|
|
|
# Standard location: repo_root/repo_name/__main__.py
|
|
path_in_subdir = repo_path / script_subfolder_name / "__main__.py"
|
|
# Fallback location: repo_root/__main__.py
|
|
path_in_root = repo_path / "__main__.py"
|
|
|
|
logger.info(
|
|
f"Searching for main script at '{path_in_subdir}' or '{path_in_root}'"
|
|
)
|
|
if path_in_subdir.is_file():
|
|
logger.info(f"Main script found: {path_in_subdir}")
|
|
return path_in_subdir
|
|
if path_in_root.is_file():
|
|
logger.info(f"Main script found: {path_in_root}")
|
|
return path_in_root
|
|
|
|
logger.warning("Main script (__main__.py) not found in standard locations.")
|
|
return None
|
|
|
|
|
|
def generate_requirements_file(
|
|
repo_path: Path,
|
|
external_deps_info: DependencyInfo,
|
|
std_lib_deps_info: DependencyInfo,
|
|
) -> Path:
|
|
"""
|
|
Generates a detailed requirements.txt file in the repository's root.
|
|
|
|
Args:
|
|
repo_path: The root directory of the repository.
|
|
external_deps_info: Dictionary of detected external dependencies.
|
|
std_lib_deps_info: Dictionary of detected standard library modules.
|
|
|
|
Returns:
|
|
The path to the generated requirements.txt file.
|
|
"""
|
|
req_file_path = repo_path / "requirements.txt"
|
|
py_ver = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
|
logger.info(f"Generating '{req_file_path}'...")
|
|
|
|
try:
|
|
with open(req_file_path, "w", encoding="utf-8") as f:
|
|
f.write(f"# Requirements generated by DependencyAnalyzer for {repo_path.name}\n")
|
|
f.write(f"# Python Version (analysis env): {py_ver}\n\n")
|
|
|
|
f.write(f"# --- Standard Library Modules Used (part of Python {py_ver}) ---\n")
|
|
if not std_lib_deps_info:
|
|
f.write("# (No stdlib imports detected)\n")
|
|
else:
|
|
for name in sorted(std_lib_deps_info.keys()):
|
|
locs = std_lib_deps_info[name].get("locations", set())
|
|
# Ensure locs is a list for consistent sorting
|
|
loc_list = sorted(list(locs)) if isinstance(locs, set) else []
|
|
loc_str = ""
|
|
if loc_list:
|
|
# Show first 3 locations for brevity
|
|
preview = ", ".join(loc_list[:3])
|
|
ellipsis = ", ..." if len(loc_list) > 3 else ""
|
|
loc_str = f"Used in: {preview}{ellipsis}"
|
|
f.write(f"# {name} ({loc_str})\n".replace(" ()", ""))
|
|
|
|
f.write("\n# --- External Dependencies (for pip install) ---\n")
|
|
if not external_deps_info:
|
|
f.write("# (No external dependencies detected)\n")
|
|
else:
|
|
for pypi_name in sorted(external_deps_info.keys()):
|
|
info = external_deps_info[pypi_name]
|
|
ver = info.get("version")
|
|
orig_imp = info.get("original_import_name")
|
|
|
|
# Construct a requirement string, e.g., "package==1.2.3" or "package"
|
|
req_line = f"{pypi_name}=={ver}" if ver else pypi_name
|
|
|
|
locs = info.get("locations", set())
|
|
loc_list = sorted(list(locs)) if isinstance(locs, set) else []
|
|
|
|
comment_lines = []
|
|
if loc_list:
|
|
imp_as = f"(imported as '{orig_imp}') " if orig_imp else ""
|
|
preview = ", ".join(loc_list[:3])
|
|
ellipsis = ", ..." if len(loc_list) > 3 else ""
|
|
comment_lines.append(f"# Found {imp_as}in: {preview}{ellipsis}")
|
|
|
|
if not ver:
|
|
comment_lines.append("# Version not detected in analysis environment.")
|
|
|
|
if comment_lines:
|
|
f.write("\n".join(comment_lines) + "\n")
|
|
f.write(f"{req_line}\n\n")
|
|
|
|
logger.info(f"Successfully generated '{req_file_path}'.")
|
|
except IOError as e:
|
|
logger.exception(f"Error writing to '{req_file_path}': {e}")
|
|
raise
|
|
return req_file_path
|
|
|
|
|
|
def copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
|
|
"""
|
|
Copies the main requirements.txt file to the offline packages directory.
|
|
"""
|
|
source_req = repo_path / "requirements.txt"
|
|
dest_req = packages_dir / "requirements.txt"
|
|
|
|
if not source_req.is_file():
|
|
logger.warning(f"Source '{source_req}' not found, cannot copy.")
|
|
return False
|
|
try:
|
|
shutil.copy2(source_req, dest_req)
|
|
logger.info(f"Copied '{source_req.name}' to '{packages_dir}'.")
|
|
return True
|
|
except (IOError, shutil.Error) as e:
|
|
logger.exception(f"Failed to copy '{source_req.name}': {e}")
|
|
return False
|
|
|
|
|
|
def download_packages(
|
|
repo_path: Path, requirements_file_path: Path
|
|
) -> Tuple[bool, str]:
|
|
"""
|
|
Downloads all required packages into a local '_req_packages' directory.
|
|
|
|
Args:
|
|
repo_path: The root directory of the repository.
|
|
requirements_file_path: Path to the requirements.txt file to use.
|
|
|
|
Returns:
|
|
A tuple (success_flag, summary_message).
|
|
"""
|
|
packages_dir = repo_path / "_req_packages"
|
|
packages_dir.mkdir(parents=True, exist_ok=True)
|
|
logger.info(f"Starting package download to '{packages_dir}'...")
|
|
|
|
if not requirements_file_path.is_file():
|
|
msg = f"Source requirements file '{requirements_file_path}' not found."
|
|
logger.error(msg)
|
|
return False, msg
|
|
|
|
copy_requirements_to_packages_dir(repo_path, packages_dir)
|
|
|
|
cmd = [
|
|
sys.executable, "-m", "pip", "download",
|
|
"-r", str(requirements_file_path.resolve()),
|
|
"-d", str(packages_dir.resolve()),
|
|
"--no-cache-dir",
|
|
"--disable-pip-version-check",
|
|
]
|
|
|
|
logger.debug(f"Running command: {' '.join(cmd)}")
|
|
try:
|
|
# Using subprocess.run for a simpler blocking call
|
|
proc = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="ignore",
|
|
timeout=600, # 10-minute timeout
|
|
check=True, # Raises CalledProcessError on non-zero exit codes
|
|
creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
|
|
)
|
|
if proc.stderr and proc.stderr.strip():
|
|
logger.warning(f"Pip download reported warnings:\n{proc.stderr.strip()}")
|
|
|
|
summary = f"All packages from '{requirements_file_path.name}' downloaded successfully."
|
|
logger.info(summary)
|
|
return True, summary
|
|
|
|
except FileNotFoundError:
|
|
msg = f"Command not found: '{sys.executable}'. Is Python/pip in PATH?"
|
|
logger.error(msg)
|
|
return False, msg
|
|
except subprocess.TimeoutExpired:
|
|
msg = "Package download timed out after 10 minutes."
|
|
logger.error(msg)
|
|
return False, msg
|
|
except subprocess.CalledProcessError as e:
|
|
error_details = f"Pip error (code {e.returncode}):\n{e.stderr.strip()}"
|
|
logger.error(error_details)
|
|
return False, f"Failed to download packages. {error_details}"
|
|
except Exception as e:
|
|
logger.exception("An unexpected error occurred during package download.")
|
|
return False, f"An unexpected error occurred: {e}"
|
|
|
|
|
|
def create_install_scripts(
|
|
repo_path: Path, requirements_file_path: Path
|
|
) -> Tuple[bool, str]:
|
|
"""
|
|
Creates install_packages.bat and install_packages.sh scripts.
|
|
"""
|
|
packages_dir = repo_path / "_req_packages"
|
|
packages_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if not requirements_file_path.is_file():
|
|
msg = f"Source requirements file '{requirements_file_path}' not found."
|
|
return False, msg
|
|
if not copy_requirements_to_packages_dir(repo_path, packages_dir):
|
|
return False, "Failed to copy requirements.txt to packages directory."
|
|
|
|
req_file_rel_name = "requirements.txt"
|
|
|
|
# Windows Batch Script
|
|
bat_script_content = f"""@echo off
|
|
cls
|
|
echo ------------------------------------
|
|
echo Package Installer for {repo_path.name}
|
|
echo ------------------------------------
|
|
echo.
|
|
echo Checking for a valid Python pip module...
|
|
python -m pip --version >nul 2>&1
|
|
if %errorlevel% neq 0 (
|
|
echo WARNING: 'python -m pip' failed. Trying 'py -m pip'...
|
|
py -m pip --version >nul 2>&1
|
|
if %errorlevel% neq 0 (
|
|
goto :pip_error
|
|
) else (
|
|
set PYTHON_CMD=py
|
|
)
|
|
) else (
|
|
set PYTHON_CMD=python
|
|
)
|
|
echo Python pip found.
|
|
goto :install
|
|
|
|
:pip_error
|
|
echo ERROR: Python pip module not found.
|
|
echo Please ensure Python is installed and 'pip' is available in your PATH.
|
|
echo See: https://pip.pypa.io/en/stable/installation/
|
|
echo.
|
|
pause
|
|
exit /b 1
|
|
|
|
:install
|
|
echo.
|
|
echo Installing packages from local folder...
|
|
echo Source Directory: %~dp0
|
|
echo.
|
|
%PYTHON_CMD% -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file_rel_name}" --disable-pip-version-check
|
|
if %errorlevel% neq 0 (
|
|
goto :install_error
|
|
)
|
|
|
|
echo.
|
|
echo --------------------------
|
|
echo Installation Successful!
|
|
echo --------------------------
|
|
echo.
|
|
pause
|
|
exit /b 0
|
|
|
|
:install_error
|
|
echo.
|
|
echo --------------------------
|
|
echo ERROR: Installation Failed.
|
|
echo --------------------------
|
|
echo Please check the messages above for details.
|
|
pause
|
|
exit /b 1
|
|
"""
|
|
|
|
# Linux/macOS Shell Script
|
|
sh_script_content = f"""#!/bin/sh
|
|
# Shell script for installing packages on Linux/macOS
|
|
|
|
echo "------------------------------------"
|
|
echo "Package Installer for {repo_path.name}"
|
|
echo "------------------------------------"
|
|
echo ""
|
|
echo "Checking for a valid Python pip module..."
|
|
|
|
# Find a valid python command
|
|
if command -v python3 >/dev/null 2>&1 && python3 -m pip --version >/dev/null 2>&1; then
|
|
PYTHON_CMD="python3"
|
|
elif command -v python >/dev/null 2>&1 && python -m pip --version >/dev/null 2>&1; then
|
|
PYTHON_CMD="python"
|
|
else
|
|
echo "ERROR: Could not find a working Python pip installation for 'python' or 'python3'."
|
|
echo "Please ensure Python and pip are correctly installed."
|
|
echo "See: https://pip.pypa.io/en/stable/installation/"
|
|
exit 1
|
|
fi
|
|
echo "Found pip using: '$PYTHON_CMD'"
|
|
|
|
# Get the directory where the script is located
|
|
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
|
REQ_FILE="$SCRIPT_DIR/{req_file_rel_name}"
|
|
|
|
echo ""
|
|
echo "Installing packages from: '$SCRIPT_DIR'"
|
|
echo "Using requirements file: '$REQ_FILE'"
|
|
echo ""
|
|
|
|
"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$REQ_FILE" --disable-pip-version-check
|
|
INSTALL_STATUS=$?
|
|
|
|
echo ""
|
|
if [ $INSTALL_STATUS -ne 0 ]; then
|
|
echo "--------------------------"
|
|
echo "ERROR: Installation Failed."
|
|
echo "--------------------------"
|
|
echo "Please check the error messages above."
|
|
exit 1
|
|
else
|
|
echo "--------------------------"
|
|
echo "Installation Successful!"
|
|
echo "--------------------------"
|
|
exit 0
|
|
fi
|
|
"""
|
|
try:
|
|
bat_path = packages_dir / "install_packages.bat"
|
|
sh_path = packages_dir / "install_packages.sh"
|
|
|
|
with open(bat_path, "w", encoding="utf-8", newline="\r\n") as f:
|
|
f.write(bat_script_content)
|
|
logger.info(f"Created Windows install script: {bat_path}")
|
|
|
|
with open(sh_path, "w", encoding="utf-8", newline="\n") as f:
|
|
f.write(sh_script_content)
|
|
os.chmod(sh_path, 0o755) # Make the script executable on Unix-like systems
|
|
logger.info(f"Created Linux/macOS install script: {sh_path} (executable)")
|
|
|
|
return True, f"Installation scripts created in '{packages_dir}'."
|
|
except IOError as e:
|
|
logger.exception("Error creating installation scripts.")
|
|
return False, f"Error creating scripts: {e}"
|
|
|
|
|
|
def get_installed_packages() -> Dict[str, str]:
|
|
"""
|
|
Retrieves all packages installed in the current Python environment.
|
|
|
|
Returns:
|
|
A dictionary mapping normalized package names to their versions.
|
|
"""
|
|
installed: Dict[str, str] = {}
|
|
cmd = [
|
|
sys.executable, "-m", "pip", "list",
|
|
"--format=json", "--disable-pip-version-check",
|
|
]
|
|
logger.debug(f"Getting installed packages using command: {' '.join(cmd)}")
|
|
|
|
try:
|
|
proc = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
encoding="utf-8",
|
|
errors="ignore",
|
|
creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
|
|
)
|
|
packages = json.loads(proc.stdout)
|
|
for pkg in packages:
|
|
# Normalize names to lowercase and hyphens for reliable matching
|
|
normalized_name = pkg["name"].lower().replace("_", "-")
|
|
installed[normalized_name] = pkg["version"]
|
|
logger.info(f"Found {len(installed)} installed packages in the environment.")
|
|
except FileNotFoundError:
|
|
logger.error(f"Command not found: '{cmd[0]}'. Is Python/pip in PATH?")
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f"'pip list' command failed (code {e.returncode}): {e.stderr}")
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Failed to parse 'pip list' JSON output: {e}")
|
|
except Exception as e:
|
|
logger.exception("Unexpected error getting installed packages.")
|
|
|
|
return installed
|
|
|
|
|
|
def _extract_pkg_name_from_spec(spec: str) -> Optional[str]:
|
|
"""
|
|
Helper to extract a package name from a requirement spec string.
|
|
"""
|
|
if PACKAGING_AVAILABLE:
|
|
try:
|
|
return Requirement(spec).name
|
|
except Exception:
|
|
# Fallback to regex if 'packaging' fails on a malformed line
|
|
pass
|
|
|
|
# Basic regex fallback for when 'packaging' is not available or fails
|
|
match = re.match(r"([a-zA-Z0-9._-]+)", spec)
|
|
return match.group(1) if match else None
|
|
|
|
|
|
def compare_requirements_with_installed(
|
|
requirements_file: Path, installed_packages: Dict[str, str]
|
|
) -> List[Dict[str, str]]:
|
|
"""
|
|
Compares a requirements file against the dictionary of installed packages.
|
|
"""
|
|
results: List[Dict[str, str]] = []
|
|
if not requirements_file.is_file():
|
|
logger.warning(f"Comparison failed: '{requirements_file}' not found.")
|
|
return results
|
|
|
|
logger.info(f"Comparing '{requirements_file.name}' with installed packages...")
|
|
with open(requirements_file, "r", encoding="utf-8") as f:
|
|
for line_num, line in enumerate(f, 1):
|
|
spec = line.strip()
|
|
if not spec or spec.startswith("#") or spec.startswith("-"):
|
|
continue
|
|
|
|
pkg_name = _extract_pkg_name_from_spec(spec)
|
|
if not pkg_name:
|
|
logger.warning(f"L{line_num}: Could not extract package name from '{spec}'. Skipping.")
|
|
continue
|
|
|
|
required_spec = "any"
|
|
req_obj = None
|
|
if PACKAGING_AVAILABLE:
|
|
try:
|
|
req_obj = Requirement(spec)
|
|
# Use str() to handle complex specifiers like '>=1.0,<2.0'
|
|
required_spec = str(req_obj.specifier) if req_obj.specifier else "any"
|
|
except Exception as e:
|
|
logger.warning(f"L{line_num}: Could not parse '{spec}' with 'packaging': {e}")
|
|
elif "==" in spec:
|
|
required_spec = spec.split("==", 1)[1].strip()
|
|
|
|
norm_name = pkg_name.lower().replace("_", "-")
|
|
installed_version = installed_packages.get(norm_name)
|
|
|
|
status = ""
|
|
if installed_version:
|
|
if PACKAGING_AVAILABLE and req_obj and req_obj.specifier:
|
|
try:
|
|
if parse_version(installed_version) in req_obj.specifier:
|
|
status = "OK"
|
|
else:
|
|
status = "Version Mismatch"
|
|
except InvalidVersion:
|
|
status = "Invalid Installed Version"
|
|
elif "==" in required_spec: # Basic check if packaging not available
|
|
status = "OK" if installed_version == required_spec else "Version Mismatch"
|
|
else:
|
|
status = "Installed (Version not specified)"
|
|
else:
|
|
status = "Not Installed"
|
|
|
|
results.append({
|
|
"package": pkg_name,
|
|
"required": required_spec,
|
|
"installed": installed_version or "Not Installed",
|
|
"status": status,
|
|
})
|
|
|
|
logger.info(f"Comparison finished with {len(results)} entries.")
|
|
return results
|
|
|
|
|
|
def update_system_packages(
|
|
packages_to_update: List[str], repo_path: Path
|
|
) -> Tuple[bool, str]:
|
|
"""
|
|
Updates or installs a list of packages using pip install --upgrade.
|
|
It reads the full specifier from the project's requirements file.
|
|
"""
|
|
if not packages_to_update:
|
|
return True, "No packages were specified for update."
|
|
|
|
req_file_path = repo_path / "requirements.txt"
|
|
if not req_file_path.is_file():
|
|
return False, f"Cannot update: '{req_file_path}' not found."
|
|
|
|
# Create a map of {normalized_name: original_spec_line} from requirements.txt
|
|
req_map: Dict[str, str] = {}
|
|
try:
|
|
with open(req_file_path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
spec = line.strip()
|
|
if not spec or spec.startswith("#") or spec.startswith("-"):
|
|
continue
|
|
pkg_name = _extract_pkg_name_from_spec(spec)
|
|
if pkg_name:
|
|
req_map[pkg_name.lower().replace("_", "-")] = spec
|
|
except IOError as e:
|
|
return False, f"Error reading '{req_file_path}': {e}"
|
|
|
|
# Build a list of specs to pass to pip, based on the user's selection
|
|
final_specs_to_update: List[str] = []
|
|
for pkg_name in packages_to_update:
|
|
norm_name = pkg_name.lower().replace("_", "-")
|
|
if norm_name in req_map:
|
|
final_specs_to_update.append(req_map[norm_name])
|
|
else:
|
|
# Fallback: if not in requirements, just use the name
|
|
logger.warning(
|
|
f"'{pkg_name}' not found in requirements.txt. Upgrading by name only."
|
|
)
|
|
final_specs_to_update.append(pkg_name)
|
|
|
|
if not final_specs_to_update:
|
|
return True, "No matching packages found in requirements.txt to update."
|
|
|
|
# Use a temporary file to handle complex specifiers safely
|
|
temp_req_path = repo_path / "_temp_update_reqs.txt"
|
|
cmd = [
|
|
sys.executable, "-m", "pip", "install", "--upgrade",
|
|
"-r", str(temp_req_path.resolve()),
|
|
"--disable-pip-version-check",
|
|
]
|
|
|
|
try:
|
|
with open(temp_req_path, "w", encoding="utf-8") as f:
|
|
f.write("\n".join(final_specs_to_update))
|
|
|
|
logger.info(f"Attempting to update {len(final_specs_to_update)} package(s).")
|
|
logger.debug(f"Running command: {' '.join(cmd)}")
|
|
|
|
proc = subprocess.run(
|
|
cmd,
|
|
capture_output=True, text=True, encoding="utf-8", errors="ignore",
|
|
timeout=600, check=True,
|
|
creationflags=(subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0),
|
|
)
|
|
|
|
summary = f"Update completed for: {', '.join(packages_to_update)}."
|
|
logger.info(summary)
|
|
if proc.stderr and proc.stderr.strip():
|
|
logger.warning(f"Update process reported warnings:\n{proc.stderr.strip()}")
|
|
return True, summary
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
error_msg = f"Update failed. Pip error (code {e.returncode}):\n{e.stderr.strip()}"
|
|
logger.error(error_msg)
|
|
return False, error_msg
|
|
except Exception as e:
|
|
logger.exception("An unexpected error occurred during package update.")
|
|
return False, f"An unexpected error occurred: {e}"
|
|
finally:
|
|
# Clean up the temporary file
|
|
if temp_req_path.exists():
|
|
try:
|
|
os.remove(temp_req_path)
|
|
except OSError as e:
|
|
logger.warning(f"Could not remove temporary file '{temp_req_path}': {e}") |