488 lines
32 KiB
Python
488 lines
32 KiB
Python
# dependencyanalyzer/core.py
|
|
import ast
|
|
import os
|
|
import shutil # For copying files
|
|
import subprocess
|
|
import sys
|
|
import json
|
|
import logging
|
|
import importlib.util
|
|
from pathlib import Path
|
|
import sysconfig
|
|
import re
|
|
from typing import List, Dict, Set, Tuple, Optional, Union, Any, Callable
|
|
|
|
# --- Logger Configuration ---
|
|
logger = logging.getLogger(__name__)  # Module-level logger; naming it after the module places it in the logging hierarchy
|
|
|
|
# --- Standard Library Detection Logic ---
|
|
# Hand-maintained fallback list of standard-library module names.  It is
# defined unconditionally so it can also serve as the fallback on Python 3.10+
# interpreters that (unexpectedly) lack ``sys.stdlib_module_names``.
_PREDEFINED_STDLIBS = {
    'abc', 'aifc', 'argparse', 'array', 'ast', 'asynchat', 'asyncio', 'asyncore',
    'atexit', 'audioop', 'base64', 'bdb', 'binascii', 'binhex', 'bisect', 'builtins',
    'bz2', 'calendar', 'cgi', 'cgitb', 'chunk', 'cmath', 'cmd', 'code', 'codecs',
    'codeop', 'collections', 'colorsys', 'compileall', 'concurrent', 'configparser',
    'contextlib', 'contextvars', 'copy', 'copyreg', 'cProfile', 'crypt', 'csv',
    'ctypes', 'curses', 'dataclasses', 'datetime', 'dbm', 'decimal', 'difflib',
    'dis', 'distutils', 'doctest', 'email', 'encodings', 'ensurepip', 'enum', 'errno',
    'faulthandler', 'fcntl', 'filecmp', 'fileinput', 'fnmatch', 'formatter',
    'fractions', 'ftplib', 'functools', 'gc', 'getopt', 'getpass', 'gettext',
    'glob', 'graphlib', 'grp', 'gzip', 'hashlib', 'heapq', 'hmac', 'html', 'http',
    'idlelib', 'imaplib', 'imghdr', 'imp', 'importlib', 'inspect', 'io',
    'ipaddress', 'itertools', 'json', 'keyword', 'lib2to3', 'linecache', 'locale',
    'logging', 'lzma', 'mailbox', 'mailcap', 'marshal', 'math', 'mimetypes', 'mmap',
    'modulefinder', 'multiprocessing', 'netrc', 'nis', 'nntplib', 'numbers',
    'operator', 'optparse', 'os', 'ossaudiodev', 'parser', 'pathlib', 'pdb',
    'pickle', 'pickletools', 'pipes', 'pkgutil', 'platform', 'plistlib', 'poplib',
    'posix', 'pprint', 'profile', 'pstats', 'pty', 'pwd', 'py_compile', 'pyclbr',
    'pydoc', 'pydoc_data', 'pyexpat', 'queue', 'quopri', 'random', 're',
    'readline', 'reprlib', 'resource', 'rlcompleter', 'runpy', 'sched', 'secrets',
    'select', 'selectors', 'shelve', 'shlex', 'shutil', 'signal', 'site',
    'smtpd', 'smtplib', 'sndhdr', 'socket', 'socketserver', 'spwd', 'sqlite3',
    'ssl', 'stat', 'statistics', 'string', 'stringprep', 'struct', 'subprocess',
    'sunau', 'symtable', 'sys', 'sysconfig', 'syslog', 'tabnanny', 'tarfile',
    'telnetlib', 'tempfile', 'termios', 'textwrap', 'threading', 'time', 'timeit',
    'tkinter', 'token', 'tokenize', 'trace', 'traceback', 'tracemalloc', 'tty',
    'turtle', 'turtledemo', 'types', 'typing', 'unicodedata', 'unittest', 'urllib',
    'uu', 'uuid', 'venv', 'warnings', 'wave', 'weakref', 'webbrowser', 'wsgiref',
    'xdrlib', 'xml', 'xmlrpc', 'zipapp', 'zipfile', 'zipimport', 'zlib',
    '_thread', '_collections_abc', '_json', '_datetime', '_weakrefset', '_strptime',
    '_socket', '_ssl', '_struct', '_queue', '_pickle', '_lsprof', '_heapq', '_hashlib',
    '_csv', '_bz2', '_codecs', '_bisect', '_blake2', '_asyncio', '_ast', '_abc',
}

# Prefer the interpreter's own authoritative list when it exists and is
# non-empty; otherwise fall back to the predefined list so the stdlib check
# never silently degrades to an empty set.
_runtime_stdlib_names = getattr(sys, "stdlib_module_names", None)
if _runtime_stdlib_names:
    STANDARD_LIBRARY_MODULES: "frozenset[str]" = frozenset(_runtime_stdlib_names)
    logger.debug("Using sys.stdlib_module_names for standard library module list (Python 3.10+).")
else:
    if sys.version_info >= (3, 10):
        logger.warning("sys.stdlib_module_names not found despite Python 3.10+. Using predefined list.")
    else:
        logger.debug("Using a predefined list for standard library modules (Python < 3.10).")
    STANDARD_LIBRARY_MODULES = frozenset(_PREDEFINED_STDLIBS)
|
|
|
|
# Filled in by the first call to get_standard_library_paths(); None means
# "not computed yet".
_CACHED_STD_LIB_PATHS: Optional[Set[str]] = None


def get_standard_library_paths() -> Set[str]:
    """Return the normalized standard-library directories, computing them once.

    Candidates come from ``sysconfig`` ('stdlib' / 'platstdlib'), from
    ``sys.prefix`` / ``sys.exec_prefix`` conventions (``DLLs`` on Windows,
    ``lib-dynload`` elsewhere) and from the ``PYTHONFRAMEWORKPREFIX`` config
    variable.  Only existing directories are kept.  If nothing is found, the
    directories containing the ``os`` and ``sysconfig`` modules are used.

    Returns:
        The cached set of directory paths (possibly empty when every probe
        failed; an error is logged in that case).
    """
    global _CACHED_STD_LIB_PATHS
    if _CACHED_STD_LIB_PATHS is not None:
        return _CACHED_STD_LIB_PATHS

    found = set()
    logger.debug("Determining standard library paths...")

    def _collect(label, candidate):
        # Keep the candidate (normalized) only when it is a real directory.
        if os.path.isdir(candidate):
            found.add(os.path.normpath(candidate))
            logger.debug(f"Found stdlib path ({label}): {candidate}")

    try:
        version_dir = f"python{sys.version_info.major}.{sys.version_info.minor}"

        for path_name in ('stdlib', 'platstdlib'):
            try:
                path_val = sysconfig.get_path(path_name)
                if path_val:
                    _collect(path_name, path_val)
            except Exception as e:
                logger.warning(f"Could not get sysconfig path '{path_name}': {e}")

        _collect("prefix", os.path.normpath(os.path.join(sys.prefix, "lib", version_dir)))

        if sys.platform == "win32":
            _collect("DLLs", os.path.join(sys.prefix, "DLLs"))
        else:
            _collect("dynload", os.path.join(sys.exec_prefix, "lib", version_dir, "lib-dynload"))

        fw_prefix = sysconfig.get_config_var('PYTHONFRAMEWORKPREFIX')
        if fw_prefix and isinstance(fw_prefix, str) and os.path.isdir(fw_prefix):
            _collect("Framework", os.path.normpath(os.path.join(fw_prefix, "lib", version_dir)))

        if not found:
            logger.warning("Sysconfig paths failed, attempting fallback using 'os' and 'sysconfig' module locations.")
            for module, module_label in ((os, 'os'), (sysconfig, 'sysconfig')):
                try:
                    found.add(os.path.normpath(os.path.dirname(module.__file__)))
                except Exception:
                    logger.error(f"Could not determine path for '{module_label}' module.")
    except Exception as e:
        logger.exception(f"Unexpected error determining standard library paths: {e}")

    _CACHED_STD_LIB_PATHS = {p for p in found if p}
    if _CACHED_STD_LIB_PATHS:
        logger.debug(f"Final cached standard library paths: {_CACHED_STD_LIB_PATHS}")
    else:
        logger.error("Failed to determine ANY standard library paths.")
    return _CACHED_STD_LIB_PATHS
|
|
|
|
def is_path_in_standard_library(file_path_str: Optional[str]) -> bool:
    """Check whether a file path lies inside a known standard-library directory.

    Args:
        file_path_str: Path of the file to test (typically a module spec's
            ``origin``).  ``None`` or an empty string yields ``False``.

    Returns:
        ``True`` when the path is located below one of the directories reported
        by ``get_standard_library_paths()`` and is not inside a
        ``site-packages`` / ``dist-packages`` directory; ``False`` otherwise,
        including when the comparison itself fails.
    """
    if not file_path_str:
        return False
    std_lib_paths = get_standard_library_paths()
    if not std_lib_paths:
        return False
    try:
        # normcase makes the comparison case-insensitive on Windows (where the
        # same directory can be spelled with different casing); it is a no-op
        # on POSIX systems.
        norm_file_path = os.path.normcase(os.path.normpath(os.path.abspath(file_path_str)))
        path_parts = Path(norm_file_path).parts
        # Third-party packages live below the stdlib directory on many
        # layouts, so they must be excluded explicitly.
        if 'site-packages' in path_parts or 'dist-packages' in path_parts:
            return False
        for std_path in std_lib_paths:
            norm_std_path = os.path.normcase(os.path.normpath(os.path.abspath(std_path)))
            # Compare against "<dir><sep>" so that "/usr/lib/python3" does not
            # match "/usr/lib/python3.11/..."; avoid doubling the separator
            # when the directory already ends with one (e.g. a root path).
            prefix = norm_std_path if norm_std_path.endswith(os.sep) else norm_std_path + os.sep
            if norm_file_path.startswith(prefix):
                return True
    except Exception as e:
        logger.warning(f"Error during path comparison for '{file_path_str}': {e}")
        return False
    return False
|
|
|
|
def _is_standard_library(module_name: str) -> bool:
    """Check whether a top-level module name belongs to the standard library.

    The name is first looked up in ``STANDARD_LIBRARY_MODULES``.  If it is not
    listed, the module's import spec is inspected: built-in and frozen modules
    count as standard, and so does any module whose origin file lies inside a
    standard-library directory.

    Args:
        module_name: Top-level module name (no dots expected by callers).

    Returns:
        ``True`` if the module is considered part of the standard library,
        ``False`` otherwise (including when no spec can be found).
    """
    if module_name in STANDARD_LIBRARY_MODULES:
        logger.debug(f"'{module_name}' in predefined list.")
        return True

    try:
        spec = importlib.util.find_spec(module_name)
    except Exception:
        # find_spec may raise (e.g. ImportError/ValueError) for odd names.
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit.
        logger.debug(f"Spec finding failed for '{module_name}'. Assuming non-standard.")
        return False

    if spec is None:
        logger.debug(f"No spec for '{module_name}'. Assuming non-standard.")
        return False

    origin = spec.origin
    logger.debug(f"'{module_name}': origin='{origin}'")

    if origin == 'built-in' or origin == 'frozen':
        logger.debug(f"'{module_name}' is '{origin}'.")
        return True

    if is_path_in_standard_library(origin):
        logger.debug(f"'{module_name}' origin '{origin}' IS stdlib path.")
        return True

    logger.debug(f"'{module_name}' origin '{origin}' NOT stdlib path.")
    return False
|
|
|
|
# Import name -> distribution name, for packages whose importable module is
# named differently from the package that provides it.
MODULE_NAME_TO_PACKAGE_NAME_MAP: Dict[str, str] = {
    "PIL": "Pillow",
}

# Imported top-level names that must never be reported as external
# dependencies.
FALSE_POSITIVE_EXTERNAL_MODULES: Set[str] = {
    "mpl_toolkits",
}
|
|
|
|
class ImportExtractor(ast.NodeVisitor):
    """Collect the top-level modules imported by one source file.

    After ``visit(tree)`` the ``imported_modules`` attribute holds
    ``(top_level_module_name, file_path_str)`` tuples, one per distinct
    absolute import.  Relative imports (``from . import x``) are skipped.
    """

    def __init__(self, file_path_str: str):
        super().__init__()
        # Path string recorded next to every module name found in this file.
        self.file_path_str = file_path_str
        self.imported_modules: Set[Tuple[str, str]] = set()

    def visit_Import(self, node: ast.Import):
        """Record the first dotted component of each ``import a.b.c`` name."""
        roots = (alias.name.partition('.')[0] for alias in node.names)
        self.imported_modules.update(
            (root, self.file_path_str) for root in roots if root
        )
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        """Record the first dotted component of absolute ``from x import y``."""
        # level > 0 marks a relative import; module is None for "from . import x".
        if not node.module or node.level != 0:
            return
        root = node.module.partition('.')[0]
        if root:
            self.imported_modules.add((root, self.file_path_str))
|
|
# Maps a dependency name to its metadata dict.  Keys written by this module:
# 'locations' (set of file path strings where it is imported), 'version'
# (str or None) and, for external packages only, 'original_import_name'.
DependencyInfo = Dict[str, Dict[str, Union[Set[str], Optional[str], str]]]
|
|
|
|
def find_project_modules_and_dependencies(repo_path: Path) -> Tuple[DependencyInfo, DependencyInfo]:
    """Analyze the Python files below ``repo_path`` and classify their imports.

    The analysis runs in three steps:

    1. Identify the project's own top-level modules/packages so they are not
       reported as dependencies.
    2. Parse every ``.py`` file (skipping virtualenvs, VCS folders, build
       output, tests, docs, hidden and ``*.egg-info`` directories) and collect
       the top-level name of each absolute import.
    3. Classify each imported name as standard library or external and, for
       external ones, look up the installed version in the analysis
       environment.

    Args:
        repo_path: Root directory of the repository to analyze.

    Returns:
        A ``(std_libs, external_deps)`` tuple.  ``std_libs`` is keyed by module
        name; ``external_deps`` is keyed by the distribution (PyPI) name and
        additionally carries ``'original_import_name'`` when the import name
        differs from the distribution name.
    """
    # ``import importlib.util`` does not make ``importlib.metadata`` available,
    # so import it explicitly (the module exists from Python 3.8 onwards).
    try:
        from importlib import metadata as importlib_metadata
    except ImportError:
        importlib_metadata = None

    all_imports_locations: Dict[str, Set[str]] = {}
    project_modules: Set[str] = set()

    logger.info(f"Starting analysis: Identifying project modules in '{repo_path}'...")
    try:  # Simple project module identification
        for item in repo_path.rglob('*'):
            if item.is_dir() and (item / '__init__.py').exists():
                try:
                    rel_dir = item.relative_to(repo_path)
                    if rel_dir.parts:
                        project_modules.add(rel_dir.parts[0])
                except ValueError:
                    pass
            elif item.is_file() and item.suffix == '.py' and item.name != '__init__.py':
                try:
                    rel_file = item.relative_to(repo_path)
                    if len(rel_file.parts) == 1:
                        # A loose script/module at the repository root.
                        project_modules.add(item.stem)
                    elif len(rel_file.parts) > 1 and (item.parent / '__init__.py').exists():
                        project_modules.add(rel_file.parts[0])
                except ValueError:
                    pass
    except Exception as e:
        logger.error(f"Error identifying project modules: {e}")
    logger.debug(f"Potential project modules: {project_modules}")

    logger.info(f"Analyzing Python files for imports...")
    excluded_dirs = {
        'venv', '.venv', 'env', '.env', 'docs', 'tests', 'test', 'site-packages',
        'dist-packages', '__pycache__', '.git', '.hg', '.svn', '.tox', '.nox',
        'build', 'dist',
    }
    file_count = 0
    for root, dirs, files in os.walk(repo_path, topdown=True):
        # Prune in place so os.walk does not descend into excluded folders.
        # ``*.egg-info`` is a name pattern, so it needs a suffix test rather
        # than exact set membership.
        dirs[:] = [
            d for d in dirs
            if d not in excluded_dirs
            and not d.startswith('.')
            and not d.endswith('.egg-info')
        ]
        for file_name in files:
            if not file_name.endswith(".py"):
                continue
            file_path = Path(root) / file_name
            file_count += 1
            try:
                rel_path_str = str(file_path.relative_to(repo_path))
            except ValueError:
                rel_path_str = str(file_path)
                logger.warning(f"Path not relative: {file_path}")
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    source = f.read()
                tree = ast.parse(source, filename=str(file_path))
                extractor = ImportExtractor(file_path_str=rel_path_str)
                extractor.visit(tree)
                for module, rel_path in extractor.imported_modules:
                    if module:
                        all_imports_locations.setdefault(module, set()).add(rel_path)
            except SyntaxError as e:
                logger.warning(f"Syntax error in '{rel_path_str}': {e}. Skipping.")
            except Exception as e:
                logger.exception(f"Error processing file '{rel_path_str}': {e}")
    logger.info(f"Analyzed {file_count} Python files. Found {len(all_imports_locations)} unique top-level imports.")

    logger.info("Classifying imports and fetching versions...")
    std_libs: DependencyInfo = {}
    external_deps: DependencyInfo = {}
    for imp_module, locs in all_imports_locations.items():
        if imp_module in project_modules:
            logger.debug(f"Skipping project module: '{imp_module}'")
            continue
        if imp_module in FALSE_POSITIVE_EXTERNAL_MODULES:
            logger.info(f"Skipping known false positive: '{imp_module}'")
            continue
        if _is_standard_library(imp_module):
            logger.debug(f"'{imp_module}' is standard.")
            std_libs[imp_module] = {'locations': locs, 'version': None}
            continue

        pypi_name = MODULE_NAME_TO_PACKAGE_NAME_MAP.get(imp_module, imp_module)
        orig_imp = imp_module if pypi_name != imp_module else None
        logger.debug(f"'{imp_module}' (PyPI: '{pypi_name}') is external. Fetching version...")

        version: Optional[str] = None
        if importlib_metadata is None:
            logger.warning(f"Version for '{pypi_name}' not found.")
        else:
            try:
                version = importlib_metadata.version(pypi_name)
            except Exception:
                # Usually PackageNotFoundError: the package is not installed
                # in the environment running the analysis.
                logger.warning(f"Version for '{pypi_name}' not found.")

        dep_data = external_deps.setdefault(
            pypi_name,
            {'locations': set(), 'version': version, 'original_import_name': None},
        )
        dep_data['locations'].update(locs)  # type: ignore
        if orig_imp and dep_data.get('original_import_name') is None:
            dep_data['original_import_name'] = orig_imp
        if dep_data.get('version') is None and version is not None:
            dep_data['version'] = version

    logger.info(f"Classification complete: {len(std_libs)} stdlib used, {len(external_deps)} unique external dependencies.")
    return std_libs, external_deps
|
|
|
|
# --- find_main_script DEFINITION ---
|
|
def find_main_script(repo_path: Path) -> Optional[Path]:
    """Locate the project's ``__main__.py``.

    Two locations are probed, in this order:
    ``<repo>/<repo name in lowercase>/__main__.py`` and ``<repo>/__main__.py``.

    Args:
        repo_path: Root directory of the repository.

    Returns:
        The first candidate that is an existing file, or ``None`` when neither
        exists.
    """
    path_subdir = repo_path / repo_path.name.lower() / "__main__.py"
    path_root = repo_path / "__main__.py"
    logger.info(f"Looking for main script at: '{path_subdir}' or '{path_root}'")

    for candidate in (path_subdir, path_root):
        if candidate.is_file():
            logger.info(f"Main script found: {candidate}")
            return candidate

    logger.warning(f"Main script (__main__.py) not found in standard locations.")
    return None
|
|
|
|
def generate_requirements_file(repo_path: Path, external_deps_info: DependencyInfo, std_lib_deps_info: DependencyInfo) -> Path:
    """Write ``requirements.txt`` into ``repo_path`` with explanatory comments.

    The file lists the standard-library modules in use (as comments only) and
    then one entry per external dependency, each preceded by comment lines
    giving up to three import locations and the detected version.

    Args:
        repo_path: Repository root; the file is created directly inside it.
        external_deps_info: External dependencies keyed by distribution name.
        std_lib_deps_info: Standard-library modules keyed by module name.

    Returns:
        Path of the generated requirements file.

    Raises:
        IOError: Re-raised (after logging) when the file cannot be written.
    """
    req_file_path = repo_path / "requirements.txt"
    py_ver = ".".join(str(part) for part in sys.version_info[:3])
    logger.info(f"Generating '{req_file_path}'...")

    def _sorted_locations(raw):
        # Anything that is not a set is treated as "no location info".
        return sorted(raw) if isinstance(raw, set) else []

    def _shortlist(paths):
        # Show at most three locations, marking the truncation with ", ...".
        return ', '.join(paths[:3]) + (', ...' if len(paths) > 3 else '')

    try:
        with open(req_file_path, 'w', encoding='utf-8') as f:
            f.write(f"# Requirements generated by DependencyAnalyzer for {repo_path.name}\n")
            f.write(f"# Python Version (analysis env): {py_ver}\n\n")

            f.write(f"# --- Standard Library Modules Used (part of Python {py_ver}) ---\n")
            if std_lib_deps_info:
                for name in sorted(std_lib_deps_info.keys()):
                    paths = _sorted_locations(std_lib_deps_info[name].get('locations', set()))
                    loc_str = f"Used in: {_shortlist(paths)}" if paths else ""
                    # The replace drops the empty "()" left when no location is known.
                    f.write(f"# {name} ({loc_str})\n".replace(" ()", ""))
            else:
                f.write("# (No stdlib imports detected)\n")

            f.write("\n# --- External Dependencies (for pip install) ---\n")
            if external_deps_info:
                for pypi_name in sorted(external_deps_info.keys()):
                    info = external_deps_info[pypi_name]
                    paths = _sorted_locations(info.get('locations', set()))
                    ver = info.get('version')
                    orig_imp = info.get('original_import_name')

                    imp_as = f"(imported as '{orig_imp}') " if orig_imp else ""
                    if paths:
                        loc_str = f"# Found {imp_as}in: {_shortlist(paths)}"
                    else:
                        loc_str = "# Location info unavailable"
                    if ver:
                        ver_str = f"# Detected version: {ver}"
                    else:
                        ver_str = "# Version not detected in analysis env"
                    f.write(f"{loc_str}\n{ver_str}\n{pypi_name}\n\n")
            else:
                f.write("# (No external dependencies detected)\n")
        logger.info(f"Successfully generated '{req_file_path}'.")
    except IOError as e:
        logger.exception(f"Error writing requirements.txt: {e}")
        raise
    return req_file_path
|
|
|
|
def _copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
    """Copy ``requirements.txt`` from the repository root into ``packages_dir``.

    Args:
        repo_path: Repository root that should contain ``requirements.txt``.
        packages_dir: Destination directory for the copy.

    Returns:
        ``True`` when the file was copied; ``False`` when the source file is
        missing or the copy failed (the failure is logged, not raised).
    """
    file_name = "requirements.txt"
    source_req = repo_path / file_name
    dest_req = packages_dir / file_name

    if not source_req.is_file():
        logger.warning(f"Source '{source_req}' not found, cannot copy.")
        return False

    try:
        shutil.copy2(source_req, dest_req)
        logger.info(f"Copied '{source_req.name}' to '{packages_dir}'.")
        return True
    except Exception as e:
        logger.exception(f"Failed to copy '{source_req.name}': {e}")
        return False
|
|
|
|
def download_packages(repo_path: Path, requirements_file_path_in_repo: Path) -> Tuple[bool, str]:
    """Download every requirement, one at a time, into ``<repo>/_req_packages``.

    Each non-comment, non-option line of the requirements file is passed to
    ``pip download`` separately so that one failing package does not prevent
    the others from being fetched.

    Args:
        repo_path: Repository root; the ``_req_packages`` folder is created in it.
        requirements_file_path_in_repo: Requirements file to read specs from.

    Returns:
        ``(success, message)``.  ``success`` is ``False`` when the requirements
        file is missing/unreadable or when at least one download failed;
        ``message`` is a human-readable summary.
    """
    packages_dir = repo_path / "_req_packages"
    packages_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Starting package download to '{packages_dir}'...")

    if not requirements_file_path_in_repo.is_file():
        msg = f"Source requirements file '{requirements_file_path_in_repo}' not found."
        logger.error(msg)
        return False, msg
    # Best effort: a failed copy is logged by the helper and does not stop the download.
    _copy_requirements_to_packages_dir(repo_path, packages_dir)

    try:
        with open(requirements_file_path_in_repo, 'r', encoding='utf-8') as f:
            stripped_lines = (line.strip() for line in f)
            # Skip blanks, comments and pip option lines ("-r", "--index-url", ...).
            packages_to_download: List[str] = [
                spec for spec in stripped_lines
                if spec and not spec.startswith(('#', '-'))
            ]
    except Exception as e:
        msg = f"Error reading requirements file '{requirements_file_path_in_repo}': {e}"
        logger.exception(msg)
        return False, msg

    if not packages_to_download:
        msg = f"No package specifications found in '{requirements_file_path_in_repo}'."
        logger.info(msg)
        return True, msg

    logger.info(f"Found {len(packages_to_download)} specifications. Downloading individually...")
    failed: List[str] = []
    succeeded: List[str] = []
    for pkg_spec in packages_to_download:
        logger.info(f"--- Downloading: {pkg_spec} ---")
        cmd = [
            sys.executable, "-m", "pip", "download", pkg_spec,
            "-d", str(packages_dir.resolve()),
            "--no-cache-dir", "--disable-pip-version-check",
        ]
        logger.debug(f"Running: {' '.join(cmd)}")
        try:
            # subprocess.run kills the child and drains its pipes on timeout.
            # errors='ignore' keeps undecodable pip output from turning a
            # successful download into a failure.
            completed = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding='utf-8',
                errors='ignore',
                timeout=300,
                creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0,
            )
        except subprocess.TimeoutExpired as e:
            logger.error(f"Failed to download '{pkg_spec}': {str(e)}")
            failed.append(pkg_spec)
            continue
        except Exception as e:
            logger.error(f"Failed to download '{pkg_spec}': {str(e)}")
            # Unexpected (non-pip) failure: keep the traceback for diagnosis.
            logger.exception("Traceback for unexpected download error:")
            failed.append(pkg_spec)
            continue

        stdout = (completed.stdout or "").strip()
        stderr = (completed.stderr or "").strip()
        if completed.returncode != 0:
            logger.error(
                f"Failed to download '{pkg_spec}': "
                f"Pip error (code {completed.returncode}):\nStderr: {stderr}\nStdout: {stdout}"
            )
            failed.append(pkg_spec)
            continue

        logger.info(f"OK: '{pkg_spec}' downloaded (or already present).")
        succeeded.append(pkg_spec)
        if stderr:
            logger.warning(f"Pip stderr for '{pkg_spec}':\n{stderr}")
        if stdout:
            logger.debug(f"Pip stdout for '{pkg_spec}':\n{stdout[:300]}...")

    overall_success = not failed
    summary = f"Download finished. Attempted: {len(packages_to_download)}, Succeeded: {len(succeeded)}, Failed: {len(failed)}."
    if failed:
        summary += f" Failures: {', '.join(failed)}"
    logger.log(logging.INFO if overall_success else logging.ERROR, summary)
    return overall_success, summary
|
|
|
|
def create_install_scripts(repo_path: Path, requirements_file_path_in_repo: Path) -> Tuple[bool, str]:
    """Create offline install scripts inside ``<repo>/_req_packages``.

    Writes ``install_packages.bat`` (CRLF line endings) and
    ``install_packages.sh`` (LF line endings, mode 0o755).  Both scripts check
    that pip is available and then install from the folder they are located in
    (``--no-index --find-links``) using the copied ``requirements.txt``.

    Args:
        repo_path: Repository root; its name is shown in the script banner.
        requirements_file_path_in_repo: Requirements file that must exist.

    Returns:
        ``(success, message)``; ``success`` is ``False`` when the requirements
        file is missing, cannot be copied, or the scripts cannot be written.
    """
    packages_dir = repo_path / "_req_packages"
    packages_dir.mkdir(parents=True, exist_ok=True)

    if not requirements_file_path_in_repo.is_file():
        return False, f"Source requirements file '{requirements_file_path_in_repo}' not found."
    if not _copy_requirements_to_packages_dir(repo_path, packages_dir):
        return False, "Failed to copy requirements.txt."

    # The scripts reference the requirements file relative to their own folder.
    req_file = "requirements.txt"

    batch_script = f"""@echo off & cls
echo ------------------------------------
echo Package Installer for {repo_path.name}
echo ------------------------------------
echo.
echo Checking for pip...
python -m pip --version >nul 2>&1 & if errorlevel 1 ( python3 -m pip --version >nul 2>&1 & if errorlevel 1 ( goto :pip_error ) )
echo pip found. & goto :install
:pip_error
echo ERROR: Python pip module not found.
echo Please install pip for your Python environment. See:
echo https://pip.pypa.io/en/stable/installation/
echo. & pause & exit /b 1
:install
echo Installing packages from local folder using '{req_file}'...
echo Source: %~dp0
echo.
python -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file}" --disable-pip-version-check || ( python3 -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file}" --disable-pip-version-check || ( goto :install_error ) )
echo. & echo --------------------------
echo Installation Successful!
echo --------------------------
echo. & pause & exit /b 0
:install_error
echo. & echo --------------------------
echo ERROR: Installation Failed.
echo --------------------------
echo Please check the messages above for details. & pause & exit /b 1
"""

    shell_script = f"""#!/bin/bash
echo "------------------------------------"
echo "Package Installer for {repo_path.name}"
echo "------------------------------------"
echo ""
echo "Checking for pip..."
PYTHON_CMD=""
if command -v python3 &>/dev/null && python3 -m pip --version &>/dev/null; then PYTHON_CMD="python3"
elif command -v python &>/dev/null && python -m pip --version &>/dev/null; then PYTHON_CMD="python"
else echo "ERROR: Python pip module not found for python3 or python."; echo "Install pip: https://pip.pypa.io/en/stable/installation/"; exit 1; fi
echo "pip found ($PYTHON_CMD)."

SCRIPT_DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" &> /dev/null && pwd )"
REQ_FILE="$SCRIPT_DIR/{req_file}"
echo "Installing packages from '$SCRIPT_DIR' using '$REQ_FILE'..."
echo ""
"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$REQ_FILE" --disable-pip-version-check
INSTALL_STATUS=$?
echo ""
if [ $INSTALL_STATUS -ne 0 ]; then
echo "--------------------------"; echo "ERROR: Installation Failed."; echo "--------------------------"; echo "Check messages above."; exit 1;
else
echo "--------------------------"; echo "Installation Successful!"; echo "--------------------------"; exit 0;
fi
"""

    try:
        bat_path = packages_dir / "install_packages.bat"
        sh_path = packages_dir / "install_packages.sh"

        # newline= forces the line endings each platform's interpreter expects.
        with open(bat_path, 'w', encoding='utf-8', newline='\r\n') as bat_file:
            bat_file.write(batch_script)
        logger.info(f"Created {bat_path}")

        with open(sh_path, 'w', encoding='utf-8', newline='\n') as sh_file:
            sh_file.write(shell_script)
            os.chmod(sh_path, 0o755)
        logger.info(f"Created {sh_path} (executable)")

        return True, f"Installation scripts created in '{packages_dir}'."
    except IOError as e:
        logger.exception(f"Error creating install scripts: {e}")
        return False, f"Error creating scripts: {e}"
|
|
|
|
def get_installed_packages() -> Dict[str, str]:
    """Return the packages installed for the running interpreter.

    Runs ``pip list --format=json`` with ``sys.executable`` and maps each
    package name (lower-cased, underscores replaced by hyphens) to its
    version string.

    Returns:
        ``{normalized_name: version}``.  Any failure (pip missing, non-zero
        exit code, invalid JSON, ...) is logged and yields whatever was
        collected up to that point, usually an empty dict.
    """
    cmd = [sys.executable, "-m", "pip", "list", "--format=json", "--disable-pip-version-check"]
    logger.debug(f"Getting installed packages using: {' '.join(cmd)}")

    installed = {}
    # Suppress the console window pip would otherwise flash up on Windows.
    creation_flags = subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            encoding='utf-8',
            errors='ignore',
            creationflags=creation_flags,
        )
        for entry in json.loads(proc.stdout):
            normalized = entry['name'].lower().replace('_', '-')
            installed[normalized] = entry['version']
        logger.info(f"Found {len(installed)} installed packages.")
    except FileNotFoundError:
        logger.error(f"Command not found: {cmd[0]}. Is Python/pip in PATH?")
    except subprocess.CalledProcessError as e:
        logger.error(f"pip list command failed (code {e.returncode}): {e.stderr}")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse pip list JSON output: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error getting installed packages: {e}")
    return installed
|
|
|
|
# 'packaging' is optional: when it is missing, the names below are set to None
# and PACKAGING_AVAILABLE tells callers to use their regex-based fallbacks.
try:
    from packaging.requirements import Requirement
    from packaging.version import parse as parse_version, InvalidVersion
except ImportError:
    PACKAGING_AVAILABLE = False
    Requirement = None
    parse_version = None
    InvalidVersion = None
    logger.warning("'packaging' not found, version checks limited.")
else:
    PACKAGING_AVAILABLE = True
    logger.debug("Using 'packaging' library.")
|
|
|
|
def compare_requirements_with_installed(requirements_file: Path, installed_packages: Dict[str, str]) -> List[Dict[str, str]]:
    """Compare each requirement in a file against the installed packages.

    Args:
        requirements_file: Requirements file using distribution (PyPI) names.
        installed_packages: ``{normalized_name: version}`` as produced by
            ``get_installed_packages()`` (lower-case, hyphen-separated names).

    Returns:
        One dict per requirement line with the keys ``package``, ``required``
        (specifier string or ``"any"``), ``installed`` (version or
        ``"Not installed"``) and ``status``.  ``status`` is one of ``"OK"``,
        ``"Mismatch"``, ``"Installed"`` (no version constraint checked),
        ``"Not installed"``, ``"Invalid Installed Version"`` or
        ``"Compare Error"``.  An empty list is returned when the file does not
        exist.
    """
    results: List[Dict[str, str]] = []
    if not requirements_file.is_file():
        logger.warning(f"Compare failed: '{requirements_file}' not found.")
        return results

    logger.info(f"Comparing '{requirements_file.name}' with installed packages...")
    with open(requirements_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, start=1):
            spec = line.strip()
            # Skip blanks, comments and pip option lines.
            if not spec or spec.startswith(('#', '-')):
                continue

            pkg_name = spec
            req_str = "any"
            req_obj = None
            if PACKAGING_AVAILABLE:
                try:
                    req_obj = Requirement(spec)
                    pkg_name = req_obj.name
                    req_str = str(req_obj.specifier) if req_obj.specifier else "any"
                except Exception as e:
                    logger.warning(f"L{line_num}: Failed to parse '{spec}' with packaging: {e}.")
                    match = re.match(r"([a-zA-Z0-9._-]+)", spec)
                    if match:
                        pkg_name = match.group(1)
            else:
                # Basic fallback: "<name> <op> <version>" or just "<name>".
                m_spec = re.match(r"([a-zA-Z0-9._-]+)\s*([<>=!~]=?)\s*([0-9a-zA-Z.]+)", spec)
                m_name = re.match(r"([a-zA-Z0-9._-]+)", spec)
                if m_spec:
                    pkg_name, req_str = m_spec.group(1), m_spec.group(2) + m_spec.group(3)
                elif m_name:
                    pkg_name = m_name.group(1)
                else:
                    logger.warning(f"L{line_num}: Could not extract package name from '{spec}'. Skipping comparison.")
                    continue

            norm_name = pkg_name.lower().replace('_', '-')
            inst_ver = installed_packages.get(norm_name)
            item = {"package": pkg_name, "required": req_str, "installed": inst_ver or "Not installed"}

            if not inst_ver:
                status = "Not installed"
            elif PACKAGING_AVAILABLE and req_obj and req_obj.specifier:
                try:
                    # prereleases=True: an already-installed pre-release that
                    # satisfies the specifier must count as a match; the plain
                    # ``in`` operator excludes pre-releases by default.
                    satisfied = req_obj.specifier.contains(parse_version(inst_ver), prereleases=True)
                    status = "OK" if satisfied else "Mismatch"
                except InvalidVersion:
                    logger.warning(f"Package '{pkg_name}': Installed version '{inst_ver}' is invalid.")
                    status = "Invalid Installed Version"
                except Exception as e:
                    logger.error(f"Error comparing version for '{pkg_name}': {e}")
                    status = "Compare Error"
            elif req_str != "any" and "==" in req_str:
                # Without 'packaging' only exact pins can be verified.
                expected_ver = req_str.split('==', 1)[1].strip()
                status = "OK" if inst_ver == expected_ver else "Mismatch"
            else:
                # Version check skipped or not required by the spec.
                status = "Installed"

            item["status"] = status
            results.append(item)

    logger.info(f"Comparison finished: {len(results)} entries.")
    return results
|
|
|
|
def update_system_packages(packages_to_update: List[str], repo_path: Path) -> Tuple[bool, str]:
    """Upgrade the given packages with ``pip install --upgrade -r <temp file>``.

    For every requested package the matching line of
    ``<repo>/requirements.txt`` is used as the pip specification; packages not
    listed there are upgraded by plain name.  The specifications are written
    to a temporary requirements file inside ``repo_path``, which is removed
    again afterwards.

    Args:
        packages_to_update: Package names to upgrade (case and ``_``/``-``
            differences are ignored when matching against the requirements).
        repo_path: Repository root containing ``requirements.txt``.

    Returns:
        ``(success, message)``.  An empty ``packages_to_update`` is a
        successful no-op; a missing/unreadable requirements file, a pip
        failure or a timeout yields ``(False, <reason>)``.
    """
    if not packages_to_update:
        return True, "No packages specified for update."
    req_file = repo_path / "requirements.txt"
    if not req_file.is_file():
        return False, f"Cannot update: '{req_file}' not found."

    def _normalize(name: str) -> str:
        return name.lower().replace('_', '-')

    req_map: Dict[str, str] = {}  # normalized_name -> pip_spec_line
    try:
        with open(req_file, 'r', encoding='utf-8') as f:
            for line in f:
                spec = line.strip()
                if not spec or spec.startswith(('#', '-')):
                    continue
                name = None
                if PACKAGING_AVAILABLE:
                    try:
                        name = Requirement(spec).name
                    except Exception:
                        # Unparseable line: fall through to the regex below.
                        name = None
                if name is None:
                    match = re.match(r"([a-zA-Z0-9._-]+)", spec)
                    # Last resort: use the whole line as the name.
                    name = match.group(1) if match else spec
                if name:
                    req_map[_normalize(name)] = spec
    except Exception as e:
        return False, f"Error reading '{req_file}': {e}"

    temp_req_lines: List[str] = []
    for pkg_update_name in packages_to_update:
        norm_name = _normalize(pkg_update_name)
        if norm_name in req_map:
            temp_req_lines.append(req_map[norm_name])  # Use spec from file
        else:
            logger.warning(f"'{pkg_update_name}' not in requirements, upgrading by name.")
            temp_req_lines.append(pkg_update_name)

    temp_req_path = repo_path / "_temp_update_reqs.txt"
    cmd = [
        sys.executable, "-m", "pip", "install", "--upgrade",
        "-r", str(temp_req_path.resolve()), "--disable-pip-version-check",
    ]
    try:
        with open(temp_req_path, 'w', encoding='utf-8') as f:
            f.write("\n".join(temp_req_lines) + "\n")
        logger.info(f"Attempting to update {len(temp_req_lines)} package(s) via: {' '.join(cmd)}")
        # subprocess.run kills the child and drains its pipes on timeout.
        completed = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='ignore',
            timeout=600,
            creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0,
        )
        stdout = (completed.stdout or "").strip()
        stderr = (completed.stderr or "").strip()
        if completed.returncode != 0:
            error_msg = f"Update failed: Pip error (code {completed.returncode}):\nStderr: {stderr}\nStdout: {stdout}"
            logger.error(error_msg)
            return False, error_msg

        msg = f"Update completed for: {', '.join(packages_to_update)}."
        logger.info(msg)
        if stderr:
            logger.warning(f"Update stderr:\n{stderr}")
        return True, msg
    except Exception as e:
        error_msg = f"Update failed: {str(e)}"
        logger.error(error_msg)
        return False, error_msg
    finally:
        # Always clean up the temporary requirements file.
        if temp_req_path.exists():
            try:
                os.remove(temp_req_path)
                logger.debug(f"Removed temp file: {temp_req_path}")
            except OSError as e:
                logger.warning(f"Could not remove temp file '{temp_req_path}': {e}")