diff --git a/.github/workflows/publish-test-results.yaml b/.github/workflows/publish-test-results.yaml new file mode 100644 index 00000000000..9082174c86f --- /dev/null +++ b/.github/workflows/publish-test-results.yaml @@ -0,0 +1,41 @@ +# Copied from https://github.com/EnricoMi/publish-unit-test-result-action/blob/v1.23/README.md#support-fork-repositories-and-dependabot-branches +# Warning: changes to this workflow will NOT be picked up until they land in the main branch! +# See https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#workflow_run + +name: Publish test results + +on: + workflow_run: + workflows: [Tests] + types: [completed] + +jobs: + publish-test-results: + name: Publish test results + runs-on: ubuntu-latest + if: github.event.workflow_run.conclusion != 'skipped' + + steps: + - name: Download and extract artifacts + env: + GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} + run: | + mkdir artifacts && cd artifacts + + artifacts_url=${{ github.event.workflow_run.artifacts_url }} + + gh api "$artifacts_url" -q '.artifacts[] | [.name, .archive_download_url] | @tsv' | while read artifact + do + IFS=$'\t' read name url <<< "$artifact" + gh api $url > "$name.zip" + unzip -d "$name" "$name.zip" + done + + - name: Publish Unit Test Results + uses: EnricoMi/publish-unit-test-result-action@v1 + with: + commit: ${{ github.event.workflow_run.head_sha }} + event_file: artifacts/Event File/event.json + event_name: ${{ github.event.workflow_run.event }} + files: artifacts/**/*.xml + comment_mode: off diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6755d5fed5a..2fa2c5bd177 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -130,7 +130,7 @@ jobs: - name: Coverage uses: codecov/codecov-action@v1 - - name: Upload test artifacts + - name: Upload test results # ensure this runs even if pytest fails if: > always() && @@ -149,3 +149,14 @@ jobs: name: ${{ env.TEST_ID }}_cluster_dumps path: test_cluster_dump if-no-files-found: ignore + + # Publish an artifact for the event; used by publish-test-results.yaml + event_file: + name: "Event File" + runs-on: ubuntu-latest + steps: + - name: Upload + uses: actions/upload-artifact@v2 + with: + name: Event File + path: ${{ github.event_path }} diff --git a/distributed/node.py b/distributed/node.py index 8e59bafb22c..f1bace03f9c 100644 --- a/distributed/node.py +++ b/distributed/node.py @@ -25,7 +25,7 @@ class ServerNode(Server): # XXX avoid inheriting from Server? there is some large potential for confusion # between base and derived attribute namespaces... - def versions(self, comm=None, packages=None): + def versions(self, comm=None, packages=()): return get_versions(packages=packages) def start_services(self, default_listen_ip): diff --git a/distributed/shuffle/__init__.py b/distributed/shuffle/__init__.py index a431c5bddfd..d530f9b679d 100644 --- a/distributed/shuffle/__init__.py +++ b/distributed/shuffle/__init__.py @@ -1,16 +1,7 @@ -try: - import pandas -except ImportError: - SHUFFLE_AVAILABLE = False -else: - del pandas - SHUFFLE_AVAILABLE = True - - from .shuffle import rearrange_by_column_p2p - from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension +from .shuffle import rearrange_by_column_p2p +from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension __all__ = [ - "SHUFFLE_AVAILABLE", "rearrange_by_column_p2p", "ShuffleId", "ShuffleMetadata", diff --git a/distributed/shuffle/shuffle.py b/distributed/shuffle/shuffle.py index 5811afd732f..33fe1189a09 100644 --- a/distributed/shuffle/shuffle.py +++ b/distributed/shuffle/shuffle.py @@ -3,7 +3,6 @@ from typing import TYPE_CHECKING from dask.base import tokenize -from dask.dataframe import DataFrame from dask.delayed import Delayed, delayed from dask.highlevelgraph import HighLevelGraph @@ -12,6 +11,8 @@ if TYPE_CHECKING: import pandas as pd + from dask.dataframe import DataFrame + def get_ext() -> ShuffleWorkerExtension: from distributed import get_worker @@ -53,6 +54,8 @@ def rearrange_by_column_p2p( column: str, npartitions: int | None = None, ): + from dask.dataframe import DataFrame + npartitions = npartitions or df.npartitions token = tokenize(df, column, npartitions) diff --git a/distributed/shuffle/shuffle_extension.py b/distributed/shuffle/shuffle_extension.py index e5e0baaf7bc..8f13480b91d 100644 --- a/distributed/shuffle/shuffle_extension.py +++ b/distributed/shuffle/shuffle_extension.py @@ -6,12 +6,12 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, NewType -import pandas as pd - from distributed.protocol import to_serialize from distributed.utils import sync if TYPE_CHECKING: + import pandas as pd + from distributed.worker import Worker ShuffleId = NewType("ShuffleId", str) @@ -103,6 +103,8 @@ async def add_partition(self, data: pd.DataFrame) -> None: await asyncio.gather(*tasks) def get_output_partition(self, i: int) -> pd.DataFrame: + import pandas as pd + assert self.transferred, "`get_output_partition` called before barrier task" assert self.metadata.worker_for(i) == self.worker.address, ( diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index dc3eba86e58..70692415c38 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -4,6 +4,7 @@ import multiprocessing as mp import os import random +import sys from contextlib import suppress from time import sleep from unittest import mock @@ -610,3 +611,28 @@ async def test_environ_plugin(c, s, a, b): assert results[a.worker_address] == "123" assert results[b.worker_address] == "123" assert results[n.worker_address] == "123" + + +@pytest.mark.parametrize( + "modname", + [ + pytest.param("numpy", marks=pytest.mark.xfail(reason="distributed#5729")), + "scipy", + "pandas", + ], +) +@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)]) +async def test_no_unnecessary_imports_on_worker(c, s, a, modname): + """ + Regression test against accidentally importing unnecessary modules at worker startup. + + Importing modules like pandas slows down worker startup, especially if workers are + loading their software environment from NFS or other non-local filesystems. + It also slightly increases memory footprint. + """ + + def assert_no_import(dask_worker): + assert modname not in sys.modules + + await c.wait_for_workers(1) + await c.run(assert_no_import) diff --git a/distributed/tests/test_versions.py b/distributed/tests/test_versions.py index 69803b8f144..af6d6a6929a 100644 --- a/distributed/tests/test_versions.py +++ b/distributed/tests/test_versions.py @@ -1,6 +1,7 @@ import re import sys +import msgpack import pytest import tornado @@ -145,17 +146,15 @@ def test_python_version(): def test_version_custom_pkgs(): out = get_versions( [ - # Use custom function - ("distributed", lambda mod: "123"), - # Use version_of_package "notexist", - ("pytest", None), # has __version__ - "tornado", # has version - "math", # has nothing + "pytest", + "tornado", + "msgpack", + "math", ] )["packages"] - assert out["distributed"] == "123" assert out["notexist"] is None assert out["pytest"] == pytest.__version__ assert out["tornado"] == tornado.version + assert out["msgpack"] == ".".join(str(v) for v in msgpack.version) assert out["math"] is None diff --git a/distributed/versions.py b/distributed/versions.py index d1e51d4c9cc..a93d7b504a5 100644 --- a/distributed/versions.py +++ b/distributed/versions.py @@ -7,30 +7,40 @@ import platform import struct import sys -from collections.abc import Callable, Iterable +from collections.abc import Iterable from itertools import chain -from types import ModuleType from typing import Any +try: + import importlib.metadata +except ImportError: + # Python 3.7 compatibility + + import pkg_resources + + _version = lambda modname: pkg_resources.get_distribution(modname).version +else: + _version = importlib.metadata.version + required_packages = [ - ("dask", lambda p: p.__version__), - ("distributed", lambda p: p.__version__), - ("msgpack", lambda p: ".".join([str(v) for v in p.version])), - ("cloudpickle", lambda p: p.__version__), - ("tornado", lambda p: p.version), - ("toolz", lambda p: p.__version__), + "dask", + "distributed", + "msgpack", + "cloudpickle", + "tornado", + "toolz", ] optional_packages = [ - ("numpy", lambda p: p.__version__), - ("pandas", lambda p: p.__version__), - ("lz4", lambda p: p.__version__), - ("blosc", lambda p: p.__version__), + "numpy", + "pandas", + "lz4", + "blosc", ] # only these scheduler packages will be checked for version mismatch -scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4", "blosc"} +scheduler_relevant_packages = set(required_packages) | {"lz4", "blosc"} # notes to be displayed for mismatch packages @@ -39,18 +49,16 @@ } -def get_versions( - packages: Iterable[str | tuple[str, Callable[[ModuleType], str | None]]] - | None = None -) -> dict[str, dict[str, Any]]: +def get_versions(packages: Iterable[str] = ()) -> dict[str, dict[str, Any]]: """Return basic information on our software installation, and our installed versions of packages """ return { "host": get_system_info(), - "packages": get_package_info( - chain(required_packages, optional_packages, packages or []) - ), + "packages": { + "python": ".".join(map(str, sys.version_info)), + **get_package_info(chain(required_packages, optional_packages, packages)), + }, } @@ -69,37 +77,13 @@ def get_system_info() -> dict[str, Any]: } -def version_of_package(pkg: ModuleType) -> str | None: - """Try a variety of common ways to get the version of a package""" - from contextlib import suppress - - with suppress(AttributeError): - return pkg.__version__ # type: ignore - with suppress(AttributeError): - return str(pkg.version) # type: ignore - with suppress(AttributeError): - return ".".join(map(str, pkg.version_info)) # type: ignore - return None - - -def get_package_info( - pkgs: Iterable[str | tuple[str, Callable[[ModuleType], str | None]]] -) -> dict[str, str | None]: +def get_package_info(pkgs: Iterable[str]) -> dict[str, str | None]: """get package versions for the passed required & optional packages""" - pversions: dict[str, str | None] = {"python": ".".join(map(str, sys.version_info))} - for pkg in pkgs: - if isinstance(pkg, (tuple, list)): - modname, ver_f = pkg - if ver_f is None: - ver_f = version_of_package - else: - modname = pkg - ver_f = version_of_package - + pversions: dict[str, str | None] = {} + for modname in pkgs: try: - mod = importlib.import_module(modname) - pversions[modname] = ver_f(mod) + pversions[modname] = _version(modname) except Exception: pversions[modname] = None diff --git a/distributed/worker.py b/distributed/worker.py index 72d3c93f5e3..19adedfcf0a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -44,7 +44,7 @@ typename, ) -from . import comm, preloading, profile, shuffle, system, utils +from . import comm, preloading, profile, system, utils from .batched import BatchedSend from .comm import Comm, connect, get_address_host from .comm.addressing import address_from_user_args, parse_address @@ -67,6 +67,7 @@ from .protocol import pickle, to_serialize from .pubsub import PubSubWorkerExtension from .security import Security +from .shuffle import ShuffleWorkerExtension from .sizeof import safe_sizeof as sizeof from .threadpoolexecutor import ThreadPoolExecutor from .threadpoolexecutor import secede as tpe_secede @@ -115,9 +116,7 @@ # Worker.status subsets RUNNING = {Status.running, Status.paused, Status.closing_gracefully} -DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension] -if shuffle.SHUFFLE_AVAILABLE: - DEFAULT_EXTENSIONS.append(shuffle.ShuffleWorkerExtension) +DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension, ShuffleWorkerExtension] DEFAULT_METRICS: dict[str, Callable[[Worker], Any]] = {}