Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 124 additions & 47 deletions source/qip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import os
import logging
import sys
import time
import tempfile
import shutil
import copy
import multiprocessing
import multiprocessing.pool

import six.moves
import click
Expand All @@ -17,6 +21,8 @@

from qip._version import __version__

THREAD_COUNT = int(os.getenv("QIP_INSTALLER_THREAD_COUNT", 20))


def install(
requests, output_path, definition_path=None, overwrite=False,
Expand Down Expand Up @@ -86,68 +92,139 @@ def install(
definition_mapping = wiz.fetch_definition_mapping(registry_paths or [])

# Setup temporary folder for package installation.
cache_path = tempfile.mkdtemp()
package_path = tempfile.mkdtemp()
temp_package_path = os.path.join(tempfile.gettempdir(), "qip-pkg")

# Record requests and package installed to prevent duplications.
installed_packages = set()
installed_requests = set()

package_paths = set()
cache_paths = set()

# Record packages skipped.
skipped_packages = set()

# Fill up queue with requirements extracted from requests.
queue = six.moves.queue.Queue()
active_count = multiprocessing.Value("i", 0)

try:
# Fetch environment mapping and installation path.
context_mapping = fetch_context_mapping(package_path, python_target)
library_path = context_mapping["environ"]["PYTHONPATH"]

for request in requests:
queue.put((request, None, editable_mode))

while not queue.empty():
request, parent_identifier, _editable_mode = queue.get()
if request in installed_requests:
continue

# Clean up before installation.
shutil.rmtree(package_path)
wiz.filesystem.ensure_directory(package_path)

# Needed for the editable mode.
wiz.filesystem.ensure_directory(library_path)

package_mapping, overwrite = _install(
request, output_path, context_mapping, definition_mapping,
package_path, cache_path, installed_packages,
definition_path=definition_path,
overwrite=overwrite,
editable_mode=_editable_mode,
update_existing_definitions=update_existing_definitions,
parent_identifier=parent_identifier,
continue_on_error=continue_on_error
)
if package_mapping is None:
continue
errors_found = set()

installed_packages.add(package_mapping["identifier"])
installed_requests.add(request)
mutex = multiprocessing.Lock()

def _worker_execute(first=False, _overwrite=False):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to get this function out of the install() function for better readability. Maybe it should be renamed to something like _install_in_thread() and take the queue as an argument? Also renaming _install() to _install_single() will help making sense of all of this. It would be something like:

def install(requests, output_path, ...):
    """Install packages to *output_path* from *requests*."""
    ...

def _install_in_thread(queue, ...):
    """Install package from requests and update queue."""
    ...

def _install_single(request, output_path, context_mapping, definition_mapping, ...):
    """Install single package to *output_path* from *request*."""
    ...

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Jeremy! I was hoping you would see my Teams message before seeing this pull request. This was more just a proof of concept rather than a legit implementation. I'm fine with any changes like this.

while not (
queue.empty() and not active_count.value and not errors_found
):
try:
# Keep alive if queue is empty but other installs are active.
if queue.empty():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that condition necessary? Why keep one thread alive if other threads are active?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically I needed a way to keep all threads alive and waiting even if there was only one install active. Once the initializer function exits, the thread is gone. Since a library's dependencies only get added to the queue after a pip install, all of the threads would exit because the queue was empty at that moment, even though it would not stay empty for long. Basically what was happening was that sometimes all but one thread would exit, leaving only one thread remaining to do the rest of the installs in a linear fashion.

time.sleep(0.5)
continue

with mutex:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure that I understand properly, the active_count value ensures that the process doesn't finish accidentally after popping the last item off the queue but before processing it, right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The active_count was a way to communicate between all the threads to stay alive until all installs have finished. The queue grows slowly, causing it to be empty at times.

_request, parent_identifier, _editable_mode = queue.get()
if _request in installed_requests:
continue

# Add to installed requests to prevent duplicate install
installed_requests.add(_request)

# Increase the active count
active_count.value += 1

# Setup temporary folder for package installation.
cache_path = tempfile.mkdtemp()
package_path = tempfile.mkdtemp()

package_paths.add(package_path)
cache_paths.add(cache_path)

# Fetch environment mapping and installation path.
_context_mapping = copy.deepcopy(context_mapping)

# Replace the temp package path with the current package path
_context_mapping["environ"]["PYTHONPATH"] = (
library_path.replace(temp_package_path, package_path)
)
lib_path = _context_mapping["environ"]["PYTHONPATH"]
wiz.filesystem.ensure_directory(lib_path)

# Needed for the editable mode.
wiz.filesystem.ensure_directory(lib_path)

package_mapping, _overwrite = _install(
_request, output_path, _context_mapping, definition_mapping,
package_path, cache_path, installed_packages,
definition_path=definition_path,
overwrite=_overwrite,
editable_mode=_editable_mode,
update_existing_definitions=update_existing_definitions,
parent_identifier=parent_identifier,
continue_on_error=continue_on_error
)
if package_mapping is None:
# Decrement the active count
with mutex:
active_count.value -= 1

continue

installed_packages.add(package_mapping["identifier"])

# Indicate if package was skipped.
if package_mapping.get("skipped", False):
skipped_packages.add(package_mapping["identifier"])

# Fill up queue with requirements extracted from package
# dependencies.
if not no_dependencies:
for _request in package_mapping.get("requirements", []):
queue.put(
(_request, package_mapping["identifier"], False)
)

# Decrement the active count
with mutex:
active_count.value -= 1

# Break out of loop if only installing first in queue.
if first:
break

except Exception as err:
# Collect exception to stop execution on main thread later.
errors_found.add(err)
raise

return _overwrite

context_mapping = fetch_context_mapping(temp_package_path, python_target)
library_path = context_mapping["environ"]["PYTHONPATH"]

for request in requests:
queue.put((request, None, editable_mode))

# Install the initial requests and collect dependencies.
overwrite = _worker_execute(first=True, _overwrite=overwrite)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be possible to avoid calling the _worker_execute twice? What if you fill up the queue and pass it to the ThreadPool initializer?

queue = six.moves.queue.Queue()

for request in requests:
    queue.put((request, None, editable_mode))

pool = multiprocessing.pool.ThreadPool(
    THREAD_COUNT, _worker_execute,
    (queue, overwrite if overwrite is not None else False)
)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solved two things:

  1. Calling the initial requests on the main thread would allow a user to select from the overwrite prompt if it shows up. Then the "yes to all" would be respected for the dependencies.
  2. The queue would be full with dependencies by the time the thread pool started. I think that was only an issue before I added the active_count variable.


# Use a thread pool to install remaining requests.
pool = multiprocessing.pool.ThreadPool(
THREAD_COUNT, _worker_execute,
(False, overwrite if overwrite is not None else False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be safer to have "overwrite" being a multiprocessing.Value instance? So you could just mutate it in a thread without having to return it

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that sounds fine

)

# Indicate if package was skipped.
if package_mapping.get("skipped", False):
skipped_packages.add(package_mapping["identifier"])
# Wait until all threads have finished.
pool.close()
pool.join()

# Fill up queue with requirements extracted from package
# dependencies.
if not no_dependencies:
for request in package_mapping.get("requirements", []):
queue.put((request, package_mapping["identifier"], False))
# Remove package and cache paths.
for path in package_paths.union(cache_paths):
shutil.rmtree(path)

finally:
shutil.rmtree(package_path)
shutil.rmtree(cache_path)
# Raise a RuntimeError if any errors have been found.
if errors_found:
raise RuntimeError(list(errors_found)[0])

# Sort and filter packages installed.
installed = sorted(
Expand Down
3 changes: 2 additions & 1 deletion source/qip/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def execute(command, environ_mapping, quiet=False):
shlex.split(command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=environ_mapping
env=environ_mapping,
close_fds=True
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that necessary? Is that because you risk a "Too many open files" error?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason the output parsing that grabs the request from the stdout would fail. Basically the stdout would be empty even though the pip install was successful. I've had something like this happen to me before and I knew that close_fds=True would fix it. If you remove it, you'll see what happens.

)

if not quiet:
Expand Down