Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ jobs:
strategy:
matrix:
include:
- toxenv: py39-dim
- toxenv: py310-dim
- toxenv: py311-dim
- toxenv: py312-dim
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ jobs:
strategy:
matrix:
include:
- os: ubuntu-latest
python: 3.9
toxenv: py39
- os: ubuntu-latest
python: '3.10'
toxenv: py310
Expand All @@ -29,6 +26,9 @@ jobs:
- os: ubuntu-latest
python: '3.13'
toxenv: py313
- os: ubuntu-latest
python: '3.14'
toxenv: py314
runs-on: ${{ matrix.os }}

steps:
Expand Down
12 changes: 6 additions & 6 deletions docs/contributing/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ and ProxyStore installed in editable mode with the necessary extras options.
```bash
$ git clone https://github.com/proxystore/extensions
$ cd extensions
$ tox --devenv venv -e py311
$ tox --devenv venv -e py314
$ . venv/bin/activate
```

!!! warning

Running Tox in a Conda environment is possible but it may conflict with
Tox's ability to find the correct Python versions. E.g., if your
Conda environment is Python 3.9, running `#!bash $ tox -e p38` may still use
Python 3.9.
Conda environment is Python 3.14, running `#!bash $ tox -e p313` may still use
Python 3.14.

To install manually:
```bash
Expand Down Expand Up @@ -61,9 +61,9 @@ Code that is useful for building tests but is not a test itself belongs in the

```bash
# Run all tests in tests/
$ tox -e py311
$ tox -e py314
# Run a specific test
$ tox -e py311 -- tests/x/y_test.py::test_z
$ tox -e py314 -- tests/x/y_test.py::test_z
```

### Tests (docker)
Expand All @@ -81,7 +81,7 @@ $ docker pull ghcr.io/proxystore/proxystore-dim:nightly
# Be sure to change the path to your proxystore repo directory
$ docker run --rm -it --network host -v /path/to/proxystore:/proxystore ghcr.io/proxystore/proxystore-dim:nightly
# Inside container
$ tox -e py311-dim
$ tox -e py314-dim
```

## Docs
Expand Down
8 changes: 5 additions & 3 deletions proxystore_ex/connectors/daos.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def get_batch(self, keys: Sequence[DAOSKey]) -> list[BytesLike | None]:
objs.append(self.get(key))
return objs

def new_key(self, obj: bytes | None = None) -> DAOSKey:
def new_key(self, obj: BytesLike | None = None) -> DAOSKey:
"""Create a new key.

Args:
Expand Down Expand Up @@ -286,10 +286,12 @@ def put_batch(self, objs: Sequence[BytesLike]) -> list[DAOSKey]:
)
for _ in objs
]
self._dict.bput({key.dict_key: obj for key, obj in zip(keys, objs)})
self._dict.bput(
{key.dict_key: obj for key, obj in zip(keys, objs, strict=False)},
)
return keys

def set(self, key: DAOSKey, obj: bytes) -> None:
def set(self, key: DAOSKey, obj: BytesLike) -> None:
"""Set the object associated with a key.

Note:
Expand Down
2 changes: 1 addition & 1 deletion proxystore_ex/connectors/dim/margo.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def put_batch(self, objs: Sequence[BytesLike]) -> list[DIMKey]:
]
rpcs: list[RPC] = []

for key, obj in zip(keys, objs):
for key, obj in zip(keys, objs, strict=False):
blk = self.engine.create_bulk(obj, bulk.read_only)
rpcs.append(RPC(operation='put', key=key, data=blk))

Expand Down
2 changes: 1 addition & 1 deletion proxystore_ex/connectors/dim/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def put_batch(self, objs: Sequence[BytesLike]) -> list[DIMKey]:
]
rpcs = [
RPC(operation='put', key=key, data=obj)
for key, obj in zip(keys, objs)
for key, obj in zip(keys, objs, strict=False)
]
self._send_rpcs(rpcs)
return keys
Expand Down
2 changes: 1 addition & 1 deletion proxystore_ex/connectors/dim/zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def put_batch(self, objs: Sequence[BytesLike]) -> list[DIMKey]:
]
rpcs = [
RPC(operation='put', key=key, data=obj)
for key, obj in zip(keys, objs)
for key, obj in zip(keys, objs, strict=False)
]
self._send_rpcs(rpcs)
return keys
Expand Down
66 changes: 21 additions & 45 deletions proxystore_ex/plugins/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,18 @@

import functools
import logging
import sys
import warnings
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Mapping
from functools import partial
from typing import Any
from typing import Callable
from typing import cast
from typing import ParamSpec
from typing import TypeVar

if sys.version_info >= (3, 10): # pragma: >3.10 cover
from typing import ParamSpec
else: # pragma: <3.10 cover
from typing_extensions import ParamSpec

try:
from dask._task_spec import DataNode
from dask.base import tokenize
from dask.utils import funcname
from distributed import Client as DaskDistributedClient
Expand All @@ -40,17 +36,6 @@
from proxystore.store.utils import get_key
from proxystore.warnings import ExperimentalWarning

try: # pragma: >3.9 cover
from dask._task_spec import DataNode

class _ProxyNode(DataNode):
key: ConnectorKeyT
value: Proxy[Any]

USE_TASK_SPEC = True
except ImportError: # pragma: <=3.9 cover
USE_TASK_SPEC = False

warnings.warn(
'Dask plugins are an experimental feature and may exhibit unexpected '
'behaviour or change in the future.',
Expand All @@ -65,6 +50,11 @@ class _ProxyNode(DataNode):
logger = logging.getLogger(__name__)


class _ProxyNode(DataNode):
key: ConnectorKeyT
value: Proxy[Any]


class Client(DaskDistributedClient):
"""Dask Distributed Client with ProxyStore support.

Expand Down Expand Up @@ -211,11 +201,11 @@ def map( # type: ignore[no-untyped-def]
# and instead want to wait to proxy until the later calls to map()
# on each batch.
key = key or funcname(func)
iterables = list(zip(*zip(*iterables))) # type: ignore[assignment]
iterables = list(zip(*zip(*iterables, strict=False), strict=False)) # type: ignore[assignment]
if not isinstance(key, list) and pure: # pragma: no branch
key = [
f'{key}-{tokenize(func, kwargs, *args)}-proxy'
for args in zip(*iterables)
for args in zip(*iterables, strict=False)
]

iterables = tuple(
Expand Down Expand Up @@ -265,7 +255,7 @@ def map( # type: ignore[no-untyped-def]
not (batch_size and batch_size > 1 and total_length > batch_size)
and self._ps_store is not None
):
for future, *args in zip(futures, *iterables):
for future, *args in zip(futures, *iterables, strict=False):
# TODO: how to delete kwargs?
callback = partial(
_evict_proxies_callback,
Expand Down Expand Up @@ -394,17 +384,11 @@ def _evict_proxies_callback(


def _get_keys(iterable: Iterable[Any]) -> tuple[ConnectorKeyT, ...]:
if USE_TASK_SPEC: # pragma: >3.9 cover
return tuple(x.key for x in iterable if isinstance(x, _ProxyNode))
else: # pragma: <=3.9 cover
return tuple(x for x in iterable if isinstance(x, Proxy))
return tuple(x.key for x in iterable if isinstance(x, _ProxyNode))


def _is_proxy(obj: Any) -> bool:
if USE_TASK_SPEC: # pragma: >3.9 cover
return isinstance(obj, (_ProxyNode, Proxy))
else: # pragma: <=3.9 cover
return isinstance(obj, Proxy)
return isinstance(obj, _ProxyNode | Proxy)


def _proxy_by_size(
Expand Down Expand Up @@ -495,13 +479,10 @@ def _proxy_iterable(
for value in iterable
)

if USE_TASK_SPEC: # pragma: >3.9 cover
return tuple(
_ProxyNode(get_key(obj), obj) if isinstance(obj, Proxy) else obj
for obj in objects
)
else: # pragma: <=3.9 cover
return objects
return tuple(
_ProxyNode(get_key(obj), obj) if isinstance(obj, Proxy) else obj
for obj in objects
)


def _proxy_mapping(
Expand Down Expand Up @@ -533,15 +514,10 @@ def _proxy_mapping(
for key in mapping
}

if USE_TASK_SPEC: # pragma: >3.9 cover
return {
key: _ProxyNode(get_key(obj), obj)
if isinstance(obj, Proxy)
else obj
for key, obj in objects.items()
}
else: # pragma: <=3.9 cover
return objects
return {
key: _ProxyNode(get_key(obj), obj) if isinstance(obj, Proxy) else obj
for key, obj in objects.items()
}


def _proxy_task_wrapper(
Expand Down
14 changes: 5 additions & 9 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@ maintainers = [
]
description = "ProxyStore extensions."
readme = "README.md"
requires-python = ">=3.9"
requires-python = ">=3.10"
license = {text = "MIT"}
classifiers = [
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: Implementation :: CPython",
]
dependencies = [
"proxystore>=0.6.5",
"proxystore>=0.8.3",
"pyzmq",
]

Expand Down Expand Up @@ -86,7 +82,7 @@ omit = [
parallel = true

[tool.mypy]
python_version = "3.12"
python_version = "3.14"
plugins = [ "proxystore.mypy_plugin" ]
check_untyped_defs = true
disallow_any_generics = true
Expand All @@ -112,7 +108,7 @@ timeout = 30

[tool.ruff]
line-length = 79
target-version = "py39"
target-version = "py310"

[tool.ruff.format]
indent-style = "space"
Expand Down
2 changes: 1 addition & 1 deletion testing/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import contextlib
import importlib.util
import platform
from collections.abc import Callable
from collections.abc import Generator
from contextlib import AbstractContextManager
from typing import Any
from typing import Callable
from unittest import mock

import pytest
Expand Down
12 changes: 6 additions & 6 deletions tests/connectors/dim/ucx_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,17 @@ async def recv_obj(self) -> bytes:
return serialize(r)

with (
mock.patch(
'proxystore_ex.connectors.dim.ucx.wait_for_server',
),
mock.patch('proxystore_ex.connectors.dim.ucx.wait_for_server'),
mock.patch(
'ucp.create_endpoint',
AsyncMock(return_value=MockEndpoint()),
),
):
with UCXConnector(port=0) as connector:
with pytest.raises(Exception, match='test'):
connector._send_rpcs([RPC('get', TEST_KEY)])
with (
UCXConnector(port=0) as connector,
pytest.raises(Exception, match='test'),
):
connector._send_rpcs([RPC('get', TEST_KEY)])


def test_connector_close_kills_server() -> None:
Expand Down
4 changes: 2 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py{39,310,311,312,313},py{39,310,311}-dim, pre-commit, docs
envlist = py{310,311,312,313,314},py{310,311}-dim, pre-commit, docs

[testenv]
extras = dev
Expand All @@ -9,7 +9,7 @@ commands =
coverage combine --quiet
coverage report

[testenv:py{39,310,311}-dim]
[testenv:py{310,311}-dim]
allowlist_externals =
bash
timeout
Expand Down