# dependencyanalyzer/core.py
import ast
import os
import shutil # For copying files
import subprocess
import sys
import json
import logging
import importlib.metadata  # Needed for importlib.metadata.version() during dependency classification
import importlib.util
from pathlib import Path
import sysconfig
import re
from typing import List, Dict, Set, Tuple, Optional, Union, Any, Callable
# --- Logger Configuration ---
logger = logging.getLogger(__name__) # Use __name__ for logger hierarchy
# --- Standard Library Detection Logic ---
if sys.version_info >= (3, 10):
    try:
        STANDARD_LIBRARY_MODULES: frozenset[str] = sys.stdlib_module_names
        logger.debug("Using sys.stdlib_module_names for standard library module list (Python 3.10+).")
    except AttributeError:
        logger.warning("sys.stdlib_module_names not found despite Python 3.10+. Using predefined list.")
        STANDARD_LIBRARY_MODULES = frozenset()  # Re-checked by the fallback logic below
else:
    logger.debug("Using a predefined list for standard library modules (Python < 3.10).")
    # Predefined list (assign directly here)
    _PREDEFINED_STDLIBS = {
'abc', 'aifc', 'argparse', 'array', 'ast', 'asynchat', 'asyncio', 'asyncore',
'atexit', 'audioop', 'base64', 'bdb', 'binascii', 'binhex', 'bisect', 'builtins',
'bz2', 'calendar', 'cgi', 'cgitb', 'chunk', 'cmath', 'cmd', 'code', 'codecs',
'codeop', 'collections', 'colorsys', 'compileall', 'concurrent', 'configparser',
'contextlib', 'contextvars', 'copy', 'copyreg', 'cProfile', 'crypt', 'csv',
'ctypes', 'curses', 'dataclasses', 'datetime', 'dbm', 'decimal', 'difflib',
'dis', 'distutils', 'doctest', 'email', 'encodings', 'ensurepip', 'enum', 'errno',
'faulthandler', 'fcntl', 'filecmp', 'fileinput', 'fnmatch', 'formatter',
'fractions', 'ftplib', 'functools', 'gc', 'getopt', 'getpass', 'gettext',
'glob', 'graphlib', 'grp', 'gzip', 'hashlib', 'heapq', 'hmac', 'html', 'http',
'idlelib', 'imaplib', 'imghdr', 'imp', 'importlib', 'inspect', 'io',
'ipaddress', 'itertools', 'json', 'keyword', 'lib2to3', 'linecache', 'locale',
'logging', 'lzma', 'mailbox', 'mailcap', 'marshal', 'math', 'mimetypes', 'mmap',
'modulefinder', 'multiprocessing', 'netrc', 'nis', 'nntplib', 'numbers',
'operator', 'optparse', 'os', 'ossaudiodev', 'parser', 'pathlib', 'pdb',
'pickle', 'pickletools', 'pipes', 'pkgutil', 'platform', 'plistlib', 'poplib',
'posix', 'pprint', 'profile', 'pstats', 'pty', 'pwd', 'py_compile', 'pyclbr',
'pydoc', 'pydoc_data', 'pyexpat', 'queue', 'quopri', 'random', 're',
'readline', 'reprlib', 'resource', 'rlcompleter', 'runpy', 'sched', 'secrets',
'select', 'selectors', 'shelve', 'shlex', 'shutil', 'signal', 'site',
'smtpd', 'smtplib', 'sndhdr', 'socket', 'socketserver', 'spwd', 'sqlite3',
'ssl', 'stat', 'statistics', 'string', 'stringprep', 'struct', 'subprocess',
'sunau', 'symtable', 'sys', 'sysconfig', 'syslog', 'tabnanny', 'tarfile',
'telnetlib', 'tempfile', 'termios', 'textwrap', 'threading', 'time', 'timeit',
'tkinter', 'token', 'tokenize', 'trace', 'traceback', 'tracemalloc', 'tty',
'turtle', 'turtledemo', 'types', 'typing', 'unicodedata', 'unittest', 'urllib',
'uu', 'uuid', 'venv', 'warnings', 'wave', 'weakref', 'webbrowser', 'wsgiref',
'xdrlib', 'xml', 'xmlrpc', 'zipapp', 'zipfile', 'zipimport', 'zlib',
'_thread', '_collections_abc', '_json', '_datetime', '_weakrefset', '_strptime',
'_socket', '_ssl', '_struct', '_queue', '_pickle', '_lsprof', '_heapq', '_hashlib',
'_csv', '_bz2', '_codecs', '_bisect', '_blake2', '_asyncio', '_ast', '_abc',
}
    STANDARD_LIBRARY_MODULES = frozenset(_PREDEFINED_STDLIBS)

# Final check/assignment if Python 3.10+ but stdlib_module_names failed
if sys.version_info >= (3, 10) and (not isinstance(STANDARD_LIBRARY_MODULES, frozenset) or not STANDARD_LIBRARY_MODULES):
    logger.error("Could not determine standard library modules for Python 3.10+. Stdlib check may be unreliable.")
    # Assign empty or predefined as last resort? Let's stick with empty to signal failure.
    STANDARD_LIBRARY_MODULES = frozenset()
_CACHED_STD_LIB_PATHS: Optional[Set[str]] = None
def get_standard_library_paths() -> Set[str]:
    """Retrieves and caches normalized paths for the Python standard library."""
    global _CACHED_STD_LIB_PATHS
    if _CACHED_STD_LIB_PATHS is not None: return _CACHED_STD_LIB_PATHS
    paths = set(); logger.debug("Determining standard library paths...")
    try:
        for path_name in ('stdlib', 'platstdlib'):
            try:
                path_val = sysconfig.get_path(path_name)
                if path_val and os.path.isdir(path_val): paths.add(os.path.normpath(path_val)); logger.debug(f"Found stdlib path ({path_name}): {path_val}")
            except Exception as e: logger.warning(f"Could not get sysconfig path '{path_name}': {e}")
        prefix_lib_path = os.path.normpath(os.path.join(sys.prefix, "lib", f"python{sys.version_info.major}.{sys.version_info.minor}"))
        if os.path.isdir(prefix_lib_path): paths.add(prefix_lib_path); logger.debug(f"Found stdlib path (prefix): {prefix_lib_path}")
        if sys.platform == "win32":
            dlls_path = os.path.join(sys.prefix, "DLLs")
            if os.path.isdir(dlls_path): paths.add(os.path.normpath(dlls_path)); logger.debug(f"Found stdlib path (DLLs): {dlls_path}")
        else:
            dynload_path = os.path.join(sys.exec_prefix, "lib", f"python{sys.version_info.major}.{sys.version_info.minor}", "lib-dynload")
            if os.path.isdir(dynload_path): paths.add(os.path.normpath(dynload_path)); logger.debug(f"Found stdlib path (dynload): {dynload_path}")
        fw_prefix = sysconfig.get_config_var('PYTHONFRAMEWORKPREFIX')
        if fw_prefix and isinstance(fw_prefix, str) and os.path.isdir(fw_prefix):
            fw_path = os.path.normpath(os.path.join(fw_prefix, "lib", f"python{sys.version_info.major}.{sys.version_info.minor}"))
            if os.path.isdir(fw_path): paths.add(fw_path); logger.debug(f"Found stdlib path (Framework): {fw_path}")
        if not paths:
            logger.warning("Sysconfig paths failed, attempting fallback using 'os' and 'sysconfig' module locations.")
            try: paths.add(os.path.normpath(os.path.dirname(os.__file__)))
            except Exception: logger.error("Could not determine path for 'os' module.")
            try: paths.add(os.path.normpath(os.path.dirname(sysconfig.__file__)))
            except Exception: logger.error("Could not determine path for 'sysconfig' module.")
    except Exception as e: logger.exception(f"Unexpected error determining standard library paths: {e}")
    _CACHED_STD_LIB_PATHS = {p for p in paths if p}
    if not _CACHED_STD_LIB_PATHS: logger.error("Failed to determine ANY standard library paths.")
    else: logger.debug(f"Final cached standard library paths: {_CACHED_STD_LIB_PATHS}")
    return _CACHED_STD_LIB_PATHS
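# Illustrative note (sketch, not part of the analyzer's runtime path): the path set is
# computed once per process and memoised in _CACHED_STD_LIB_PATHS, so repeated calls
# return the very same object:
#
#     assert get_standard_library_paths() is get_standard_library_paths()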
def is_path_in_standard_library(file_path_str: Optional[str]) -> bool:
    """Checks if a file path string is within any known standard library directory."""
    if not file_path_str: return False
    std_lib_paths = get_standard_library_paths()
    if not std_lib_paths: return False
    try:
        norm_file_path = os.path.normpath(os.path.abspath(file_path_str))
        path_parts = Path(norm_file_path).parts
        if 'site-packages' in path_parts or 'dist-packages' in path_parts: return False
        for std_path in std_lib_paths:
            norm_std_path = os.path.normpath(os.path.abspath(std_path))
            if norm_file_path.startswith(norm_std_path + os.sep): return True
    except Exception as e: logger.warning(f"Error during path comparison for '{file_path_str}': {e}"); return False
    return False
def _is_standard_library(module_name: str) -> bool:
    """Checks if a module name belongs to the standard library."""
    if module_name in STANDARD_LIBRARY_MODULES: logger.debug(f"'{module_name}' in predefined list."); return True
    try: spec = importlib.util.find_spec(module_name)
    except Exception: logger.debug(f"Spec finding failed for '{module_name}'. Assuming non-standard."); return False
    if spec is None: logger.debug(f"No spec for '{module_name}'. Assuming non-standard."); return False
    origin = spec.origin; logger.debug(f"'{module_name}': origin='{origin}'")
    if origin == 'built-in' or origin == 'frozen': logger.debug(f"'{module_name}' is '{origin}'."); return True
    if is_path_in_standard_library(origin): logger.debug(f"'{module_name}' origin '{origin}' IS stdlib path."); return True
    logger.debug(f"'{module_name}' origin '{origin}' NOT stdlib path."); return False
MODULE_NAME_TO_PACKAGE_NAME_MAP: Dict[str, str] = { "PIL": "Pillow" }
FALSE_POSITIVE_EXTERNAL_MODULES: Set[str] = { "mpl_toolkits" }
class ImportExtractor(ast.NodeVisitor):
    """AST visitor extracts (module_name, relative_file_path_string) tuples."""
    def __init__(self, file_path_str: str):
        super().__init__()
        self.file_path_str = file_path_str
        self.imported_modules: Set[Tuple[str, str]] = set()
    def visit_Import(self, node: ast.Import):
        for alias in node.names:
            module_name = alias.name.split('.')[0]
            if module_name: self.imported_modules.add((module_name, self.file_path_str))
        self.generic_visit(node)
    def visit_ImportFrom(self, node: ast.ImportFrom):
        if node.module and node.level == 0:
            module_name = node.module.split('.')[0]
            if module_name: self.imported_modules.add((module_name, self.file_path_str))
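# Illustrative sketch (file path 'pkg/demo.py' is hypothetical): only top-level, absolute
# imports are recorded, so "import numpy.linalg" and "from numpy import array" both yield
# ("numpy", <file>), while relative imports ("from . import x") are skipped.
#
#     _tree = ast.parse("import numpy.linalg\nfrom numpy import array\nfrom . import helpers")
#     _ex = ImportExtractor(file_path_str="pkg/demo.py")
#     _ex.visit(_tree)
#     assert _ex.imported_modules == {("numpy", "pkg/demo.py")}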
DependencyInfo = Dict[str, Dict[str, Union[Set[str], Optional[str], str]]]
def find_project_modules_and_dependencies(
    repo_path: Path,   # Original user-selected path
    scan_path: Path    # Path where the actual scanning begins
) -> Tuple[DependencyInfo, DependencyInfo]:
    """
    Analyzes Python files starting from scan_path, identifies project modules
    relative to scan_path, and finds standard/external dependencies.

    Explicitly ignores imports matching the name of the scan_path directory if
    scan_path is different from repo_path (assuming it's the main package).

    Args:
        repo_path (Path): The root path selected by the user.
        scan_path (Path): The directory to actually scan for source code.

    Returns:
        Tuple[DependencyInfo, DependencyInfo]: std_lib_info, external_deps_info
    """
    all_imports_locations: Dict[str, Set[str]] = {}
    # project_modules: Set[str] = set()  # Identifying project modules is complex; rely on the scan_path name instead.

    # --- NEW LOGIC: identify the main package name (when scanning a subfolder) ---
    main_project_package_name: Optional[str] = None
    if repo_path != scan_path and scan_path.name == repo_path.name.lower():
        main_project_package_name = scan_path.name
        logger.info(f"Assuming '{main_project_package_name}' is the main project package being scanned.")
    # --- END NEW LOGIC ---

    # Identifying potential project modules *within* scan_path could still be useful,
    # but the primary check is simplified to main_project_package_name for now.
    # logger.info(f"Analysis target: Identifying project modules within '{scan_path}'...")
    # (The previous project_modules identification logic was removed for simplification;
    # it could be added back if needed for more complex internal structures.)
    logger.info(f"Analyzing Python files for imports starting from '{scan_path}'...")
    excluded_dirs = {'venv', '.venv', 'env', '.env', 'docs', 'tests', 'test', 'site-packages', 'dist-packages', '__pycache__', '.git', '.hg', '.svn', '.tox', '.nox', 'build', 'dist', '*.egg-info'}
    file_count = 0
    for root, dirs, files in os.walk(scan_path, topdown=True):
        dirs[:] = [d for d in dirs if d not in excluded_dirs and not d.startswith('.')]
        current_root_path = Path(root)
        for file_name in files:
            if file_name.endswith(".py"):
                file_path_obj = current_root_path / file_name; file_count += 1
                try: report_rel_path_str = str(file_path_obj.relative_to(repo_path))
                except ValueError: report_rel_path_str = str(file_path_obj); logger.warning(f"Path not relative: {file_path_obj}")
                logger.debug(f"Parsing: {report_rel_path_str}")
                try:
                    with open(file_path_obj, 'r', encoding='utf-8', errors='ignore') as f: source = f.read()
                    tree = ast.parse(source, filename=str(file_path_obj))
                    extractor = ImportExtractor(file_path_str=report_rel_path_str)
                    extractor.visit(tree)
                    for module, report_rel_path in extractor.imported_modules:
                        if module: all_imports_locations.setdefault(module, set()).add(report_rel_path)
                except SyntaxError as e: logger.warning(f"Syntax error in '{report_rel_path_str}': {e}. Skipping.")
                except Exception as e: logger.exception(f"Error processing file '{report_rel_path_str}': {e}")
    logger.info(f"Analyzed {file_count} Python files. Found {len(all_imports_locations)} unique top-level imports.")
    logger.info("Classifying imports and fetching versions...")
    std_libs: DependencyInfo = {}; external_deps: DependencyInfo = {}
    for imp_module, locs in all_imports_locations.items():
        # --- MODIFIED CHECK: ignore the import if it matches the main project package name ---
        if main_project_package_name and imp_module == main_project_package_name:
            logger.info(f"Skipping '{imp_module}' as it matches the main project package name being scanned.")
            continue
        # --- END MODIFIED CHECK ---
        # The check against the more complex 'project_modules' set is omitted for now.
        # if imp_module in project_modules: logger.debug(f"Skipping project module: '{imp_module}'"); continue
        if imp_module in FALSE_POSITIVE_EXTERNAL_MODULES: logger.info(f"Skipping known false positive: '{imp_module}'"); continue
        if _is_standard_library(imp_module):
            logger.debug(f"'{imp_module}' is standard library.")
            std_libs[imp_module] = {'locations': locs, 'version': None}
        else:
            # External dependency: map the import name to its PyPI name, then look up the installed version
            pypi_name = MODULE_NAME_TO_PACKAGE_NAME_MAP.get(imp_module, imp_module)
            orig_imp = imp_module if pypi_name != imp_module else None
            logger.debug(f"'{imp_module}' (PyPI: '{pypi_name}') is external. Fetching version...")
            version: Optional[str] = None
            try: version = importlib.metadata.version(pypi_name)
            except Exception: logger.warning(f"Version for '{pypi_name}' not found.")
            dep_data = external_deps.setdefault(pypi_name, {'locations': set(), 'version': version, 'original_import_name': None})
            dep_data['locations'].update(locs)  # type: ignore
            if orig_imp and dep_data.get('original_import_name') is None: dep_data['original_import_name'] = orig_imp
            if dep_data.get('version') is None and version is not None: dep_data['version'] = version
    logger.info(f"Classification complete: {len(std_libs)} stdlib used, {len(external_deps)} unique external dependencies.")
    return std_libs, external_deps
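# Usage sketch (illustrative; the 'MyTool' layout below is an assumption, not a project
# requirement). For a repo laid out as MyTool/mytool/*.py one would typically call:
#
#     std, ext = find_project_modules_and_dependencies(
#         repo_path=Path("MyTool"), scan_path=Path("MyTool") / "mytool")
#     # ext maps PyPI names to info dicts, e.g.:
#     # {"requests": {"locations": {"mytool/client.py"},
#     #               "version": <installed version or None>,
#     #               "original_import_name": None}}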
# --- find_main_script DEFINITION ---
def find_main_script(repo_path: Path) -> Optional[Path]:
    """Finds the main script (__main__.py) in standard locations."""
    repo_name = repo_path.name; script_subfolder_name = repo_name.lower()
    path_subdir = repo_path / script_subfolder_name / "__main__.py"
    path_root = repo_path / "__main__.py"
    logger.info(f"Looking for main script at: '{path_subdir}' or '{path_root}'")
    if path_subdir.is_file(): logger.info(f"Main script found: {path_subdir}"); return path_subdir
    elif path_root.is_file(): logger.info(f"Main script found: {path_root}"); return path_root
    else: logger.warning("Main script (__main__.py) not found in standard locations."); return None
def generate_requirements_file(repo_path: Path, external_deps_info: DependencyInfo, std_lib_deps_info: DependencyInfo) -> Path:
    """Generates requirements.txt with detailed comments."""
    req_file_path = repo_path / "requirements.txt"
    py_ver = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    logger.info(f"Generating '{req_file_path}'...")
    try:
        with open(req_file_path, 'w', encoding='utf-8') as f:
            f.write(f"# Requirements generated by DependencyAnalyzer for {repo_path.name}\n")
            f.write(f"# Python Version (analysis env): {py_ver}\n\n")
            f.write(f"# --- Standard Library Modules Used (part of Python {py_ver}) ---\n")
            if not std_lib_deps_info: f.write("# (No stdlib imports detected)\n")
            else:
                for name in sorted(std_lib_deps_info.keys()):
                    locs = std_lib_deps_info[name].get('locations', set()); loc_list = sorted(list(locs)) if isinstance(locs, set) else []
                    loc_str = f"Used in: {', '.join(loc_list[:3])}{', ...' if len(loc_list) > 3 else ''}" if loc_list else ""
                    f.write(f"# {name} ({loc_str})\n".replace(" ()", ""))
            f.write("\n# --- External Dependencies (for pip install) ---\n")
            if not external_deps_info: f.write("# (No external dependencies detected)\n")
            else:
                for pypi_name in sorted(external_deps_info.keys()):
                    info = external_deps_info[pypi_name]; locs = info.get('locations', set()); ver = info.get('version'); orig_imp = info.get('original_import_name')
                    loc_list = sorted(list(locs)) if isinstance(locs, set) else []
                    imp_as = f"(imported as '{orig_imp}') " if orig_imp else ""
                    loc_str = f"# Found {imp_as}in: {', '.join(loc_list[:3])}{', ...' if len(loc_list) > 3 else ''}" if loc_list else "# Location info unavailable"
                    ver_str = f"# Detected version: {ver}" if ver else "# Version not detected in analysis env"
                    f.write(f"{loc_str}\n{ver_str}\n{pypi_name}\n\n")
        logger.info(f"Successfully generated '{req_file_path}'.")
    except IOError as e: logger.exception(f"Error writing requirements.txt: {e}"); raise
    return req_file_path
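# The generated file mixes explanatory comments with bare package names that pip can
# install. A hedged example of one external entry (file path is hypothetical):
#
#     # Found (imported as 'PIL') in: mytool/images.py
#     # Detected version: <version found in the analysis environment>
#     Pillow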
def _copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
    """Copies requirements.txt from repo root to packages directory."""
    source_req = repo_path / "requirements.txt"; dest_req = packages_dir / "requirements.txt"
    if not source_req.is_file(): logger.warning(f"Source '{source_req}' not found, cannot copy."); return False
    try: shutil.copy2(source_req, dest_req); logger.info(f"Copied '{source_req.name}' to '{packages_dir}'."); return True
    except Exception as e: logger.exception(f"Failed to copy '{source_req.name}': {e}"); return False
def download_packages(repo_path: Path, requirements_file_path_in_repo: Path) -> Tuple[bool, str]:
    """Downloads packages one-by-one based on requirements file."""
    packages_dir = repo_path / "_req_packages"; packages_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Starting package download to '{packages_dir}'...")
    if not requirements_file_path_in_repo.is_file():
        msg = f"Source requirements file '{requirements_file_path_in_repo}' not found."; logger.error(msg); return False, msg
    _copy_requirements_to_packages_dir(repo_path, packages_dir)
    packages_to_download: List[str] = []
    try:
        with open(requirements_file_path_in_repo, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and not line.startswith('-'): packages_to_download.append(line)
    except Exception as e:
        msg = f"Error reading requirements file '{requirements_file_path_in_repo}': {e}"; logger.exception(msg); return False, msg
    if not packages_to_download:
        msg = f"No package specifications found in '{requirements_file_path_in_repo}'."; logger.info(msg); return True, msg
    logger.info(f"Found {len(packages_to_download)} specifications. Downloading individually...")
    overall_success = True; failed = []; succeeded = []
    for pkg_spec in packages_to_download:
        logger.info(f"--- Downloading: {pkg_spec} ---")
        cmd = [sys.executable, "-m", "pip", "download", pkg_spec, "-d", str(packages_dir.resolve()), "--no-cache-dir", "--disable-pip-version-check"]
        logger.debug(f"Running: {' '.join(cmd)}")
        proc = None
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0)
            stdout, stderr = proc.communicate(timeout=300)
            if proc.returncode == 0:
                logger.info(f"OK: '{pkg_spec}' downloaded (or already present).")
                succeeded.append(pkg_spec)
                if stderr and stderr.strip(): logger.warning(f"Pip stderr for '{pkg_spec}':\n{stderr.strip()}")
                if stdout and stdout.strip(): logger.debug(f"Pip stdout for '{pkg_spec}':\n{stdout.strip()[:300]}...")
            else:
                raise Exception(f"Pip error (code {proc.returncode}):\nStderr: {stderr.strip()}\nStdout: {stdout.strip()}")  # Include stdout in error
        except Exception as e:
            error_msg = f"Failed to download '{pkg_spec}': {str(e)}"  # Use str(e) for a cleaner log message
            logger.error(error_msg)
            # Log a traceback only for unexpected errors, not for timeouts or known pip failures
            if not isinstance(e, (subprocess.TimeoutExpired, subprocess.CalledProcessError)) and "Pip error" not in str(e):
                logger.exception("Traceback for unexpected download error:")
            if isinstance(e, subprocess.TimeoutExpired) and proc: proc.kill(); proc.wait()
            overall_success = False; failed.append(pkg_spec)
        # logger.info(f"--- Finished: {pkg_spec} ---")  # A bit verbose; the summary below covers it
    summary = f"Download finished. Attempted: {len(packages_to_download)}, Succeeded: {len(succeeded)}, Failed: {len(failed)}."
    if failed: summary += f" Failures: {', '.join(failed)}"
    logger.log(logging.ERROR if not overall_success else logging.INFO, summary)
    return overall_success, summary
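# Rough call sequence (illustrative): generate the requirements file first, then download
# the listed packages into <repo>/_req_packages for offline installation.
#
#     req_path = generate_requirements_file(repo, ext_deps, std_deps)  # 'repo', 'ext_deps', 'std_deps' assumed
#     ok, summary = download_packages(repo, req_path)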
def create_install_scripts(repo_path: Path, requirements_file_path_in_repo: Path) -> Tuple[bool, str]:
    """Creates install scripts with pip check and relative paths."""
    packages_dir = repo_path / "_req_packages"; packages_dir.mkdir(parents=True, exist_ok=True)
    if not requirements_file_path_in_repo.is_file(): return False, f"Source requirements file '{requirements_file_path_in_repo}' not found."
    if not _copy_requirements_to_packages_dir(repo_path, packages_dir): return False, "Failed to copy requirements.txt."
    req_file = "requirements.txt"  # Relative name, resolved next to the scripts at install time
    bat = f"""@echo off & cls
echo ------------------------------------
echo Package Installer for {repo_path.name}
echo ------------------------------------
echo.
echo Checking for pip...
python -m pip --version >nul 2>&1 & if errorlevel 1 ( python3 -m pip --version >nul 2>&1 & if errorlevel 1 ( goto :pip_error ) )
echo pip found. & goto :install
:pip_error
echo ERROR: Python pip module not found.
echo Please install pip for your Python environment. See:
echo https://pip.pypa.io/en/stable/installation/
echo. & pause & exit /b 1
:install
echo Installing packages from local folder using '{req_file}'...
echo Source: %~dp0
echo.
python -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file}" --disable-pip-version-check || ( python3 -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file}" --disable-pip-version-check || ( goto :install_error ) )
echo. & echo --------------------------
echo Installation Successful!
echo --------------------------
echo. & pause & exit /b 0
:install_error
echo. & echo --------------------------
echo ERROR: Installation Failed.
echo --------------------------
echo Please check the messages above for details. & pause & exit /b 1
"""
sh = f"""#!/bin/bash
echo "------------------------------------"
echo "Package Installer for {repo_path.name}"
echo "------------------------------------"
echo ""
echo "Checking for pip..."
PYTHON_CMD=""
if command -v python3 &>/dev/null && python3 -m pip --version &>/dev/null; then PYTHON_CMD="python3"
elif command -v python &>/dev/null && python -m pip --version &>/dev/null; then PYTHON_CMD="python"
else echo "ERROR: Python pip module not found for python3 or python."; echo "Install pip: https://pip.pypa.io/en/stable/installation/"; exit 1; fi
echo "pip found ($PYTHON_CMD)."
SCRIPT_DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" &> /dev/null && pwd )"
REQ_FILE="$SCRIPT_DIR/{req_file}"
echo "Installing packages from '$SCRIPT_DIR' using '$REQ_FILE'..."
echo ""
"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$REQ_FILE" --disable-pip-version-check
INSTALL_STATUS=$?
echo ""
if [ $INSTALL_STATUS -ne 0 ]; then
echo "--------------------------"; echo "ERROR: Installation Failed."; echo "--------------------------"; echo "Check messages above."; exit 1;
else
echo "--------------------------"; echo "Installation Successful!"; echo "--------------------------"; exit 0;
fi
"""
    try:
        bat_path = packages_dir / "install_packages.bat"; sh_path = packages_dir / "install_packages.sh"
        with open(bat_path, 'w', encoding='utf-8', newline='\r\n') as f: f.write(bat); logger.info(f"Created {bat_path}")
        with open(sh_path, 'w', encoding='utf-8', newline='\n') as f: f.write(sh); os.chmod(sh_path, 0o755); logger.info(f"Created {sh_path} (executable)")
        return True, f"Installation scripts created in '{packages_dir}'."
    except IOError as e: logger.exception(f"Error creating install scripts: {e}"); return False, f"Error creating scripts: {e}"
def get_installed_packages() -> Dict[str, str]:
    """Gets installed packages, normalizing names to lowercase and hyphens."""
    installed = {}
    cmd = [sys.executable, "-m", "pip", "list", "--format=json", "--disable-pip-version-check"]
    logger.debug(f"Getting installed packages using: {' '.join(cmd)}")
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8', errors='ignore', creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0)
        pkgs = json.loads(proc.stdout)
        for pkg in pkgs: installed[pkg['name'].lower().replace('_', '-')] = pkg['version']
        logger.info(f"Found {len(installed)} installed packages.")
    except FileNotFoundError: logger.error(f"Command not found: {cmd[0]}. Is Python/pip in PATH?")
    except subprocess.CalledProcessError as e: logger.error(f"pip list command failed (code {e.returncode}): {e.stderr}")
    except json.JSONDecodeError as e: logger.error(f"Failed to parse pip list JSON output: {e}")
    except Exception as e: logger.exception(f"Unexpected error getting installed packages: {e}")
    return installed
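# Illustrative result shape (values depend entirely on the local environment; the package
# names shown are only examples). Names are lowercased with '_' mapped to '-', matching
# how compare_requirements_with_installed() normalises requirement names below.
#
#     get_installed_packages()
#     # -> {"pillow": "<version>", "typing-extensions": "<version>", ...}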
try:
    from packaging.requirements import Requirement
    from packaging.version import parse as parse_version, InvalidVersion
    PACKAGING_AVAILABLE = True; logger.debug("Using 'packaging' library.")
except ImportError:
    PACKAGING_AVAILABLE = False; Requirement = None; parse_version = None; InvalidVersion = None
    logger.warning("'packaging' not found, version checks limited.")
def compare_requirements_with_installed(requirements_file: Path, installed_packages: Dict[str, str]) -> List[Dict[str, str]]:
    """Compares requirements file (PyPI names) against installed packages."""
    results = []
    if not requirements_file.is_file(): logger.warning(f"Compare failed: '{requirements_file}' not found."); return results
    logger.info(f"Comparing '{requirements_file.name}' with installed packages...")
    line_num = 0
    with open(requirements_file, 'r', encoding='utf-8') as f:
        for line in f:
            line_num += 1
            spec = line.strip()
            if not spec or spec.startswith('#') or spec.startswith('-'): continue
            pypi_spec = spec; pkg_name = pypi_spec; req_str = "any"; req_obj = None
            if PACKAGING_AVAILABLE:
                try:
                    req_obj = Requirement(pypi_spec); pkg_name = req_obj.name
                    req_str = str(req_obj.specifier) if req_obj.specifier else "any"
                except Exception as e:
                    logger.warning(f"L{line_num}: Failed to parse '{pypi_spec}' with packaging: {e}.")
                    match = re.match(r"([a-zA-Z0-9._-]+)", pypi_spec)
                    if match: pkg_name = match.group(1)
            else:  # Basic fallback
                m_spec = re.match(r"([a-zA-Z0-9._-]+)\s*([<>=!~]=?)\s*([0-9a-zA-Z.]+)", pypi_spec)
                m_name = re.match(r"([a-zA-Z0-9._-]+)", pypi_spec)
                if m_spec: pkg_name, req_str = m_spec.group(1), m_spec.group(2) + m_spec.group(3)
                elif m_name: pkg_name = m_name.group(1)
                else: logger.warning(f"L{line_num}: Could not extract package name from '{pypi_spec}'. Skipping comparison."); continue  # Skip if name cannot be extracted
            norm_name = pkg_name.lower().replace('_', '-')
            inst_ver = installed_packages.get(norm_name)
            status = ""
            item = {"package": pkg_name, "required": req_str, "installed": inst_ver or "Not installed"}
            if inst_ver:
                if PACKAGING_AVAILABLE and req_obj and req_obj.specifier:
                    try:
                        if parse_version(inst_ver) in req_obj.specifier: status = "OK"
                        else: status = "Mismatch"
                    except InvalidVersion: logger.warning(f"Package '{pkg_name}': Installed version '{inst_ver}' is invalid."); status = "Invalid Installed Version"
                    except Exception as e: logger.error(f"Error comparing version for '{pkg_name}': {e}"); status = "Compare Error"
                elif req_str != "any" and "==" in req_str:  # Basic == check if no packaging
                    expected_ver = req_str.split('==', 1)[1].strip(); status = "OK" if inst_ver == expected_ver else "Mismatch"
                else: status = "Installed"  # Version check skipped or not required by spec
            else: status = "Not installed"
            item["status"] = status; results.append(item)
    logger.info(f"Comparison finished: {len(results)} entries.")
    return results
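# Typical check (illustrative sketch; 'repo' is an assumed Path variable): compare the
# generated requirements against the current environment and list anything not satisfied.
#
#     installed = get_installed_packages()
#     for row in compare_requirements_with_installed(repo / "requirements.txt", installed):
#         if row["status"] not in ("OK", "Installed"):
#             print(row["package"], row["status"])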
def update_system_packages(packages_to_update: List[str], repo_path: Path) -> Tuple[bool, str]:
    """Updates packages using 'pip install --upgrade -r <temporary requirements file>'."""
    if not packages_to_update: return True, "No packages specified for update."
    req_file = repo_path / "requirements.txt"
    if not req_file.is_file(): return False, f"Cannot update: '{req_file}' not found."
    temp_req_lines: List[str] = []
    req_map: Dict[str, str] = {}  # normalized_name -> pip_spec_line
    try:
        with open(req_file, 'r', encoding='utf-8') as f:
            for line in f:
                spec = line.strip()
                if not spec or spec.startswith('#') or spec.startswith('-'): continue
                name = spec  # Default: treat the whole line as the name
                if PACKAGING_AVAILABLE:
                    try: req = Requirement(spec); name = req.name
                    except Exception:
                        match = re.match(r"([a-zA-Z0-9._-]+)", spec)
                        if match: name = match.group(1)
                else:
                    match = re.match(r"([a-zA-Z0-9._-]+)", spec)
                    if match: name = match.group(1)
                if name: req_map[name.lower().replace('_', '-')] = spec
    except Exception as e: return False, f"Error reading '{req_file}': {e}"
    for pkg_update_name in packages_to_update:
        norm_name = pkg_update_name.lower().replace('_', '-')
        if norm_name in req_map: temp_req_lines.append(req_map[norm_name])  # Use the spec from the file
        else: logger.warning(f"'{pkg_update_name}' not in requirements, upgrading by name."); temp_req_lines.append(pkg_update_name)
    if not temp_req_lines: return True, "No matching packages found in requirements to update."
    temp_req_path = repo_path / "_temp_update_reqs.txt"
    cmd = [sys.executable, "-m", "pip", "install", "--upgrade", "-r", str(temp_req_path.resolve()), "--disable-pip-version-check"]
    proc = None
    try:
        with open(temp_req_path, 'w', encoding='utf-8') as f: f.write("\n".join(temp_req_lines) + "\n")
        logger.info(f"Attempting to update {len(temp_req_lines)} package(s) via: {' '.join(cmd)}")
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', errors='ignore', creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == 'win32' else 0)
        stdout, stderr = proc.communicate(timeout=600)
        if proc.returncode == 0:
            msg = f"Update completed for: {', '.join(packages_to_update)}."
            logger.info(msg)
            if stderr and stderr.strip(): logger.warning(f"Update stderr:\n{stderr.strip()}")
            return True, msg
        else:
            raise Exception(f"Pip error (code {proc.returncode}):\nStderr: {stderr.strip()}\nStdout: {stdout.strip()}")  # Include stdout
    except Exception as e:
        error_msg = f"Update failed: {str(e)}"
        logger.error(error_msg)
        if isinstance(e, subprocess.TimeoutExpired) and proc: proc.kill(); proc.wait()
        return False, error_msg
    finally:
        if temp_req_path.exists():
            try: os.remove(temp_req_path); logger.debug(f"Removed temp file: {temp_req_path}")
            except OSError as e: logger.warning(f"Could not remove temp file '{temp_req_path}': {e}")
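# End-to-end sketch of how the pieces above compose (illustrative only; the 'MyTool'
# paths and variable names are assumptions, and the surrounding CLI/GUI wiring lives
# outside this module):
#
#     repo = Path("MyTool"); scan = repo / "mytool"
#     std, ext = find_project_modules_and_dependencies(repo, scan)
#     req = generate_requirements_file(repo, ext, std)
#     download_packages(repo, req)
#     create_install_scripts(repo, req)
#     rows = compare_requirements_with_installed(req, get_installed_packages())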