diff --git a/distributed/client.py b/distributed/client.py index 935cda3061..d46f942744 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -95,6 +95,7 @@ WorkerPlugin, _get_plugin_name, ) +from distributed.exceptions import WorkerStartTimeoutError from distributed.metrics import time from distributed.objects import HasWhat, SchedulerInfo, WhoHas from distributed.protocol import to_serialize @@ -1651,10 +1652,8 @@ def running_workers(info): while running_workers(info) < n_workers: if deadline and time() > deadline: - raise TimeoutError( - "Only %d/%d workers arrived after %s" - % (running_workers(info), n_workers, timeout) - ) + assert timeout is not None + raise WorkerStartTimeoutError(running_workers(info), n_workers, timeout) await asyncio.sleep(0.1) info = await self.scheduler.identity() self._scheduler_identity = SchedulerInfo(info) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 233fa0d969..7b68f67114 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -18,6 +18,7 @@ from distributed.compatibility import PeriodicCallback from distributed.core import Status from distributed.deploy.adaptive import Adaptive +from distributed.exceptions import WorkerStartTimeoutError from distributed.metrics import time from distributed.objects import SchedulerInfo from distributed.utils import ( @@ -610,9 +611,8 @@ def running_workers(info): while n_workers and running_workers(self.scheduler_info) < n_workers: if deadline and time() > deadline: - raise TimeoutError( - "Only %d/%d workers arrived after %s" - % (running_workers(self.scheduler_info), n_workers, timeout) + raise WorkerStartTimeoutError( + running_workers(self.scheduler_info), n_workers, timeout ) await asyncio.sleep(0.1) diff --git a/distributed/exceptions.py b/distributed/exceptions.py index bd18fe57ae..4f9de63846 100644 --- a/distributed/exceptions.py +++ b/distributed/exceptions.py @@ -1,5 +1,7 @@ from __future__ import annotations +from 
asyncio import TimeoutError + class Reschedule(Exception): """Reschedule this task @@ -13,3 +15,34 @@ class Reschedule(Exception): load across the cluster has significantly changed since first scheduling the task. """ + + +class WorkerStartTimeoutError(TimeoutError): + """Raised when the expected number of workers do not start within the timeout period.""" + + def __init__( + self, available_workers: int, expected_workers: int, timeout: float + ) -> None: + super().__init__(available_workers, expected_workers, timeout) + + @property + def available_workers(self) -> int: + """Number of workers that are available.""" + return self.args[0] + + @property + def expected_workers(self) -> int: + """Number of workers that were expected to be available.""" + return self.args[1] + + @property + def timeout(self) -> float: + """Timeout period in seconds.""" + return self.args[2] + + def __str__(self) -> str: + return "Only %d/%d workers arrived after %s" % ( + self.available_workers, + self.expected_workers, + self.timeout, + )