diff --git a/source/qip/__init__.py b/source/qip/__init__.py index d4b6aba..c31c02f 100644 --- a/source/qip/__init__.py +++ b/source/qip/__init__.py @@ -3,8 +3,12 @@ import os import logging import sys +import time import tempfile import shutil +import copy +import multiprocessing +import multiprocessing.pool import six.moves import click @@ -17,6 +21,8 @@ from qip._version import __version__ +THREAD_COUNT = int(os.getenv("QIP_INSTALLER_THREAD_COUNT", 20)) + def install( requests, output_path, definition_path=None, overwrite=False, @@ -86,68 +92,139 @@ def install( definition_mapping = wiz.fetch_definition_mapping(registry_paths or []) # Setup temporary folder for package installation. - cache_path = tempfile.mkdtemp() - package_path = tempfile.mkdtemp() + temp_package_path = os.path.join(tempfile.gettempdir(), "qip-pkg") # Record requests and package installed to prevent duplications. installed_packages = set() installed_requests = set() + package_paths = set() + cache_paths = set() + # Record packages skipped. skipped_packages = set() # Fill up queue with requirements extracted from requests. queue = six.moves.queue.Queue() + active_count = multiprocessing.Value("i", 0) - try: - # Fetch environment mapping and installation path. - context_mapping = fetch_context_mapping(package_path, python_target) - library_path = context_mapping["environ"]["PYTHONPATH"] - - for request in requests: - queue.put((request, None, editable_mode)) - - while not queue.empty(): - request, parent_identifier, _editable_mode = queue.get() - if request in installed_requests: - continue - - # Clean up before installation. - shutil.rmtree(package_path) - wiz.filesystem.ensure_directory(package_path) - - # Needed for the editable mode. - wiz.filesystem.ensure_directory(library_path) - - package_mapping, overwrite = _install( - request, output_path, context_mapping, definition_mapping, - package_path, cache_path, installed_packages, - definition_path=definition_path, - overwrite=overwrite, - editable_mode=_editable_mode, - update_existing_definitions=update_existing_definitions, - parent_identifier=parent_identifier, - continue_on_error=continue_on_error - ) - if package_mapping is None: - continue + errors_found = set() - installed_packages.add(package_mapping["identifier"]) - installed_requests.add(request) + mutex = multiprocessing.Lock() + + def _worker_execute(first=False, _overwrite=False): + while not ( + queue.empty() and not active_count.value and not errors_found + ): + try: + # Keep alive if queue is empty but other installs are active. + if queue.empty(): + time.sleep(0.5) + continue + + with mutex: + _request, parent_identifier, _editable_mode = queue.get() + if _request in installed_requests: + continue + + # Add to installed requests to prevent duplicate install + installed_requests.add(_request) + + # Increase the active count + active_count.value += 1 + + # Setup temporary folder for package installation. + cache_path = tempfile.mkdtemp() + package_path = tempfile.mkdtemp() + + package_paths.add(package_path) + cache_paths.add(cache_path) + + # Fetch environment mapping and installation path. + _context_mapping = copy.deepcopy(context_mapping) + + # Replace the temp package path with the current package path + _context_mapping["environ"]["PYTHONPATH"] = ( + library_path.replace(temp_package_path, package_path) + ) + lib_path = _context_mapping["environ"]["PYTHONPATH"] + wiz.filesystem.ensure_directory(lib_path) + + # Needed for the editable mode. + wiz.filesystem.ensure_directory(lib_path) + + package_mapping, _overwrite = _install( + _request, output_path, _context_mapping, definition_mapping, + package_path, cache_path, installed_packages, + definition_path=definition_path, + overwrite=_overwrite, + editable_mode=_editable_mode, + update_existing_definitions=update_existing_definitions, + parent_identifier=parent_identifier, + continue_on_error=continue_on_error + ) + if package_mapping is None: + # Decrement the active count + with mutex: + active_count.value -= 1 + + continue + + installed_packages.add(package_mapping["identifier"]) + + # Indicate if package was skipped. + if package_mapping.get("skipped", False): + skipped_packages.add(package_mapping["identifier"]) + + # Fill up queue with requirements extracted from package + # dependencies. + if not no_dependencies: + for _request in package_mapping.get("requirements", []): + queue.put( + (_request, package_mapping["identifier"], False) + ) + + # Decrement the active count + with mutex: + active_count.value -= 1 + + # Break out of loop if only installing first in queue. + if first: + break + + except Exception as err: + # Collect exception to stop execution on main thread later. + errors_found.add(err) + raise + + return _overwrite + + context_mapping = fetch_context_mapping(temp_package_path, python_target) + library_path = context_mapping["environ"]["PYTHONPATH"] + + for request in requests: + queue.put((request, None, editable_mode)) + + # Install the initial requests and collect dependencies. + overwrite = _worker_execute(first=True, _overwrite=overwrite) + + # Use a thread pool to install remaining requests. + pool = multiprocessing.pool.ThreadPool( + THREAD_COUNT, _worker_execute, + (False, overwrite if overwrite is not None else False) + ) - # Indicate if package was skipped. - if package_mapping.get("skipped", False): - skipped_packages.add(package_mapping["identifier"]) + # Wait until all threads have finished. + pool.close() + pool.join() - # Fill up queue with requirements extracted from package - # dependencies. - if not no_dependencies: - for request in package_mapping.get("requirements", []): - queue.put((request, package_mapping["identifier"], False)) + # Remove package and cache paths. + for path in package_paths.union(cache_paths): + shutil.rmtree(path) - finally: - shutil.rmtree(package_path) - shutil.rmtree(cache_path) + # Raise a RuntimeError if any errors have been found. + if errors_found: + raise RuntimeError(list(errors_found)[0]) # Sort and filter packages installed. installed = sorted( diff --git a/source/qip/command.py b/source/qip/command.py index 863f451..93218bc 100644 --- a/source/qip/command.py +++ b/source/qip/command.py @@ -31,7 +31,8 @@ def execute(command, environ_mapping, quiet=False): shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=environ_mapping + env=environ_mapping, + close_fds=True ) if not quiet: