# dependencyanalyzer/core.py
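"""Core logic for DependencyAnalyzer.

Scans a repository's Python sources for imports, separates standard-library
modules from external dependencies, generates an annotated requirements.txt,
downloads packages for offline installation, and creates install/update helpers.

Typical call sequence (illustrative sketch only; the path below is hypothetical):

    repo = Path("/path/to/repo")
    std_info, ext_info = find_project_modules_and_dependencies(repo)
    req_file = generate_requirements_file(repo, ext_info, std_info)
    download_packages(repo, req_file)
    create_install_scripts(repo, req_file)
"""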
import ast
import os
import shutil # For copying files
import subprocess
import sys
import json
import logging
import importlib.metadata  # used below to look up installed package versions
import importlib.util
from pathlib import Path
import sysconfig
import re
from typing import List, Dict, Set, Tuple, Optional, Union, Any, Callable
# --- Logger Configuration ---
logger = logging.getLogger(__name__)
# Configure a basic handler only if neither this logger nor any ancestor (including root) has one.
if not logger.hasHandlers():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Standard Library Detection Logic ---
if sys.version_info >= (3, 10):
    STANDARD_LIBRARY_MODULES: Set[str] = set(sys.stdlib_module_names)
logger.info("Using sys.stdlib_module_names for standard library module list (Python 3.10+).")
else:
logger.info("Using a predefined list for standard library modules (Python < 3.10).")
STANDARD_LIBRARY_MODULES = {
'abc', 'aifc', 'argparse', 'array', 'ast', 'asynchat', 'asyncio', 'asyncore',
'atexit', 'audioop', 'base64', 'bdb', 'binascii', 'binhex', 'bisect', 'builtins',
'bz2', 'calendar', 'cgi', 'cgitb', 'chunk', 'cmath', 'cmd', 'code', 'codecs',
'codeop', 'collections', 'colorsys', 'compileall', 'concurrent', 'configparser',
'contextlib', 'contextvars', 'copy', 'copyreg', 'cProfile', 'crypt', 'csv',
'ctypes', 'curses', 'dataclasses', 'datetime', 'dbm', 'decimal', 'difflib',
'dis', 'distutils', 'doctest', 'email', 'encodings', 'ensurepip', 'enum', 'errno',
'faulthandler', 'fcntl', 'filecmp', 'fileinput', 'fnmatch', 'formatter',
'fractions', 'ftplib', 'functools', 'gc', 'getopt', 'getpass', 'gettext',
'glob', 'graphlib', 'grp', 'gzip', 'hashlib', 'heapq', 'hmac', 'html', 'http',
'idlelib', 'imaplib', 'imghdr', 'imp', 'importlib', 'inspect', 'io',
'ipaddress', 'itertools', 'json', 'keyword', 'lib2to3', 'linecache', 'locale',
'logging', 'lzma', 'mailbox', 'mailcap', 'marshal', 'math', 'mimetypes', 'mmap',
'modulefinder', 'multiprocessing', 'netrc', 'nis', 'nntplib', 'numbers',
'operator', 'optparse', 'os', 'ossaudiodev', 'parser', 'pathlib', 'pdb',
'pickle', 'pickletools', 'pipes', 'pkgutil', 'platform', 'plistlib', 'poplib',
'posix', 'pprint', 'profile', 'pstats', 'pty', 'pwd', 'py_compile', 'pyclbr',
'pydoc', 'pydoc_data', 'pyexpat', 'queue', 'quopri', 'random', 're',
'readline', 'reprlib', 'resource', 'rlcompleter', 'runpy', 'sched', 'secrets',
'select', 'selectors', 'shelve', 'shlex', 'shutil', 'signal', 'site',
'smtpd', 'smtplib', 'sndhdr', 'socket', 'socketserver', 'spwd', 'sqlite3',
'ssl', 'stat', 'statistics', 'string', 'stringprep', 'struct', 'subprocess',
'sunau', 'symtable', 'sys', 'sysconfig', 'syslog', 'tabnanny', 'tarfile',
'telnetlib', 'tempfile', 'termios', 'textwrap', 'threading', 'time', 'timeit',
'tkinter', 'token', 'tokenize', 'trace', 'traceback', 'tracemalloc', 'tty',
'turtle', 'turtledemo', 'types', 'typing', 'unicodedata', 'unittest', 'urllib',
'uu', 'uuid', 'venv', 'warnings', 'wave', 'weakref', 'webbrowser', 'wsgiref',
'xdrlib', 'xml', 'xmlrpc', 'zipapp', 'zipfile', 'zipimport', 'zlib',
'_thread', '_collections_abc', '_json'
}
_CACHED_STD_LIB_PATHS: Optional[Set[str]] = None
def get_standard_library_paths() -> Set[str]:
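    """Determine and cache the directories that hold the standard library for the running interpreter, using sysconfig plus platform-specific fallbacks."""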
global _CACHED_STD_LIB_PATHS
if _CACHED_STD_LIB_PATHS is not None:
return _CACHED_STD_LIB_PATHS
paths = set()
try:
stdlib_path = sysconfig.get_path('stdlib')
if stdlib_path and os.path.isdir(stdlib_path): paths.add(os.path.normpath(stdlib_path))
platstdlib_path = sysconfig.get_path('platstdlib')
if platstdlib_path and os.path.isdir(platstdlib_path): paths.add(os.path.normpath(platstdlib_path))
main_lib_path = os.path.normpath(os.path.join(sys.prefix, "lib", f"python{sys.version_info.major}.{sys.version_info.minor}"))
if os.path.isdir(main_lib_path): paths.add(main_lib_path)
if sys.platform == "win32":
dlls_path = os.path.join(sys.prefix, "DLLs")
if os.path.isdir(dlls_path): paths.add(os.path.normpath(dlls_path))
else:
lib_dynload_path = os.path.join(sys.exec_prefix, "lib", f"python{sys.version_info.major}.{sys.version_info.minor}", "lib-dynload")
if os.path.isdir(lib_dynload_path): paths.add(os.path.normpath(lib_dynload_path))
framework_prefix = sysconfig.get_config_var('PYTHONFRAMEWORKPREFIX')
if framework_prefix and isinstance(framework_prefix, str) and os.path.isdir(framework_prefix):
fw_path = os.path.normpath(os.path.join(framework_prefix, "lib", f"python{sys.version_info.major}.{sys.version_info.minor}"))
if os.path.isdir(fw_path): paths.add(fw_path)
except Exception as e:
logger.warning(f"Error determining some standard library paths with sysconfig: {e}")
try:
paths.add(os.path.normpath(os.path.dirname(os.__file__)))
paths.add(os.path.normpath(os.path.dirname(sysconfig.__file__)))
        except (AttributeError, NameError): logger.error("Failed to get even basic standard library paths.")
_CACHED_STD_LIB_PATHS = {p for p in paths if p}
logger.debug(f"Cached standard library paths: {_CACHED_STD_LIB_PATHS}")
return _CACHED_STD_LIB_PATHS
def is_path_in_standard_library(file_path_str: Optional[str]) -> bool:
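    """Return True if the given path lies inside a detected standard-library directory; anything under site-packages or dist-packages is treated as non-standard."""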
if not file_path_str: return False
std_lib_paths = get_standard_library_paths()
if not std_lib_paths: logger.warning("Standard library paths could not be determined."); return False
normalized_file_path = os.path.normpath(file_path_str)
path_parts = Path(normalized_file_path).parts
if 'site-packages' in path_parts or 'dist-packages' in path_parts:
logger.debug(f"Path '{normalized_file_path}' contains 'site-packages' or 'dist-packages', considered non-standard.")
return False
for std_path in std_lib_paths:
try:
if normalized_file_path.startswith(std_path + os.sep):
logger.debug(f"Path '{normalized_file_path}' starts with standard path '{std_path}'.")
return True
except Exception as e: logger.debug(f"Error checking path {normalized_file_path} against {std_path}: {e}"); continue
logger.debug(f"Path '{normalized_file_path}' not found within standard library paths: {list(std_lib_paths)}")
return False
def _is_standard_library(module_name: str) -> bool:
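    """Classify a top-level module as standard library using the known-name set, the import spec's origin ('built-in'/'frozen'), and stdlib path checks."""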
if module_name in STANDARD_LIBRARY_MODULES: logger.debug(f"Module '{module_name}' is in STANDARD_LIBRARY_MODULES list."); return True
try: spec = importlib.util.find_spec(module_name)
except (ImportError, ValueError, ModuleNotFoundError) as e: logger.warning(f"Could not process module '{module_name}' for spec: {e}. Assuming non-standard."); return False
if spec is None: logger.debug(f"No spec found for module '{module_name}'. Assuming non-standard."); return False
origin = spec.origin; loader_name = spec.loader.__class__.__name__ if spec.loader else "UnknownLoader"
logger.debug(f"Module '{module_name}': origin='{origin}', loader='{loader_name}'")
if origin == 'built-in' or origin == 'frozen': logger.debug(f"Module '{module_name}' is '{origin}'. Classified as standard."); return True
if origin is None:
if loader_name in ('BuiltinImporter', 'FrozenImporter', 'SourcelessFileLoader'):
if hasattr(spec, 'submodule_search_locations') and spec.submodule_search_locations and isinstance(spec.submodule_search_locations, list) and spec.submodule_search_locations[0]:
search_loc = spec.submodule_search_locations[0]
if is_path_in_standard_library(search_loc): logger.debug(f"Module '{module_name}' (origin=None, loader='{loader_name}') search location '{search_loc}' is standard."); return True
else: logger.debug(f"Module '{module_name}' (origin=None, loader='{loader_name}') classified as standard by loader type."); return True
logger.debug(f"Module '{module_name}' (origin=None, loader='{loader_name}') classified as non-standard."); return False
if is_path_in_standard_library(origin): logger.debug(f"Module '{module_name}' (origin: '{origin}') IS in a standard library path."); return True
else: logger.debug(f"Module '{module_name}' (origin: '{origin}') IS NOT in a standard library path."); return False
MODULE_NAME_TO_PACKAGE_NAME_MAP: Dict[str, str] = { "PIL": "Pillow" }
FALSE_POSITIVE_EXTERNAL_MODULES: Set[str] = { "mpl_toolkits" }
class ImportExtractor(ast.NodeVisitor):
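    """AST visitor that records the top-level name of every absolute import together with the file it appears in; relative imports are ignored."""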
def __init__(self, file_path_str: str): super().__init__(); self.file_path_str = file_path_str; self.imported_modules: Set[Tuple[str, str]] = set()
def visit_Import(self, node: ast.Import):
for alias in node.names:
module_name = alias.name.split('.')[0]
if module_name: self.imported_modules.add((module_name, self.file_path_str))
self.generic_visit(node)
def visit_ImportFrom(self, node: ast.ImportFrom):
if node.module and node.level == 0:
module_name = node.module.split('.')[0];
if module_name: self.imported_modules.add((module_name, self.file_path_str))
elif node.level > 0: logger.debug(f"Ignoring relative import in {self.file_path_str} at line {node.lineno}")
self.generic_visit(node)
DependencyInfo = Dict[str, Dict[str, Union[Set[str], Optional[str], str]]]
def find_project_modules_and_dependencies(repo_path: Path) -> Tuple[DependencyInfo, DependencyInfo]:
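    """Parse every .py file under repo_path (skipping venvs, tests, build dirs, etc.) and split its top-level
    imports into standard-library and external dependency info, keyed by module/PyPI name, with usage
    locations and detected versions."""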
all_imports_locations: Dict[str, Set[str]] = {}; project_modules: Set[str] = set()
logger.info(f"Starting comprehensive analysis of all .py files in: {repo_path}")
try:
for item in repo_path.rglob('*'):
if item.is_dir() and (item / '__init__.py').exists():
try:
relative_dir = item.relative_to(repo_path);
if relative_dir.parts: project_modules.add(relative_dir.parts[0])
except ValueError: pass
elif item.is_file() and item.suffix == '.py' and item.name != '__init__.py':
try:
relative_file = item.relative_to(repo_path)
if len(relative_file.parts) == 1: project_modules.add(item.stem)
elif len(relative_file.parts) > 1 and (item.parent / '__init__.py').exists(): project_modules.add(relative_file.parts[0])
except ValueError: pass
except Exception as e: logger.error(f"Error during project module identification: {e}")
logger.debug(f"Potential project modules identified: {project_modules}")
    excluded_dirs = {'venv', '.venv', 'env', '.env', 'docs', 'tests', 'test', 'site-packages', 'dist-packages', '__pycache__', '.git', '.hg', '.svn', '.tox', '.nox', 'build', 'dist'}
    for root, dirs, files in os.walk(repo_path, topdown=True):
        # Prune excluded directories in place; '*.egg-info' needs a suffix check because os.walk does not expand globs.
        dirs[:] = [d for d in dirs if d not in excluded_dirs and not d.endswith('.egg-info')]
for file_name in files:
if file_name.endswith(".py"):
file_path_obj = Path(root) / file_name
try: relative_file_path_str = str(file_path_obj.relative_to(repo_path))
except ValueError: relative_file_path_str = str(file_path_obj); logger.warning(f"Could not make path relative for {file_path_obj}, using absolute.")
logger.debug(f"Analyzing file: {relative_file_path_str}")
try:
with open(file_path_obj, 'r', encoding='utf-8', errors='ignore') as f_content: source_code = f_content.read()
tree = ast.parse(source_code, filename=str(file_path_obj))
extractor = ImportExtractor(file_path_str=relative_file_path_str)
extractor.visit(tree)
for module_name, rel_path in extractor.imported_modules:
if not module_name: continue
locations = all_imports_locations.setdefault(module_name, set()); locations.add(rel_path)
except SyntaxError as e: logger.warning(f"Syntax error in {relative_file_path_str}: {e}. Skipping file.")
except Exception as e: logger.error(f"Error processing file {relative_file_path_str}: {e}")
logger.info(f"Total unique top-level imports found: {list(all_imports_locations.keys())}")
std_lib_imports_info: DependencyInfo = {}; external_dependencies_info: DependencyInfo = {}
for imported_module_name, locations_set in all_imports_locations.items():
if imported_module_name in project_modules: logger.debug(f"Ignoring '{imported_module_name}' as a project module."); continue
if imported_module_name in FALSE_POSITIVE_EXTERNAL_MODULES: logger.info(f"Ignoring '{imported_module_name}' as a false positive."); continue
if _is_standard_library(imported_module_name):
logger.debug(f"Module '{imported_module_name}' classified as STANDARD library."); std_lib_imports_info[imported_module_name] = {'locations': locations_set, 'version': None}
else:
pypi_package_name = MODULE_NAME_TO_PACKAGE_NAME_MAP.get(imported_module_name, imported_module_name)
original_import_name_if_mapped = imported_module_name if pypi_package_name != imported_module_name else None
logger.info(f"Module '{imported_module_name}' (PyPI: '{pypi_package_name}') classified as EXTERNAL dependency.")
current_version: Optional[str] = None
try: current_version = importlib.metadata.version(pypi_package_name); logger.debug(f"Found installed version for '{pypi_package_name}': {current_version}")
except importlib.metadata.PackageNotFoundError: logger.warning(f"External package '{pypi_package_name}' (from import '{imported_module_name}') not found. Version unknown.")
except Exception as e: logger.error(f"Error getting version for '{pypi_package_name}' (from import '{imported_module_name}'): {e}")
dep_data = external_dependencies_info.setdefault(pypi_package_name, {'locations': set(), 'version': current_version, 'original_import_name': None})
dep_data['locations'].update(locations_set) # type: ignore
if original_import_name_if_mapped and dep_data.get('original_import_name') is None: dep_data['original_import_name'] = original_import_name_if_mapped
if dep_data.get('version') is None and current_version is not None: dep_data['version'] = current_version
logger.info(f"Identified {len(std_lib_imports_info)} stdlib modules used."); logger.info(f"Identified {len(external_dependencies_info)} unique external dependencies.")
return std_lib_imports_info, external_dependencies_info
def find_main_script(repo_path: Path) -> Optional[Path]:
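    """Look for the project's __main__.py in <repo>/<lowercased repo name>/ or at the repository root; return its path or None."""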
repo_name = repo_path.name; script_subfolder_name = repo_name.lower()
potential_script_path_subdir = repo_path / script_subfolder_name / "__main__.py"
potential_script_path_root = repo_path / "__main__.py"
logger.info(f"Looking for main script at: {potential_script_path_subdir} or {potential_script_path_root}")
if potential_script_path_subdir.is_file(): logger.info(f"Main script found: {potential_script_path_subdir}"); return potential_script_path_subdir
elif potential_script_path_root.is_file(): logger.info(f"Main script found in repository root: {potential_script_path_root}"); return potential_script_path_root
else: logger.warning(f"Main script (__main__.py) not found in {potential_script_path_subdir} or {potential_script_path_root}."); return None
def generate_requirements_file(repo_path: Path, external_deps_info: DependencyInfo, std_lib_deps_info: DependencyInfo) -> Path:
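    """Write an annotated requirements.txt at the repository root (commented standard-library usage followed by installable external dependencies) and return its path."""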
requirements_file_path = repo_path / "requirements.txt"
python_version_str = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
try:
with open(requirements_file_path, 'w', encoding='utf-8') as f:
f.write(f"# Requirements automatically generated by DependencyAnalyzer\n")
f.write(f"# Project: {repo_path.name}\n")
f.write(f"# Python Version (of analysis environment): {python_version_str}\n")
f.write("# Use 'pip install -r requirements.txt' to install external dependencies listed below.\n\n")
f.write(f"# --- Standard Library Modules Used (part of Python {python_version_str}) ---\n")
if not std_lib_deps_info: f.write("# No standard library modules explicitly imported or detected by analysis.\n")
else:
for dep_name in sorted(std_lib_deps_info.keys()):
info = std_lib_deps_info[dep_name]; locations = info.get('locations', set())
loc_list = sorted(list(locations)) if isinstance(locations, set) else []
loc_comment = f"Used in: {', '.join(loc_list[:3])}{', ...' if len(loc_list) > 3 else ''}" if loc_list else ""
                    f.write(f"# {dep_name} ({loc_comment.strip()})".replace(" ()", "") + "\n")
f.write("\n# --- External Dependencies (install with pip) ---\n")
if not external_deps_info: f.write("# No external dependencies found by analysis.\n")
else:
for pypi_name in sorted(external_deps_info.keys()):
info = external_deps_info[pypi_name]; locations = info.get('locations', set()); version = info.get('version'); orig_import = info.get('original_import_name')
loc_list = sorted(list(locations)) if isinstance(locations, set) else []
imp_as = f"(imported as '{orig_import}') " if orig_import else ""
loc_comment = f"# Found {imp_as}in: {', '.join(loc_list[:3])}{', ...' if len(loc_list) > 3 else ''}" if loc_list else ""
ver_comment = f"# Detected version (in analysis env): {version}" if version else "# Version not detected (likely not installed in analysis env)"
if loc_comment: f.write(f"{loc_comment}\n")
f.write(f"{ver_comment}\n"); f.write(f"{pypi_name}\n\n")
logger.info(f"Successfully generated {requirements_file_path} with detailed dependency info.")
except IOError as e: logger.error(f"Error writing requirements.txt: {e}"); raise
return requirements_file_path
def _copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
"""Helper to copy requirements.txt to the _req_packages directory."""
source_req_file = repo_path / "requirements.txt"
dest_req_file = packages_dir / "requirements.txt"
if source_req_file.exists():
try:
shutil.copy2(source_req_file, dest_req_file) # copy2 preserves metadata
logger.info(f"Copied '{source_req_file.name}' to '{packages_dir}'.")
return True
except Exception as e:
logger.error(f"Failed to copy '{source_req_file.name}' to '{packages_dir}': {e}")
return False
else:
logger.warning(f"Source '{source_req_file.name}' does not exist. Cannot copy.")
return False
def download_packages(repo_path: Path, requirements_file_path_in_repo: Path) -> Tuple[bool, str]:
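    """Download every entry of the repository's requirements.txt into <repo>/_req_packages via 'pip download'; return (success, message)."""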
packages_dir = repo_path / "_req_packages"; packages_dir.mkdir(parents=True, exist_ok=True)
if not requirements_file_path_in_repo.exists() or requirements_file_path_in_repo.stat().st_size == 0:
return True, f"Source requirements file '{requirements_file_path_in_repo.name}' is empty or does not exist. No packages to download."
_copy_requirements_to_packages_dir(repo_path, packages_dir) # Copy it first
# Pip download command will use the requirements.txt from the repo root for consistency of source
command = [ sys.executable, "-m", "pip", "download", "-r", str(requirements_file_path_in_repo.resolve()),
"-d", str(packages_dir.resolve()), "--no-cache-dir", "--disable-pip-version-check" ]
logger.info(f"Executing command: {' '.join(command)}")
try:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8',
creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0)
stdout, stderr = process.communicate(timeout=300)
if process.returncode == 0:
msg = f"Packages downloaded successfully to '{packages_dir}'."
logger.info(msg);
if stdout:
logger.debug(f"Pip download stdout:\n{stdout}");
if stderr:
logger.warning(f"Pip download stderr (though successful):\n{stderr}")
return True, msg
else:
error_msg = f"Error downloading packages. Pip exit code: {process.returncode}\nStderr:\n{stderr}\nStdout:\n{stdout}"
logger.error(error_msg); return False, error_msg
except subprocess.TimeoutExpired:
error_msg = "Package download command timed out.";
logger.error(error_msg);
if process:
process.kill();
return False, error_msg
except FileNotFoundError: error_msg = "Error: 'pip' (via sys.executable -m pip) command not found."; logger.error(error_msg); return False, error_msg
except Exception as e: error_msg = f"An unexpected error occurred during package download: {e}"; logger.error(error_msg); return False, error_msg
def create_install_scripts(repo_path: Path, requirements_file_path_in_repo: Path) -> Tuple[bool, str]:
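    """Write install_packages.bat and install_packages.sh into _req_packages; both install offline from that folder with 'pip install --no-index --find-links'."""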
packages_dir = repo_path / "_req_packages"; packages_dir.mkdir(parents=True, exist_ok=True)
if not requirements_file_path_in_repo.exists():
return False, f"Source requirements file '{requirements_file_path_in_repo.name}' does not exist. Cannot create install scripts reliably."
# Copy requirements.txt to _req_packages so scripts can use it relatively
if not _copy_requirements_to_packages_dir(repo_path, packages_dir):
return False, "Failed to copy requirements.txt to packages directory. Install scripts may not work."
# Scripts will now refer to 'requirements.txt' in their own directory (_req_packages)
local_req_file_name = "requirements.txt"
bat_content = f"""@echo off
echo Checking for pip...
python -m pip --version >nul 2>&1
if %errorlevel% neq 0 (
echo ERROR: pip is not installed or not found in PATH.
echo Please install pip and ensure it's accessible to your Python installation.
echo For more info, see: https://pip.pypa.io/en/stable/installation/
pause
exit /b 1
)
echo pip found.
echo Installing packages from local folder '{packages_dir.name}'...
echo Using requirements file: {local_req_file_name}
echo Finding packages in: %~dp0
REM Use pip from the current environment
python -m pip install --no-index --find-links "%~dp0" -r "%~dp0{local_req_file_name}"
if %errorlevel% neq 0 (
echo ERROR: Failed to install packages. See output above for details.
pause
exit /b 1
)
echo Packages installed successfully.
pause
"""
sh_content = f"""#!/bin/bash
echo "Checking for pip..."
if ! python -m pip --version > /dev/null 2>&1 && ! python3 -m pip --version > /dev/null 2>&1; then
echo "ERROR: pip is not installed or not found in PATH for python/python3."
echo "Please install pip and ensure it's accessible."
echo "For more info, see: https://pip.pypa.io/en/stable/installation/"
exit 1
fi
echo "pip found."
echo "Installing packages from local folder '{packages_dir.name}'..."
SCRIPT_DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" &> /dev/null && pwd )"
echo "Using requirements file: {local_req_file_name}"
echo "Finding packages in: $SCRIPT_DIR"
# Determine python command
PYTHON_CMD="python"
if ! command -v python >/dev/null 2>&1 ; then
if command -v python3 >/dev/null 2>&1 ; then
PYTHON_CMD="python3"
else
echo "ERROR: Neither 'python' nor 'python3' command found in PATH."
exit 1
fi
fi
# Use pip from the current environment
"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$SCRIPT_DIR/{local_req_file_name}"
if [ $? -ne 0 ]; then
echo "ERROR: Failed to install packages. See output above for details."
exit 1
fi
echo "Packages installed successfully."
"""
try:
bat_file_path = packages_dir / "install_packages.bat";
with open(bat_file_path, 'w', encoding='utf-8', newline='\r\n') as f: f.write(bat_content); logger.info(f"Created {bat_file_path}")
sh_file_path = packages_dir / "install_packages.sh";
with open(sh_file_path, 'w', encoding='utf-8', newline='\n') as f: f.write(sh_content); os.chmod(sh_file_path, 0o755); logger.info(f"Created {sh_file_path} and made it executable.")
return True, f"Installation scripts created in '{packages_dir}'."
except IOError as e: error_msg = f"Error creating installation scripts: {e}"; logger.error(error_msg); return False, error_msg
def get_installed_packages() -> Dict[str, str]:
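    """Return a mapping of installed packages for the current environment, read from 'pip list --format=json'.

    Keys are normalized names (lowercase, '_' replaced by '-') and values are version strings,
    e.g. {"typing-extensions": "4.12.2"} (illustrative).
    """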
installed_packages: Dict[str, str] = {};
try:
command = [sys.executable, "-m", "pip", "list", "--format=json", "--disable-pip-version-check"]
process = subprocess.run( command, capture_output=True, text=True, check=True, encoding='utf-8', creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 )
packages_json = json.loads(process.stdout)
for pkg_info in packages_json: installed_packages[pkg_info['name'].lower().replace('_', '-')] = pkg_info['version'] # Normalize name
logger.info(f"Found {len(installed_packages)} installed packages.")
except Exception as e: logger.error(f"Error getting installed packages: {e}")
return installed_packages
try:
from packaging.requirements import Requirement
from packaging.version import parse as parse_version, InvalidVersion
PACKAGING_AVAILABLE = True; logger.info("Imported 'packaging' library.")
except ImportError:
PACKAGING_AVAILABLE = False; Requirement = None; parse_version = None; InvalidVersion = None # type: ignore
logger.warning("'packaging' library not found. Version comparison will be basic.")
def compare_requirements_with_installed( requirements_file: Path, installed_packages: Dict[str, str]) -> List[Dict[str, str]]:
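    """Compare each requirements.txt entry (PyPI names) against installed_packages.

    Returns a list of dicts with keys 'package', 'required', 'installed', and 'status', e.g.
    {"package": "requests", "required": ">=2.0", "installed": "2.31.0", "status": "OK (matches specifier)"}
    (values illustrative).
    """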
comparison_results: List[Dict[str, str]] = []
if not requirements_file.exists(): logger.warning(f"Requirements file {requirements_file} not found for comparison."); return comparison_results
with open(requirements_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line_content = line.strip()
if not line_content or line_content.startswith('#') or line_content.startswith('-'): continue
pypi_package_spec = line_content; package_name_to_check = pypi_package_spec; required_specifier_str = "any"; req_obj = None
if PACKAGING_AVAILABLE:
try: req_obj = Requirement(pypi_package_spec); package_name_to_check = req_obj.name; required_specifier_str = str(req_obj.specifier) if req_obj.specifier else "any"
except Exception as e:
logger.warning(f"Could not parse req string '{pypi_package_spec}' line {line_num} with 'packaging': {e}.");
match = re.match(r"([a-zA-Z0-9._-]+)", pypi_package_spec);
if match:
package_name_to_check = match.group(1)
else:
                match_with_spec = re.match(r"([a-zA-Z0-9._-]+)\s*([<>=!~]=?)\s*([0-9a-zA-Z.]+)", pypi_package_spec)
if match_with_spec: package_name_to_check, required_specifier_str = match_with_spec.group(1), match_with_spec.group(2) + match_with_spec.group(3)
else:
match_name_only = re.match(r"([a-zA-Z0-9._-]+)", pypi_package_spec);
if match_name_only:
package_name_to_check = match_name_only.group(1)
normalized_package_name_lookup = package_name_to_check.lower().replace('_', '-')
installed_version_str = installed_packages.get(normalized_package_name_lookup)
status = ""; result_item = {"package": package_name_to_check, "required": required_specifier_str, "installed": installed_version_str if installed_version_str else "Not installed"}
if installed_version_str:
if PACKAGING_AVAILABLE and req_obj and req_obj.specifier:
try:
installed_v = parse_version(installed_version_str);
if installed_v in req_obj.specifier:
status = "OK (matches specifier)"
else:
status = "Mismatch (installed version does not meet specifier)"
except InvalidVersion: status = "Installed (installed version format invalid)"
except Exception as e: status = "Error comparing versions"; logger.error(f"Compare error {package_name_to_check}: {e}")
elif required_specifier_str != "any" and "==" in required_specifier_str:
expected_version = required_specifier_str.split('==', 1)[1].strip()
if installed_version_str == expected_version: status = "OK (basic check)"
else: status = "Mismatch (basic check)"
else: status = "Installed (version not checked against specifier)"
else: status = "Not installed"
result_item["status"] = status; comparison_results.append(result_item)
logger.info(f"Package comparison complete. Results: {len(comparison_results)} items."); logger.debug(f"Comparison results: {comparison_results}")
return comparison_results
def update_system_packages(packages_to_update: List[str], repo_path: Path) -> Tuple[bool, str]:
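    """Upgrade the given PyPI packages with 'pip install --upgrade', reusing the specifier recorded in requirements.txt when one exists; return (success, message)."""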
if not packages_to_update: return True, "No packages specified for update."
requirements_file = repo_path / "requirements.txt";
if not requirements_file.exists(): return False, f"Cannot update: {requirements_file} does not exist."
temp_req_content: List[str] = []; all_requirements_lines: Dict[str, str] = {}
with open(requirements_file, 'r', encoding='utf-8') as f_req:
package_spec_line = ""
for line in f_req:
stripped_line = line.strip()
if not stripped_line or stripped_line.startswith('-') or stripped_line.startswith('#'): continue
package_spec_line = stripped_line
pkg_name_from_line = ""
if PACKAGING_AVAILABLE:
try: req = Requirement(package_spec_line); pkg_name_from_line = req.name
                except Exception:
match = re.match(r"([a-zA-Z0-9._-]+)", package_spec_line);
if match:
pkg_name_from_line = match.group(1)
else:
match = re.match(r"([a-zA-Z0-9._-]+)", package_spec_line)
if match: pkg_name_from_line = match.group(1)
if pkg_name_from_line: all_requirements_lines[pkg_name_from_line.lower().replace('_', '-')] = package_spec_line
package_spec_line = ""
for pkg_name_to_update in packages_to_update:
normalized_pkg_name = pkg_name_to_update.lower().replace('_','-')
if normalized_pkg_name in all_requirements_lines: temp_req_content.append(all_requirements_lines[normalized_pkg_name])
else: logger.warning(f"Package '{pkg_name_to_update}' for update not found in {requirements_file}. Will upgrade by name."); temp_req_content.append(pkg_name_to_update)
if not temp_req_content: return True, "No packages to update match the requirements file content."
temp_req_file_path = repo_path / "_temp_update_reqs.txt"
try:
with open(temp_req_file_path, 'w', encoding='utf-8') as f_temp:
for req_line in temp_req_content: f_temp.write(f"{req_line}\n")
command = [ sys.executable, "-m", "pip", "install", "--upgrade", "-r", str(temp_req_file_path.resolve()), "--disable-pip-version-check" ]
logger.info(f"Executing update command: {' '.join(command)}")
process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0 )
stdout, stderr = process.communicate(timeout=600)
if process.returncode == 0:
msg = f"Packages update completed for: {', '.join(packages_to_update)}.\nPIP Output:\n{stdout[:1000]}...";
logger.info(msg);
if stderr:
logger.warning(f"PIP Update Stderr:\n{stderr[:1000]}...");
return True, msg
else: error_msg = f"Error updating. Pip exit code: {process.returncode}\nStderr:\n{stderr}\nStdout:\n{stdout}"; logger.error(error_msg); return False, error_msg
except subprocess.TimeoutExpired:
error_msg = "Package update command timed out.";
logger.error(error_msg);
if process:
process.kill();
return False, error_msg
finally:
if temp_req_file_path.exists():
try: os.remove(temp_req_file_path); logger.debug(f"Removed temp file: {temp_req_file_path}")
except OSError as e: logger.warning(f"Could not remove temp file {temp_req_file_path}: {e}")