39 changes: 21 additions & 18 deletions distributed/client.py
@@ -4,6 +4,7 @@
import atexit
import copy
import inspect
import itertools
import json
import logging
import os
@@ -31,7 +32,7 @@
from tlz import first, groupby, merge, partition_all, valmap

import dask
from dask.base import collections_to_dsk, normalize_token, tokenize
from dask.base import collections_to_dsk, tokenize
from dask.core import flatten, validate_key
from dask.highlevelgraph import HighLevelGraph
from dask.optimization import SubgraphCallable
@@ -210,11 +211,15 @@ class Future(WrappedKey):

_cb_executor = None
_cb_executor_pid = None
_counter = itertools.count()
# Make sure this stays unique even across multiple processes or hosts
_uid = uuid.uuid4().hex

def __init__(self, key, client=None, inform=True, state=None):
def __init__(self, key, client=None, inform=True, state=None, _id=None):
self.key = key
self._cleared = False
self._client = client
self._id = _id or (Future._uid, next(Future._counter))
self._input_state = state
self._inform = inform
self._state = None
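(Illustrative sketch, not part of the diff.) The `_uid`/`_counter` pair introduced above is what makes each Future instance identifiable across processes and hosts; a minimal standalone version of that scheme looks like:

    import itertools
    import uuid

    _uid = uuid.uuid4().hex       # fixed once per process, unique across hosts
    _counter = itertools.count()  # increments for every Future created locally

    id_a = (_uid, next(_counter))
    id_b = (_uid, next(_counter))
    assert id_a != id_b           # distinct even for Futures that share a key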
@@ -499,8 +504,16 @@ def release(self):
except TypeError: # pragma: no cover
pass # Shutting down, add_callback may be None

@staticmethod
def make_future(key, id):
# Can't use kwargs in pickle __reduce__ methods
return Future(key=key, _id=id)

def __reduce__(self) -> str | tuple[Any, ...]:
return Future, (self.key,)
return Future.make_future, (self.key, self._id)

def __dask_tokenize__(self):
return (type(self).__name__, self.key, self._id)
Collaborator review comment:

Suggested change:
(from) return (type(self).__name__, self.key, self._id)
(to)   return (normalize_token(type(self)), self.key, self._id)

nit - mostly in case a third party subclasses distributed.Future without changing the name
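(Illustrative sketch of the reviewer's point; `ThirdPartyFuture` is hypothetical.) Two distinct classes can share `__name__`, so a token component built from the class name alone cannot tell a downstream subclass apart from `distributed.Future`, whereas the suggested `normalize_token(type(self))` hashes the class object itself:

    from distributed import Future

    class ThirdPartyFuture(Future):  # hypothetical downstream subclass
        pass

    ThirdPartyFuture.__name__ = "Future"  # e.g. re-exported under the same name

    assert ThirdPartyFuture.__name__ == Future.__name__  # name-based component collides
    assert ThirdPartyFuture is not Future                # the classes are still distinct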


def __del__(self):
try:
@@ -643,18 +656,6 @@ async def done_callback(future, callback):
callback(future)


@partial(normalize_token.register, Future)
def normalize_future(f):
"""Returns the key and the type as a list

Parameters
----------
list
The key and the type
"""
return [f.key, type(f)]


class AllExit(Exception):
"""Custom exception class to exit All(...) early."""

@@ -3434,9 +3435,11 @@ def compute(

if traverse:
collections = tuple(
dask.delayed(a)
if isinstance(a, (list, set, tuple, dict, Iterator))
else a
(
dask.delayed(a)
if isinstance(a, (list, set, tuple, dict, Iterator))
else a
)
for a in collections
)

6 changes: 4 additions & 2 deletions distributed/tests/test_client.py
@@ -863,11 +863,13 @@ async def test_tokenize_on_futures(c, s, a, b):
y = c.submit(inc, 1)
tok = tokenize(x)
Collaborator review comment:

Suggested change (remove this line):
tok = tokenize(x)

assert tokenize(x) == tokenize(x)
assert tokenize(x) == tokenize(y)
# Tokens must be unique per instance
# See https://github.com/dask/distributed/issues/8561
assert tokenize(x) != tokenize(y)

c.futures[x.key].finish()

assert tok == tokenize(y)
assert tok != tokenize(y)
Collaborator review comment:

Suggested change (remove this line):
assert tok != tokenize(y)

nit: redundant with the test above
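(Hedged sketch of what the updated assertions exercise; illustrative only, assumes a synchronous local client.) Identical submissions produce distinct Future instances, and because `__dask_tokenize__` now includes the per-instance `_id`, their tokens no longer collide (see dask/distributed#8561 referenced in the test):

    from dask.base import tokenize
    from distributed import Client

    client = Client(processes=False)

    x = client.submit(sum, [1, 2])
    y = client.submit(sum, [1, 2])   # same call, typically even the same key

    assert tokenize(x) == tokenize(x)   # stable for a given instance
    assert tokenize(x) != tokenize(y)   # distinct across instances

    client.close()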



@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")