File tree Expand file tree Collapse file tree 3 files changed +39
-7
lines changed Expand file tree Collapse file tree 3 files changed +39
-7
lines changed Original file line number Diff line number Diff line change 95
95
WorkerPlugin ,
96
96
_get_plugin_name ,
97
97
)
98
+ from distributed .exceptions import WorkerStartTimeoutError
98
99
from distributed .metrics import time
99
100
from distributed .objects import HasWhat , SchedulerInfo , WhoHas
100
101
from distributed .protocol import to_serialize
@@ -1651,10 +1652,8 @@ def running_workers(info):
1651
1652
1652
1653
while running_workers (info ) < n_workers :
1653
1654
if deadline and time () > deadline :
1654
- raise TimeoutError (
1655
- "Only %d/%d workers arrived after %s"
1656
- % (running_workers (info ), n_workers , timeout )
1657
- )
1655
+ assert timeout is not None
1656
+ raise WorkerStartTimeoutError (running_workers (info ), n_workers , timeout )
1658
1657
await asyncio .sleep (0.1 )
1659
1658
info = await self .scheduler .identity ()
1660
1659
self ._scheduler_identity = SchedulerInfo (info )
Original file line number Diff line number Diff line change 18
18
from distributed .compatibility import PeriodicCallback
19
19
from distributed .core import Status
20
20
from distributed .deploy .adaptive import Adaptive
21
+ from distributed .exceptions import WorkerStartTimeoutError
21
22
from distributed .metrics import time
22
23
from distributed .objects import SchedulerInfo
23
24
from distributed .utils import (
@@ -610,9 +611,8 @@ def running_workers(info):
610
611
611
612
while n_workers and running_workers (self .scheduler_info ) < n_workers :
612
613
if deadline and time () > deadline :
613
- raise TimeoutError (
614
- "Only %d/%d workers arrived after %s"
615
- % (running_workers (self .scheduler_info ), n_workers , timeout )
614
+ raise WorkerStartTimeoutError (
615
+ running_workers (self .scheduler_info ), n_workers , timeout
616
616
)
617
617
await asyncio .sleep (0.1 )
618
618
Original file line number Diff line number Diff line change 1
1
from __future__ import annotations
2
2
3
+ from asyncio import TimeoutError
4
+
3
5
4
6
class Reschedule (Exception ):
5
7
"""Reschedule this task
@@ -13,3 +15,34 @@ class Reschedule(Exception):
13
15
load across the cluster has significantly changed since first scheduling
14
16
the task.
15
17
"""
18
+
19
+
20
class WorkerStartTimeoutError(TimeoutError):
    """Raised when the expected number of workers did not start within the timeout period.

    The three constructor arguments are stored positionally in ``args`` and
    exposed through read-only properties, so handlers can inspect the numbers
    instead of parsing the message.

    Parameters
    ----------
    available_workers
        Number of workers that were running when the deadline passed.
    expected_workers
        Number of workers the caller was waiting for.
    timeout
        Timeout period in seconds.
    """

    def __init__(
        self, available_workers: int, expected_workers: int, timeout: float
    ) -> None:
        # Forward everything to the base class so the values live in
        # ``self.args``; the properties below read them back by position.
        super().__init__(available_workers, expected_workers, timeout)

    @property
    def available_workers(self) -> int:
        """Number of workers that are available."""
        return self.args[0]

    @property
    def expected_workers(self) -> int:
        """Number of workers that were expected to be available."""
        return self.args[1]

    @property
    def timeout(self) -> float:
        """Timeout period in seconds."""
        return self.args[2]

    def __str__(self) -> str:
        # Same wording as the plain TimeoutError message this class replaces
        # at the call sites.
        return "Only %d/%d workers arrived after %s" % (
            self.available_workers,
            self.expected_workers,
            self.timeout,
        )
You can’t perform that action at this time.
0 commit comments