41 changes: 41 additions & 0 deletions .github/workflows/publish-test-results.yaml
@@ -0,0 +1,41 @@
+# Copied from https://github.com/EnricoMi/publish-unit-test-result-action/blob/v1.23/README.md#support-fork-repositories-and-dependabot-branches
+# Warning: changes to this workflow will NOT be picked up until they land in the main branch!
+# See https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#workflow_run
+
+name: Publish test results
+
+on:
+  workflow_run:
+    workflows: [Tests]
+    types: [completed]
+
+jobs:
+  publish-test-results:
+    name: Publish test results
+    runs-on: ubuntu-latest
+    if: github.event.workflow_run.conclusion != 'skipped'
+
+    steps:
+      - name: Download and extract artifacts
+        env:
+          GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
+        run: |
+          mkdir artifacts && cd artifacts
+
+          artifacts_url=${{ github.event.workflow_run.artifacts_url }}
+
+          gh api "$artifacts_url" -q '.artifacts[] | [.name, .archive_download_url] | @tsv' | while read artifact
+          do
+            IFS=$'\t' read name url <<< "$artifact"
+            gh api $url > "$name.zip"
+            unzip -d "$name" "$name.zip"
+          done
+
+      - name: Publish Unit Test Results
+        uses: EnricoMi/publish-unit-test-result-action@v1
+        with:
+          commit: ${{ github.event.workflow_run.head_sha }}
+          event_file: artifacts/Event File/event.json
+          event_name: ${{ github.event.workflow_run.event }}
+          files: artifacts/**/*.xml
+          comment_mode: off
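
For readers unfamiliar with the shell step above, here is a rough Python equivalent (a hypothetical standalone helper, not part of this PR): it lists the run's artifacts via the GitHub REST API, then downloads and extracts each archive, mirroring the `gh api` / `unzip` loop.

# Hypothetical sketch, not part of the PR. Assumes the REST shape used by the
# workflow: GET <artifacts_url> returns
# {"artifacts": [{"name": ..., "archive_download_url": ...}, ...]}.
import io
import os
import zipfile

import requests


def download_artifacts(artifacts_url: str, token: str, dest: str = "artifacts") -> None:
    headers = {"Authorization": f"token {token}"}
    listing = requests.get(artifacts_url, headers=headers)
    listing.raise_for_status()
    for artifact in listing.json()["artifacts"]:
        archive = requests.get(artifact["archive_download_url"], headers=headers)
        archive.raise_for_status()
        # Each artifact is a zip archive; extract it into a directory named
        # after it, like `unzip -d "$name" "$name.zip"` in the shell step.
        with zipfile.ZipFile(io.BytesIO(archive.content)) as zf:
            zf.extractall(os.path.join(dest, artifact["name"]))
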
13 changes: 12 additions & 1 deletion .github/workflows/tests.yaml
@@ -130,7 +130,7 @@ jobs:
       - name: Coverage
         uses: codecov/codecov-action@v1

-      - name: Upload test artifacts
+      - name: Upload test results
         # ensure this runs even if pytest fails
         if: >
           always() &&
@@ -149,3 +149,14 @@ jobs:
           name: ${{ env.TEST_ID }}_cluster_dumps
           path: test_cluster_dump
           if-no-files-found: ignore
+
+  # Publish an artifact for the event; used by publish-test-results.yaml
+  event_file:
+    name: "Event File"
+    runs-on: ubuntu-latest
+    steps:
+      - name: Upload
+        uses: actions/upload-artifact@v2
+        with:
+          name: Event File
+          path: ${{ github.event_path }}
2 changes: 1 addition & 1 deletion distributed/node.py
@@ -25,7 +25,7 @@ class ServerNode(Server):
     # XXX avoid inheriting from Server? there is some large potential for confusion
     # between base and derived attribute namespaces...

-    def versions(self, comm=None, packages=None):
+    def versions(self, comm=None, packages=()):
         return get_versions(packages=packages)

     def start_services(self, default_listen_ip):
13 changes: 2 additions & 11 deletions distributed/shuffle/__init__.py
@@ -1,16 +1,7 @@
-try:
-    import pandas
-except ImportError:
-    SHUFFLE_AVAILABLE = False
-else:
-    del pandas
-    SHUFFLE_AVAILABLE = True
-
-    from .shuffle import rearrange_by_column_p2p
-    from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension
+from .shuffle import rearrange_by_column_p2p
+from .shuffle_extension import ShuffleId, ShuffleMetadata, ShuffleWorkerExtension

 __all__ = [
-    "SHUFFLE_AVAILABLE",
     "rearrange_by_column_p2p",
     "ShuffleId",
     "ShuffleMetadata",
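
Net effect of this file: pandas is no longer probed at package import time; the SHUFFLE_AVAILABLE flag goes away, and the heavyweight import moves to the call sites that actually need it. A minimal sketch of the before/after pattern (hypothetical function name):

# Before (sketch): availability probed eagerly, so merely importing the
# package pulls pandas into every worker process.
try:
    import pandas  # heavyweight import runs at module import time
except ImportError:
    SHUFFLE_AVAILABLE = False
else:
    SHUFFLE_AVAILABLE = True


# After (sketch): the import is deferred until the code path that needs it runs.
def make_empty_frame():
    import pandas as pd  # lazy import; only paid for on first call

    return pd.DataFrame()
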
5 changes: 4 additions & 1 deletion distributed/shuffle/shuffle.py
@@ -3,7 +3,6 @@
 from typing import TYPE_CHECKING

 from dask.base import tokenize
-from dask.dataframe import DataFrame
 from dask.delayed import Delayed, delayed
 from dask.highlevelgraph import HighLevelGraph

@@ -12,6 +11,8 @@
 if TYPE_CHECKING:
     import pandas as pd

+    from dask.dataframe import DataFrame
+

 def get_ext() -> ShuffleWorkerExtension:
     from distributed import get_worker
@@ -53,6 +54,8 @@ def rearrange_by_column_p2p(
     column: str,
     npartitions: int | None = None,
 ):
+    from dask.dataframe import DataFrame
+
     npartitions = npartitions or df.npartitions
     token = tokenize(df, column, npartitions)
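
The `if TYPE_CHECKING:` block used here (and in shuffle_extension.py below) keeps the `DataFrame` / `pd.DataFrame` annotations visible to static type checkers without importing pandas or dask.dataframe at runtime. A self-contained sketch of the idiom (hypothetical function):

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy, never at runtime,
    # so importing this module does not import pandas.
    import pandas as pd


def first_row(df: pd.DataFrame) -> pd.DataFrame:
    # With `from __future__ import annotations`, annotations are stored as
    # strings and never evaluated, so defining this function needs no pandas.
    return df.head(1)
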
6 changes: 4 additions & 2 deletions distributed/shuffle/shuffle_extension.py
@@ -6,12 +6,12 @@
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, NewType

-import pandas as pd
-
 from distributed.protocol import to_serialize
 from distributed.utils import sync

 if TYPE_CHECKING:
+    import pandas as pd
+
     from distributed.worker import Worker

 ShuffleId = NewType("ShuffleId", str)
@@ -103,6 +103,8 @@ async def add_partition(self, data: pd.DataFrame) -> None:
         await asyncio.gather(*tasks)

     def get_output_partition(self, i: int) -> pd.DataFrame:
+        import pandas as pd
+
         assert self.transferred, "`get_output_partition` called before barrier task"

         assert self.metadata.worker_for(i) == self.worker.address, (
26 changes: 26 additions & 0 deletions distributed/tests/test_nanny.py
@@ -4,6 +4,7 @@
 import multiprocessing as mp
 import os
 import random
+import sys
 from contextlib import suppress
 from time import sleep
 from unittest import mock
@@ -610,3 +611,28 @@ async def test_environ_plugin(c, s, a, b):
     assert results[a.worker_address] == "123"
     assert results[b.worker_address] == "123"
     assert results[n.worker_address] == "123"
+
+
+@pytest.mark.parametrize(
+    "modname",
+    [
+        pytest.param("numpy", marks=pytest.mark.xfail(reason="distributed#5729")),
+        "scipy",
+        "pandas",
+    ],
+)
+@gen_cluster(client=True, Worker=Nanny, nthreads=[("", 1)])
+async def test_no_unnecessary_imports_on_worker(c, s, a, modname):
+    """
+    Regression test against accidentally importing unnecessary modules at worker startup.
+
+    Importing modules like pandas slows down worker startup, especially if workers are
+    loading their software environment from NFS or other non-local filesystems.
+    It also slightly increases memory footprint.
+    """
+
+    def assert_no_import(dask_worker):
+        assert modname not in sys.modules
+
+    await c.wait_for_workers(1)
+    await c.run(assert_no_import)
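
The test leans on `Client.run`, which executes a plain function in every worker process; because the workers here are spawned by `Nanny` in separate processes, their `sys.modules` reflects only what worker startup itself imported. A usage sketch (hypothetical helper):

# Sketch: Client.run injects the local Worker instance when the function has a
# parameter literally named `dask_worker`; return values come back keyed by
# worker address.
def loaded_top_level_modules(dask_worker):
    import sys

    return sorted(name for name in sys.modules if "." not in name)


# modules_by_worker = await c.run(loaded_top_level_modules)
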
13 changes: 6 additions & 7 deletions distributed/tests/test_versions.py
@@ -1,6 +1,7 @@
 import re
 import sys

+import msgpack
 import pytest
 import tornado

@@ -145,17 +146,15 @@ def test_python_version():
 def test_version_custom_pkgs():
     out = get_versions(
         [
-            # Use custom function
-            ("distributed", lambda mod: "123"),
-            # Use version_of_package
             "notexist",
-            ("pytest", None),  # has __version__
-            "tornado",  # has version
-            "math",  # has nothing
+            "pytest",
+            "tornado",
+            "msgpack",
+            "math",
         ]
     )["packages"]
-    assert out["distributed"] == "123"
     assert out["notexist"] is None
     assert out["pytest"] == pytest.__version__
     assert out["tornado"] == tornado.version
+    assert out["msgpack"] == ".".join(str(v) for v in msgpack.version)
     assert out["math"] is None
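
After this change `get_versions` accepts plain module names only; the custom version-callable variant is gone, and the Python version is injected by `get_versions` itself rather than by `get_package_info`. A sketch of the resulting payload shape (values illustrative):

from distributed.versions import get_versions

info = get_versions(["msgpack", "notexist"])
# info["packages"] is a flat {name: version-or-None} mapping, for example:
# {"python": "3.9.7.final.0", "dask": "2022.1.0", ..., "msgpack": "1.0.3",
#  "notexist": None}
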
80 changes: 32 additions & 48 deletions distributed/versions.py
@@ -7,30 +7,40 @@
 import platform
 import struct
 import sys
-from collections.abc import Callable, Iterable
+from collections.abc import Iterable
 from itertools import chain
-from types import ModuleType
 from typing import Any

+try:
+    import importlib.metadata
+except ImportError:
+    # Python 3.7 compatibility
+
+    import pkg_resources
+
+    _version = lambda modname: pkg_resources.get_distribution(modname).version
+else:
+    _version = importlib.metadata.version
+
 required_packages = [
-    ("dask", lambda p: p.__version__),
-    ("distributed", lambda p: p.__version__),
-    ("msgpack", lambda p: ".".join([str(v) for v in p.version])),
-    ("cloudpickle", lambda p: p.__version__),
-    ("tornado", lambda p: p.version),
-    ("toolz", lambda p: p.__version__),
+    "dask",
+    "distributed",
+    "msgpack",
+    "cloudpickle",
+    "tornado",
+    "toolz",
 ]

 optional_packages = [
-    ("numpy", lambda p: p.__version__),
-    ("pandas", lambda p: p.__version__),
-    ("lz4", lambda p: p.__version__),
-    ("blosc", lambda p: p.__version__),
+    "numpy",
+    "pandas",
+    "lz4",
+    "blosc",
 ]


 # only these scheduler packages will be checked for version mismatch
-scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4", "blosc"}
+scheduler_relevant_packages = set(required_packages) | {"lz4", "blosc"}


 # notes to be displayed for mismatch packages
@@ -39,18 +49,16 @@
 }


-def get_versions(
-    packages: Iterable[str | tuple[str, Callable[[ModuleType], str | None]]]
-    | None = None
-) -> dict[str, dict[str, Any]]:
+def get_versions(packages: Iterable[str] = ()) -> dict[str, dict[str, Any]]:
     """Return basic information on our software installation, and our installed versions
     of packages
     """
     return {
         "host": get_system_info(),
-        "packages": get_package_info(
-            chain(required_packages, optional_packages, packages or [])
-        ),
+        "packages": {
+            "python": ".".join(map(str, sys.version_info)),
+            **get_package_info(chain(required_packages, optional_packages, packages)),
+        },
     }

@@ -69,37 +77,13 @@ def get_system_info() -> dict[str, Any]:
     }


-def version_of_package(pkg: ModuleType) -> str | None:
-    """Try a variety of common ways to get the version of a package"""
-    from contextlib import suppress
-
-    with suppress(AttributeError):
-        return pkg.__version__  # type: ignore
-    with suppress(AttributeError):
-        return str(pkg.version)  # type: ignore
-    with suppress(AttributeError):
-        return ".".join(map(str, pkg.version_info))  # type: ignore
-    return None
-
-
-def get_package_info(
-    pkgs: Iterable[str | tuple[str, Callable[[ModuleType], str | None]]]
-) -> dict[str, str | None]:
+def get_package_info(pkgs: Iterable[str]) -> dict[str, str | None]:
     """get package versions for the passed required & optional packages"""

-    pversions: dict[str, str | None] = {"python": ".".join(map(str, sys.version_info))}
-    for pkg in pkgs:
-        if isinstance(pkg, (tuple, list)):
-            modname, ver_f = pkg
-            if ver_f is None:
-                ver_f = version_of_package
-        else:
-            modname = pkg
-            ver_f = version_of_package
-
+    pversions: dict[str, str | None] = {}
+    for modname in pkgs:
         try:
-            mod = importlib.import_module(modname)
-            pversions[modname] = ver_f(mod)
+            pversions[modname] = _version(modname)
         except Exception:
             pversions[modname] = None

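
The crux of this refactor: package versions are now read from installed distribution metadata (`importlib.metadata.version`, with a `pkg_resources` fallback on Python 3.7) instead of importing each module and probing `__version__`-style attributes, so `get_versions` no longer drags optional packages like pandas into memory. A minimal sketch of the difference:

import importlib.metadata
import sys

# Metadata lookup: reads the installed distribution's metadata from disk;
# the package itself is never imported.
print(importlib.metadata.version("pandas"))
assert "pandas" not in sys.modules  # holds if nothing else imported pandas

# The old approach imported the module first, paying the full import cost:
import pandas

print(pandas.__version__)
assert "pandas" in sys.modules
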
7 changes: 3 additions & 4 deletions distributed/worker.py
@@ -44,7 +44,7 @@
     typename,
 )

-from . import comm, preloading, profile, shuffle, system, utils
+from . import comm, preloading, profile, system, utils
 from .batched import BatchedSend
 from .comm import Comm, connect, get_address_host
 from .comm.addressing import address_from_user_args, parse_address
@@ -67,6 +67,7 @@
 from .protocol import pickle, to_serialize
 from .pubsub import PubSubWorkerExtension
 from .security import Security
+from .shuffle import ShuffleWorkerExtension
 from .sizeof import safe_sizeof as sizeof
 from .threadpoolexecutor import ThreadPoolExecutor
 from .threadpoolexecutor import secede as tpe_secede
@@ -115,9 +116,7 @@
 # Worker.status subsets
 RUNNING = {Status.running, Status.paused, Status.closing_gracefully}

-DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension]
-if shuffle.SHUFFLE_AVAILABLE:
-    DEFAULT_EXTENSIONS.append(shuffle.ShuffleWorkerExtension)
+DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension, ShuffleWorkerExtension]

 DEFAULT_METRICS: dict[str, Callable[[Worker], Any]] = {}

Expand Down