# dependencyanalyzer/core.py
"""Core dependency-analysis utilities.

Scans a Python project for top-level imports, classifies them as standard
library vs. external, generates a commented requirements.txt, downloads the
packages for offline installation, creates install scripts, and compares /
updates installed packages against the requirements file.
"""
import ast
import os
import shutil  # For copying files
import subprocess
import sys
import json
import logging
import importlib.util
import importlib.metadata  # FIX: required for importlib.metadata.version(); importing importlib.util alone does NOT make the metadata submodule available
from pathlib import Path
import sysconfig
import re
from typing import List, Dict, Set, Tuple, Optional, Union, Any, Callable

# --- Logger Configuration ---
logger = logging.getLogger(__name__)  # Use __name__ for logger hierarchy

# --- Standard Library Detection Logic ---
# Fallback module list for interpreters where sys.stdlib_module_names is
# unavailable (Python < 3.10) or unexpectedly missing.
_PREDEFINED_STDLIBS: Set[str] = {
    "abc", "aifc", "argparse", "array", "ast", "asynchat", "asyncio",
    "asyncore", "atexit", "audioop", "base64", "bdb", "binascii", "binhex",
    "bisect", "builtins", "bz2", "calendar", "cgi", "cgitb", "chunk",
    "cmath", "cmd", "code", "codecs", "codeop", "collections", "colorsys",
    "compileall", "concurrent", "configparser", "contextlib", "contextvars",
    "copy", "copyreg", "cProfile", "crypt", "csv", "ctypes", "curses",
    "dataclasses", "datetime", "dbm", "decimal", "difflib", "dis",
    "distutils", "doctest", "email", "encodings", "ensurepip", "enum",
    "errno", "faulthandler", "fcntl", "filecmp", "fileinput", "fnmatch",
    "formatter", "fractions", "ftplib", "functools", "gc", "getopt",
    "getpass", "gettext", "glob", "graphlib", "grp", "gzip", "hashlib",
    "heapq", "hmac", "html", "http", "idlelib", "imaplib", "imghdr", "imp",
    "importlib", "inspect", "io", "ipaddress", "itertools", "json",
    "keyword", "lib2to3", "linecache", "locale", "logging", "lzma",
    "mailbox", "mailcap", "marshal", "math", "mimetypes", "mmap",
    "modulefinder", "multiprocessing", "netrc", "nis", "nntplib", "numbers",
    "operator", "optparse", "os", "ossaudiodev", "parser", "pathlib", "pdb",
    "pickle", "pickletools", "pipes", "pkgutil", "platform", "plistlib",
    "poplib", "posix", "pprint", "profile", "pstats", "pty", "pwd",
    "py_compile", "pyclbr", "pydoc", "pydoc_data", "pyexpat", "queue",
    "quopri", "random", "re", "readline", "reprlib", "resource",
    "rlcompleter", "runpy", "sched", "secrets", "select", "selectors",
    "shelve", "shlex", "shutil", "signal", "site", "smtpd", "smtplib",
    "sndhdr", "socket", "socketserver", "spwd", "sqlite3", "ssl", "stat",
    "statistics", "string", "stringprep", "struct", "subprocess", "sunau",
    "symtable", "sys", "sysconfig", "syslog", "tabnanny", "tarfile",
    "telnetlib", "tempfile", "termios", "textwrap", "threading", "time",
    "timeit", "tkinter", "token", "tokenize", "trace", "traceback",
    "tracemalloc", "tty", "turtle", "turtledemo", "types", "typing",
    "unicodedata", "unittest", "urllib", "uu", "uuid", "venv", "warnings",
    "wave", "weakref", "webbrowser", "wsgiref", "xdrlib", "xml", "xmlrpc",
    "zipapp", "zipfile", "zipimport", "zlib", "_thread", "_collections_abc",
    "_json", "_datetime", "_weakrefset", "_strptime", "_socket", "_ssl",
    "_struct", "_queue", "_pickle", "_lsprof", "_heapq", "_hashlib", "_csv",
    "_bz2", "_codecs", "_bisect", "_blake2", "_asyncio", "_ast", "_abc",
}

if sys.version_info >= (3, 10):
    try:
        STANDARD_LIBRARY_MODULES: frozenset = sys.stdlib_module_names
        logger.debug(
            "Using sys.stdlib_module_names for standard library module list (Python 3.10+)."
        )
    except AttributeError:
        logger.warning(
            "sys.stdlib_module_names not found despite Python 3.10+. Using predefined list."
        )
        # FIX: previously assigned an empty frozenset here, contradicting the
        # warning above and making stdlib classification useless. Fall back to
        # the predefined list as advertised.
        STANDARD_LIBRARY_MODULES = frozenset(_PREDEFINED_STDLIBS)
else:
    logger.debug(
        "Using a predefined list for standard library modules (Python < 3.10)."
    )
    STANDARD_LIBRARY_MODULES = frozenset(_PREDEFINED_STDLIBS)

# Last-resort sanity check: an empty set means stdlib detection is unreliable.
if not STANDARD_LIBRARY_MODULES:
    logger.error(
        "Could not determine standard library modules for Python 3.10+. Stdlib check may be unreliable."
    )

# Cached result of get_standard_library_paths() (computed once per process).
_CACHED_STD_LIB_PATHS: Optional[Set[str]] = None


def get_standard_library_paths() -> Set[str]:
    """Retrieves and caches normalized paths for the Python standard library.

    Returns:
        Set[str]: normalized directory paths that contain stdlib modules.
        May be empty if no path could be determined.
    """
    global _CACHED_STD_LIB_PATHS
    if _CACHED_STD_LIB_PATHS is not None:
        return _CACHED_STD_LIB_PATHS
    paths = set()
    logger.debug("Determining standard library paths...")
    try:
        # Primary source: sysconfig's stdlib locations.
        for path_name in ("stdlib", "platstdlib"):
            try:
                path_val = sysconfig.get_path(path_name)
                if path_val and os.path.isdir(path_val):
                    paths.add(os.path.normpath(path_val))
                    logger.debug(f"Found stdlib path ({path_name}): {path_val}")
            except Exception as e:
                logger.warning(f"Could not get sysconfig path '{path_name}': {e}")
        # sys.prefix/lib/pythonX.Y (typical POSIX layout).
        prefix_lib_path = os.path.normpath(
            os.path.join(
                sys.prefix,
                "lib",
                f"python{sys.version_info.major}.{sys.version_info.minor}",
            )
        )
        if os.path.isdir(prefix_lib_path):
            paths.add(prefix_lib_path)
            logger.debug(f"Found stdlib path (prefix): {prefix_lib_path}")
        if sys.platform == "win32":
            # Windows keeps extension modules under <prefix>/DLLs.
            dlls_path = os.path.join(sys.prefix, "DLLs")
            if os.path.isdir(dlls_path):
                paths.add(os.path.normpath(dlls_path))
                logger.debug(f"Found stdlib path (DLLs): {dlls_path}")
        else:
            # POSIX compiled extension modules live in lib-dynload.
            dynload_path = os.path.join(
                sys.exec_prefix,
                "lib",
                f"python{sys.version_info.major}.{sys.version_info.minor}",
                "lib-dynload",
            )
            if os.path.isdir(dynload_path):
                paths.add(os.path.normpath(dynload_path))
                logger.debug(f"Found stdlib path (dynload): {dynload_path}")
            # macOS framework builds expose an extra prefix.
            fw_prefix = sysconfig.get_config_var("PYTHONFRAMEWORKPREFIX")
            if fw_prefix and isinstance(fw_prefix, str) and os.path.isdir(fw_prefix):
                fw_path = os.path.normpath(
                    os.path.join(
                        fw_prefix,
                        "lib",
                        f"python{sys.version_info.major}.{sys.version_info.minor}",
                    )
                )
                if os.path.isdir(fw_path):
                    paths.add(fw_path)
                    logger.debug(f"Found stdlib path (Framework): {fw_path}")
        if not paths:
            logger.warning(
                "Sysconfig paths failed, attempting fallback using 'os' and 'sysconfig' module locations."
            )
            # Fallback: derive stdlib dirs from where known stdlib modules live.
            try:
                paths.add(os.path.normpath(os.path.dirname(os.__file__)))
            except Exception:
                logger.error("Could not determine path for 'os' module.")
            try:
                paths.add(os.path.normpath(os.path.dirname(sysconfig.__file__)))
            except Exception:
                logger.error("Could not determine path for 'sysconfig' module.")
    except Exception as e:
        logger.exception(f"Unexpected error determining standard library paths: {e}")
    _CACHED_STD_LIB_PATHS = {p for p in paths if p}
    if not _CACHED_STD_LIB_PATHS:
        logger.error("Failed to determine ANY standard library paths.")
    else:
        logger.debug(f"Final cached standard library paths: {_CACHED_STD_LIB_PATHS}")
    return _CACHED_STD_LIB_PATHS


def is_path_in_standard_library(file_path_str: Optional[str]) -> bool:
    """Checks if a file path string is within any known standard library directory.

    Paths containing ``site-packages`` / ``dist-packages`` are always treated
    as non-stdlib, even if nested under a stdlib prefix (e.g. in virtualenvs).
    """
    if not file_path_str:
        return False
    std_lib_paths = get_standard_library_paths()
    if not std_lib_paths:
        return False
    try:
        norm_file_path = os.path.normpath(os.path.abspath(file_path_str))
        path_parts = Path(norm_file_path).parts
        if "site-packages" in path_parts or "dist-packages" in path_parts:
            return False
        for std_path in std_lib_paths:
            norm_std_path = os.path.normpath(os.path.abspath(std_path))
            # Trailing separator prevents '/usr/lib/python3' matching
            # '/usr/lib/python3-foo'.
            if norm_file_path.startswith(norm_std_path + os.sep):
                return True
    except Exception as e:
        logger.warning(f"Error during path comparison for '{file_path_str}': {e}")
        return False
    return False


def _is_standard_library(module_name: str) -> bool:
    """Checks if a module name belongs to the standard library."""
    if module_name in STANDARD_LIBRARY_MODULES:
        logger.debug(f"'{module_name}' in predefined list.")
        return True
    try:
        spec = importlib.util.find_spec(module_name)
    except Exception:  # FIX: was a bare except (find_spec can raise ModuleNotFoundError, ValueError, ...)
        logger.debug(f"Spec finding failed for '{module_name}'. Assuming non-standard.")
        return False
    if spec is None:
        logger.debug(f"No spec for '{module_name}'. Assuming non-standard.")
        return False
    origin = spec.origin
    logger.debug(f"'{module_name}': origin='{origin}'")
    if origin == "built-in" or origin == "frozen":
        logger.debug(f"'{module_name}' is '{origin}'.")
        return True
    if is_path_in_standard_library(origin):
        logger.debug(f"'{module_name}' origin '{origin}' IS stdlib path.")
        return True
    logger.debug(f"'{module_name}' origin '{origin}' NOT stdlib path.")
    return False


# Import name -> PyPI distribution name, for packages whose import name differs.
MODULE_NAME_TO_PACKAGE_NAME_MAP: Dict[str, str] = {
    "PIL": "Pillow",
    "cv2": "opencv-python",
}
# Import names that look external but are shipped by another package
# (mpl_toolkits comes with matplotlib) — never report these.
FALSE_POSITIVE_EXTERNAL_MODULES: Set[str] = {"mpl_toolkits"}


class ImportExtractor(ast.NodeVisitor):
    """AST visitor extracts (module_name, relative_file_path_string) tuples."""

    def __init__(self, file_path_str: str):
        super().__init__()
        self.file_path_str = file_path_str
        # Set of (top-level module name, reporting path) pairs.
        self.imported_modules: Set[Tuple[str, str]] = set()

    def visit_Import(self, node: ast.Import):
        # 'import a.b.c' -> record top-level package 'a'.
        for alias in node.names:
            module_name = alias.name.split(".")[0]
            if module_name:
                self.imported_modules.add((module_name, self.file_path_str))
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        # Only absolute imports (level == 0); relative imports are project-internal.
        if node.module and node.level == 0:
            module_name = node.module.split(".")[0]
            if module_name:
                self.imported_modules.add((module_name, self.file_path_str))
        self.generic_visit(node)


# Per-dependency record: {"locations": set[str], "version": str|None,
# "original_import_name": str|None (external deps only)}.
DependencyInfo = Dict[str, Dict[str, Union[Set[str], Optional[str], str]]]


def find_project_modules_and_dependencies(
    repo_path: Path,  # Original user-selected path
    scan_path: Path,  # Path where the actual scanning begins
) -> Tuple[DependencyInfo, DependencyInfo]:
    """
    Analyzes Python files starting from scan_path, identifies project modules
    relative to scan_path, and finds standard/external dependencies.
    Explicitly ignores imports matching the name of the scan_path directory
    if scan_path is different from repo_path (assuming it's the main package).

    Args:
        repo_path (Path): The root path selected by the user.
        scan_path (Path): The directory to actually scan for source code.

    Returns:
        Tuple[DependencyInfo, DependencyInfo]: std_lib_info, external_deps_info
    """
    all_imports_locations: Dict[str, Set[str]] = {}

    # Identify the main package name when scanning a subfolder of the repo:
    # imports of that name are internal to the project, not dependencies.
    main_project_package_name: Optional[str] = None
    if repo_path != scan_path and scan_path.name == repo_path.name.lower():
        main_project_package_name = scan_path.name
        logger.info(
            f"Assuming '{main_project_package_name}' is the main project package being scanned."
        )

    logger.info(f"Analyzing Python files for imports starting from '{scan_path}'...")
    excluded_dirs = {
        "venv", ".venv", "env", ".env", "docs", "tests", "test",
        "site-packages", "dist-packages", "__pycache__", ".git", ".hg",
        ".svn", ".tox", ".nox", "build", "dist", "*.egg-info",
    }
    file_count = 0
    for root, dirs, files in os.walk(scan_path, topdown=True):
        # Prune excluded and hidden directories in place (topdown walk).
        dirs[:] = [d for d in dirs if d not in excluded_dirs and not d.startswith(".")]
        current_root_path = Path(root)
        for file_name in files:
            if file_name.endswith(".py"):
                file_path_obj = current_root_path / file_name
                file_count += 1
                try:
                    report_rel_path_str = str(file_path_obj.relative_to(repo_path))
                except ValueError:
                    # Keep the absolute path if the file lies outside repo_path.
                    report_rel_path_str = str(file_path_obj)
                    logger.warning(f"Path not relative: {file_path_obj}")
                logger.debug(f"Parsing: {report_rel_path_str}")
                try:
                    with open(
                        file_path_obj, "r", encoding="utf-8", errors="ignore"
                    ) as f:
                        source = f.read()
                    tree = ast.parse(source, filename=str(file_path_obj))
                    extractor = ImportExtractor(file_path_str=report_rel_path_str)
                    extractor.visit(tree)
                    for module, report_rel_path in extractor.imported_modules:
                        if module:
                            all_imports_locations.setdefault(module, set()).add(
                                report_rel_path
                            )
                except SyntaxError as e:
                    logger.warning(
                        f"Syntax error in '{report_rel_path_str}': {e}. Skipping."
                    )
                except Exception as e:
                    logger.exception(
                        f"Error processing file '{report_rel_path_str}': {e}"
                    )

    logger.info(
        f"Analyzed {file_count} Python files. Found {len(all_imports_locations)} unique top-level imports."
    )
    logger.info("Classifying imports and fetching versions...")
    std_libs: DependencyInfo = {}
    external_deps: DependencyInfo = {}
    for imp_module, locs in all_imports_locations.items():
        # Skip imports of the project's own main package.
        if main_project_package_name and imp_module == main_project_package_name:
            logger.info(
                f"Skipping '{imp_module}' as it matches the main project package name being scanned."
            )
            continue
        if imp_module in FALSE_POSITIVE_EXTERNAL_MODULES:
            logger.info(f"Skipping known false positive: '{imp_module}'")
            continue
        if _is_standard_library(imp_module):
            logger.debug(f"'{imp_module}' is standard library.")
            std_libs[imp_module] = {"locations": locs, "version": None}
        else:
            # External dependency: map import name to PyPI name, fetch version.
            pypi_name = MODULE_NAME_TO_PACKAGE_NAME_MAP.get(imp_module, imp_module)
            orig_imp = imp_module if pypi_name != imp_module else None
            logger.debug(
                f"'{imp_module}' (PyPI: '{pypi_name}') is external. Fetching version..."
            )
            version: Optional[str] = None
            try:
                version = importlib.metadata.version(pypi_name)
            except Exception:  # FIX: was a bare except; PackageNotFoundError et al.
                logger.warning(f"Version for '{pypi_name}' not found.")
            dep_data = external_deps.setdefault(
                pypi_name,
                {"locations": set(), "version": version, "original_import_name": None},
            )
            dep_data["locations"].update(locs)  # type: ignore
            if orig_imp and dep_data.get("original_import_name") is None:
                dep_data["original_import_name"] = orig_imp
            if dep_data.get("version") is None and version is not None:
                dep_data["version"] = version
    logger.info(
        f"Classification complete: {len(std_libs)} stdlib used, {len(external_deps)} unique external dependencies."
    )
    return std_libs, external_deps


# --- find_main_script DEFINITION ---
def find_main_script(repo_path: Path) -> Optional[Path]:
    """Finds the main script (__main__.py) in standard locations.

    Checks <repo>/<repo_name_lowercase>/__main__.py first, then
    <repo>/__main__.py. Returns None when neither exists.
    """
    repo_name = repo_path.name
    script_subfolder_name = repo_name.lower()
    path_subdir = repo_path / script_subfolder_name / "__main__.py"
    path_root = repo_path / "__main__.py"
    logger.info(f"Looking for main script at: '{path_subdir}' or '{path_root}'")
    if path_subdir.is_file():
        logger.info(f"Main script found: {path_subdir}")
        return path_subdir
    elif path_root.is_file():
        logger.info(f"Main script found: {path_root}")
        return path_root
    else:
        logger.warning(f"Main script (__main__.py) not found in standard locations.")
        return None


def generate_requirements_file(
    repo_path: Path,
    external_deps_info: DependencyInfo,
    std_lib_deps_info: DependencyInfo,
) -> Path:
    """Generates requirements.txt with detailed comments.

    Stdlib modules are written as comments only; external dependencies are
    written as installable lines preceded by location/version comments.

    Raises:
        IOError: if the file cannot be written (re-raised after logging).
    """
    req_file_path = repo_path / "requirements.txt"
    py_ver = (
        f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    )
    logger.info(f"Generating '{req_file_path}'...")
    try:
        with open(req_file_path, "w", encoding="utf-8") as f:
            f.write(
                f"# Requirements generated by DependencyAnalyzer for {repo_path.name}\n"
            )
            f.write(f"# Python Version (analysis env): {py_ver}\n\n")
            f.write(
                f"# --- Standard Library Modules Used (part of Python {py_ver}) ---\n"
            )
            if not std_lib_deps_info:
                f.write("# (No stdlib imports detected)\n")
            else:
                for name in sorted(std_lib_deps_info.keys()):
                    locs = std_lib_deps_info[name].get("locations", set())
                    loc_list = sorted(list(locs)) if isinstance(locs, set) else []
                    # Show at most three locations, then an ellipsis.
                    loc_str = (
                        f"Used in: {', '.join(loc_list[:3])}{', ...' if len(loc_list) > 3 else ''}"
                        if loc_list
                        else ""
                    )
                    f.write(f"# {name} ({loc_str})\n".replace(" ()", ""))
            f.write("\n# --- External Dependencies (for pip install) ---\n")
            if not external_deps_info:
                f.write("# (No external dependencies detected)\n")
            else:
                for pypi_name in sorted(external_deps_info.keys()):
                    info = external_deps_info[pypi_name]
                    locs = info.get("locations", set())
                    ver = info.get("version")
                    orig_imp = info.get("original_import_name")
                    loc_list = sorted(list(locs)) if isinstance(locs, set) else []
                    imp_as = f"(imported as '{orig_imp}') " if orig_imp else ""
                    loc_str = (
                        f"# Found {imp_as}in: {', '.join(loc_list[:3])}{', ...' if len(loc_list) > 3 else ''}"
                        if loc_list
                        else "# Location info unavailable"
                    )
                    ver_str = (
                        f"# Detected version: {ver}"
                        if ver
                        else "# Version not detected in analysis env"
                    )
                    f.write(f"{loc_str}\n{ver_str}\n{pypi_name}\n\n")
        logger.info(f"Successfully generated '{req_file_path}'.")
    except IOError as e:
        logger.exception(f"Error writing requirements.txt: {e}")
        raise
    return req_file_path


def _copy_requirements_to_packages_dir(repo_path: Path, packages_dir: Path) -> bool:
    """Copies requirements.txt from repo root to packages directory."""
    source_req = repo_path / "requirements.txt"
    dest_req = packages_dir / "requirements.txt"
    if not source_req.is_file():
        logger.warning(f"Source '{source_req}' not found, cannot copy.")
        return False
    try:
        shutil.copy2(source_req, dest_req)
        logger.info(f"Copied '{source_req.name}' to '{packages_dir}'.")
        return True
    except Exception as e:
        logger.exception(f"Failed to copy '{source_req.name}': {e}")
        return False


def download_packages(
    repo_path: Path, requirements_file_path_in_repo: Path
) -> Tuple[bool, str]:
    """Downloads packages one-by-one based on requirements file.

    Each spec is downloaded individually so one failure does not abort the
    rest. Returns (overall_success, human-readable summary).
    """
    packages_dir = repo_path / "_req_packages"
    packages_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Starting package download to '{packages_dir}'...")
    if not requirements_file_path_in_repo.is_file():
        msg = f"Source requirements file '{requirements_file_path_in_repo}' not found."
        logger.error(msg)
        return False, msg
    _copy_requirements_to_packages_dir(repo_path, packages_dir)
    packages_to_download: List[str] = []
    try:
        with open(requirements_file_path_in_repo, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                # Skip blanks, comments, and pip option lines (-r, --find-links, ...).
                if line and not line.startswith("#") and not line.startswith("-"):
                    packages_to_download.append(line)
    except Exception as e:
        msg = f"Error reading requirements file '{requirements_file_path_in_repo}': {e}"
        logger.exception(msg)
        return False, msg
    if not packages_to_download:
        msg = f"No package specifications found in '{requirements_file_path_in_repo}'."
        logger.info(msg)
        return True, msg
    logger.info(
        f"Found {len(packages_to_download)} specifications. Downloading individually..."
    )
    overall_success = True
    failed = []
    succeeded = []
    for pkg_spec in packages_to_download:
        logger.info(f"--- Downloading: {pkg_spec} ---")
        cmd = [
            sys.executable,
            "-m",
            "pip",
            "download",
            pkg_spec,
            "-d",
            str(packages_dir.resolve()),
            "--no-cache-dir",
            "--disable-pip-version-check",
        ]
        logger.debug(f"Running: {' '.join(cmd)}")
        proc = None
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                creationflags=(
                    subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
                ),
            )
            stdout, stderr = proc.communicate(timeout=300)
            if proc.returncode == 0:
                logger.info(f"OK: '{pkg_spec}' downloaded (or already present).")
                succeeded.append(pkg_spec)
                if stderr and stderr.strip():
                    logger.warning(f"Pip stderr for '{pkg_spec}':\n{stderr.strip()}")
                if stdout and stdout.strip():
                    logger.debug(
                        f"Pip stdout for '{pkg_spec}':\n{stdout.strip()[:300]}..."
                    )
            else:
                raise Exception(
                    f"Pip error (code {proc.returncode}):\nStderr: {stderr.strip()}\nStdout: {stdout.strip()}"
                )  # Include stdout in error
        except Exception as e:
            error_msg = f"Failed to download '{pkg_spec}': {str(e)}"  # Use str(e) for cleaner log message
            logger.error(error_msg)  # Log specific error
            # Log traceback for unexpected errors, not for timeouts or known
            # subprocess errors unless needed.
            if not isinstance(
                e, (subprocess.TimeoutExpired, subprocess.CalledProcessError)
            ) and "Pip error" not in str(e):
                logger.exception("Traceback for unexpected download error:")
            if isinstance(e, subprocess.TimeoutExpired) and proc:
                # Reap the still-running pip process after a timeout.
                proc.kill()
                proc.wait()
            overall_success = False
            failed.append(pkg_spec)
    summary = f"Download finished. Attempted: {len(packages_to_download)}, Succeeded: {len(succeeded)}, Failed: {len(failed)}."
    if failed:
        summary += f" Failures: {', '.join(failed)}"
    logger.log(logging.ERROR if not overall_success else logging.INFO, summary)
    return overall_success, summary


def create_install_scripts(
    repo_path: Path, requirements_file_path_in_repo: Path
) -> Tuple[bool, str]:
    """Creates install scripts with pip check and relative paths.

    Writes install_packages.bat (CRLF) and install_packages.sh (LF, 0o755)
    into <repo>/_req_packages. Returns (success, message).
    """
    packages_dir = repo_path / "_req_packages"
    packages_dir.mkdir(parents=True, exist_ok=True)
    if not requirements_file_path_in_repo.is_file():
        return (
            False,
            f"Source requirements file '{requirements_file_path_in_repo}' not found.",
        )
    if not _copy_requirements_to_packages_dir(repo_path, packages_dir):
        return False, "Failed to copy requirements.txt."
    req_file = "requirements.txt"  # Relative name
    bat = f"""@echo off & cls
echo ------------------------------------
echo Package Installer for {repo_path.name}
echo ------------------------------------
echo.
echo Checking for pip...
python -m pip --version >nul 2>&1 & if errorlevel 1 ( python3 -m pip --version >nul 2>&1 & if errorlevel 1 ( goto :pip_error ) )
echo pip found. & goto :install
:pip_error
echo ERROR: Python pip module not found.
echo Please install pip for your Python environment. See:
echo https://pip.pypa.io/en/stable/installation/
echo. & pause & exit /b 1
:install
echo Installing packages from local folder using '{req_file}'...
echo Source: %~dp0
echo.
python -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file}" --disable-pip-version-check || ( python3 -m pip install --no-index --find-links "%~dp0" -r "%~dp0{req_file}" --disable-pip-version-check || ( goto :install_error ) )
echo. & echo --------------------------
echo Installation Successful!
echo --------------------------
echo. & pause & exit /b 0
:install_error
echo. & echo --------------------------
echo ERROR: Installation Failed.
echo --------------------------
echo Please check the messages above for details. & pause & exit /b 1
"""
    sh = f"""#!/bin/bash
echo "------------------------------------"
echo "Package Installer for {repo_path.name}"
echo "------------------------------------"
echo ""
echo "Checking for pip..."
PYTHON_CMD=""
if command -v python3 &>/dev/null && python3 -m pip --version &>/dev/null; then PYTHON_CMD="python3"
elif command -v python &>/dev/null && python -m pip --version &>/dev/null; then PYTHON_CMD="python"
else echo "ERROR: Python pip module not found for python3 or python."; echo "Install pip: https://pip.pypa.io/en/stable/installation/"; exit 1; fi
echo "pip found ($PYTHON_CMD)."
SCRIPT_DIR="$( cd "$( dirname "${{BASH_SOURCE[0]}}" )" &> /dev/null && pwd )"
REQ_FILE="$SCRIPT_DIR/{req_file}"
echo "Installing packages from '$SCRIPT_DIR' using '$REQ_FILE'..."
echo ""
"$PYTHON_CMD" -m pip install --no-index --find-links "$SCRIPT_DIR" -r "$REQ_FILE" --disable-pip-version-check
INSTALL_STATUS=$?
echo ""
if [ $INSTALL_STATUS -ne 0 ]; then echo "--------------------------"; echo "ERROR: Installation Failed."; echo "--------------------------"; echo "Check messages above."; exit 1;
else echo "--------------------------"; echo "Installation Successful!"; echo "--------------------------"; exit 0; fi
"""
    try:
        bat_path = packages_dir / "install_packages.bat"
        sh_path = packages_dir / "install_packages.sh"
        # Windows batch files require CRLF line endings.
        with open(bat_path, "w", encoding="utf-8", newline="\r\n") as f:
            f.write(bat)
        logger.info(f"Created {bat_path}")
        with open(sh_path, "w", encoding="utf-8", newline="\n") as f:
            f.write(sh)
        os.chmod(sh_path, 0o755)
        logger.info(f"Created {sh_path} (executable)")
        return True, f"Installation scripts created in '{packages_dir}'."
    except IOError as e:
        logger.exception(f"Error creating install scripts: {e}")
        return False, f"Error creating scripts: {e}"


def get_installed_packages() -> Dict[str, str]:
    """Gets installed packages, normalizing names to lowercase and hyphens.

    Returns a possibly-empty dict {normalized_name: version}; all failures
    are logged and swallowed (best effort).
    """
    installed = {}
    cmd = [
        sys.executable,
        "-m",
        "pip",
        "list",
        "--format=json",
        "--disable-pip-version-check",
    ]
    logger.debug(f"Getting installed packages using: {' '.join(cmd)}")
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
            encoding="utf-8",
            errors="ignore",
            creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
        )
        pkgs = json.loads(proc.stdout)
        for pkg in pkgs:
            # PEP 503-style normalization for comparison against requirements.
            installed[pkg["name"].lower().replace("_", "-")] = pkg["version"]
        logger.info(f"Found {len(installed)} installed packages.")
    except FileNotFoundError:
        logger.error(f"Command not found: {cmd[0]}. Is Python/pip in PATH?")
    except subprocess.CalledProcessError as e:
        logger.error(f"pip list command failed (code {e.returncode}): {e.stderr}")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse pip list JSON output: {e}")
    except Exception as e:
        logger.exception(f"Unexpected error getting installed packages: {e}")
    return installed


# Optional third-party 'packaging' library for robust requirement parsing;
# regex fallbacks below are used when it is absent.
try:
    from packaging.requirements import Requirement
    from packaging.version import parse as parse_version, InvalidVersion

    PACKAGING_AVAILABLE = True
    logger.debug("Using 'packaging' library.")
except ImportError:
    PACKAGING_AVAILABLE = False
    Requirement = None
    parse_version = None
    InvalidVersion = None
    logger.warning("'packaging' not found, version checks limited.")


def compare_requirements_with_installed(
    requirements_file: Path, installed_packages: Dict[str, str]
) -> List[Dict[str, str]]:
    """Compares requirements file (PyPI names) against installed packages.

    Returns one dict per requirement line with keys:
    package, required, installed, status.
    """
    results = []
    if not requirements_file.is_file():
        logger.warning(f"Compare failed: '{requirements_file}' not found.")
        return results
    logger.info(f"Comparing '{requirements_file.name}' with installed packages...")
    line_num = 0
    with open(requirements_file, "r", encoding="utf-8") as f:
        for line in f:
            line_num += 1
            spec = line.strip()
            if not spec or spec.startswith("#") or spec.startswith("-"):
                continue
            pypi_spec = spec
            pkg_name = pypi_spec
            req_str = "any"
            req_obj = None
            if PACKAGING_AVAILABLE:
                try:
                    req_obj = Requirement(pypi_spec)
                    pkg_name = req_obj.name
                    req_str = str(req_obj.specifier) if req_obj.specifier else "any"
                except Exception as e:
                    logger.warning(
                        f"L{line_num}: Failed to parse '{pypi_spec}' with packaging: {e}."
                    )
                    match = re.match(r"([a-zA-Z0-9._-]+)", pypi_spec)
                    if match:
                        pkg_name = match.group(1)
            else:  # Basic fallback
                m_spec = re.match(
                    r"([a-zA-Z0-9._-]+)\s*([<>=!~]=?)\s*([0-9a-zA-Z.]+)", pypi_spec
                )
                m_name = re.match(r"([a-zA-Z0-9._-]+)", pypi_spec)
                if m_spec:
                    pkg_name, req_str = m_spec.group(1), m_spec.group(2) + m_spec.group(
                        3
                    )
                elif m_name:
                    pkg_name = m_name.group(1)
                else:
                    logger.warning(
                        f"L{line_num}: Could not extract package name from '{pypi_spec}'. Skipping comparison."
                    )
                    continue  # Skip if name cannot be extracted
            norm_name = pkg_name.lower().replace("_", "-")
            inst_ver = installed_packages.get(norm_name)
            status = ""
            item = {
                "package": pkg_name,
                "required": req_str,
                "installed": inst_ver or "Not installed",
            }
            if inst_ver:
                if PACKAGING_AVAILABLE and req_obj and req_obj.specifier:
                    try:
                        if parse_version(inst_ver) in req_obj.specifier:
                            status = "OK"
                        else:
                            status = "Mismatch"
                    except InvalidVersion:
                        logger.warning(
                            f"Package '{pkg_name}': Installed version '{inst_ver}' is invalid."
                        )
                        status = "Invalid Installed Version"
                    except Exception as e:
                        logger.error(f"Error comparing version for '{pkg_name}': {e}")
                        status = "Compare Error"
                elif (
                    req_str != "any" and "==" in req_str
                ):  # Basic == check if no packaging
                    expected_ver = req_str.split("==", 1)[1].strip()
                    status = "OK" if inst_ver == expected_ver else "Mismatch"
                else:
                    status = (
                        "Installed"  # Version check skipped or not required by spec
                    )
            else:
                status = "Not installed"
            item["status"] = status
            results.append(item)
    logger.info(f"Comparison finished: {len(results)} entries.")
    return results


def update_system_packages(
    packages_to_update: List[str], repo_path: Path
) -> Tuple[bool, str]:
    """Updates packages using pip install --upgrade -r temp_file.

    The specs are taken from <repo>/requirements.txt when present (so pins
    are respected); unknown names are upgraded by name. Returns
    (success, message). The temp file is always cleaned up.
    """
    if not packages_to_update:
        return True, "No packages specified for update."
    req_file = repo_path / "requirements.txt"
    if not req_file.is_file():
        return False, f"Cannot update: '{req_file}' not found."
    temp_req_lines: List[str] = []
    req_map: Dict[str, str] = {}  # normalized_name -> pip_spec_line
    try:
        with open(req_file, "r", encoding="utf-8") as f:
            for line in f:
                spec = line.strip()
                if not spec or spec.startswith("#") or spec.startswith("-"):
                    continue
                name = spec  # Default
                if PACKAGING_AVAILABLE:
                    try:
                        req = Requirement(spec)
                        name = req.name
                    except Exception:  # FIX: was a bare except
                        match = re.match(r"([a-zA-Z0-9._-]+)", spec)
                        if match:
                            name = match.group(1)
                else:
                    match = re.match(r"([a-zA-Z0-9._-]+)", spec)
                    if match:
                        name = match.group(1)
                if name:
                    req_map[name.lower().replace("_", "-")] = spec
    except Exception as e:
        return False, f"Error reading '{req_file}': {e}"
    for pkg_update_name in packages_to_update:
        norm_name = pkg_update_name.lower().replace("_", "-")
        if norm_name in req_map:
            temp_req_lines.append(req_map[norm_name])  # Use spec from file
        else:
            logger.warning(
                f"'{pkg_update_name}' not in requirements, upgrading by name."
            )
            temp_req_lines.append(pkg_update_name)
    if not temp_req_lines:
        return True, "No matching packages found in requirements to update."
    temp_req_path = repo_path / "_temp_update_reqs.txt"
    cmd = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--upgrade",
        "-r",
        str(temp_req_path.resolve()),
        "--disable-pip-version-check",
    ]
    proc = None
    try:
        with open(temp_req_path, "w", encoding="utf-8") as f:
            f.write("\n".join(temp_req_lines) + "\n")
        logger.info(
            f"Attempting to update {len(temp_req_lines)} package(s) via: {' '.join(cmd)}"
        )
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="ignore",
            creationflags=subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0,
        )
        stdout, stderr = proc.communicate(timeout=600)
        if proc.returncode == 0:
            msg = f"Update completed for: {', '.join(packages_to_update)}."
            logger.info(msg)
            if stderr and stderr.strip():
                logger.warning(f"Update stderr:\n{stderr.strip()}")
            return True, msg
        else:
            raise Exception(
                f"Pip error (code {proc.returncode}):\nStderr: {stderr.strip()}\nStdout: {stdout.strip()}"
            )  # Include stdout
    except Exception as e:
        error_msg = f"Update failed: {str(e)}"
        logger.error(error_msg)
        if isinstance(e, subprocess.TimeoutExpired) and proc:
            proc.kill()
            proc.wait()
        return False, error_msg
    finally:
        # Always remove the temporary requirements file.
        if temp_req_path.exists():
            try:
                os.remove(temp_req_path)
                logger.debug(f"Removed temp file: {temp_req_path}")
            except OSError as e:
                logger.warning(f"Could not remove temp file '{temp_req_path}': {e}")