diff --git a/dataspaces.conf b/dataspaces.conf new file mode 100644 index 0000000..eeda567 --- /dev/null +++ b/dataspaces.conf @@ -0,0 +1,5 @@ +## Config file for DataSpaces +ndim = 1 +dims = 10000000000 +max_versions = 1 +num_apps = 1 diff --git a/psbench/argparse.py b/psbench/argparse.py index 149354f..9bfa25d 100644 --- a/psbench/argparse.py +++ b/psbench/argparse.py @@ -54,6 +54,21 @@ def add_ipfs_options(parser: argparse.ArgumentParser) -> None: ) +def add_dspaces_options(parser: argparse.ArgumentParser) -> None: + """Add CLI arguments for DataSpaces. + + Args: + parser (ArgumentParser): parser object to add DataSpaces arguments to. + """ + ' '.join(sys.argv) + parser.add_argument( + '--dspaces', + action='store_true', + default=False, + help='Use DataSpaces for data transfer.', + ) + + def add_logging_options(parser: argparse.ArgumentParser) -> None: """Add CLI arguments for logging options.""" group = parser.add_argument_group( diff --git a/psbench/benchmarks/globus_compute_tasks/main.py b/psbench/benchmarks/globus_compute_tasks/main.py index 76c5461..6748026 100644 --- a/psbench/benchmarks/globus_compute_tasks/main.py +++ b/psbench/benchmarks/globus_compute_tasks/main.py @@ -22,6 +22,7 @@ from proxystore.store.utils import get_key from psbench import ipfs +from psbench.argparse import add_dspaces_options from psbench.argparse import add_globus_compute_options from psbench.argparse import add_ipfs_options from psbench.argparse import add_logging_options @@ -31,6 +32,7 @@ from psbench.logging import TESTING_LOG_LEVEL from psbench.proxystore import init_store_from_args from psbench.tasks.pong import pong +from psbench.tasks.pong import pong_dspaces from psbench.tasks.pong import pong_ipfs from psbench.tasks.pong import pong_proxy from psbench.utils import randbytes @@ -153,6 +155,84 @@ def time_task_ipfs( ) +def time_task_dspaces( + *, + gce: globus_compute_sdk.Executor, + input_size: int, + output_size: int, + task_sleep: float, +) -> TaskStats: + """Execute and time a single Globus Compute task with DataSpaces for data transfer. + + Args: + gce (Executor): Globus Compute Executor to submit task through. + input_size (int): number of bytes to send as input to task. + output_size (int): number of bytes task should return. + task_sleep (int): number of seconds to sleep inside task. + + Returns: + TaskStats + """ + import dspaces as ds + import numpy as np + from mpi4py import MPI + + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + size = comm.Get_size() + version = 1 + + client = ds.dspaces.DSClient() + path = str(uuid.uuid4()) + + data = randbytes(input_size) + input_size / size + start = time.perf_counter_ns() + + client.Put( + np.array(bytearray(data)), + path, + version=version, + offset=((input_size * rank),), + ) + fut = gce.submit( + pong_dspaces, + path, + input_size, + rank, + size, + version=version, + result_size=output_size, + sleep=task_sleep, + ) + + result = fut.result() + + if result is not None: + out_path = result[0] + out_size = result[1] + data = client.Get( + out_path, + version, + lb=((out_size * rank),), + ub=((out_size * rank + out_size - 1),), + dtype=bytes, + timeout=-1, + ).tobytes() + + end = time.perf_counter_ns() + assert isinstance(data, bytes) + + return TaskStats( + proxystore_backend='DataSpaces', + task_name='pong', + input_size_bytes=input_size, + output_size_bytes=output_size, + task_sleep_seconds=task_sleep, + total_time_ms=(end - start) / 1e6, + ) + + def time_task_proxy( *, gce: globus_compute_sdk.Executor, @@ -219,6 +299,7 @@ def runner( *, globus_compute_endpoint: str, store: Store | None, + use_dspaces: bool, use_ipfs: bool, ipfs_local_dir: str | None, ipfs_remote_dir: str | None, @@ -237,6 +318,7 @@ def runner( 'Starting test runner\n' f' - Globus Compute Endpoint: {globus_compute_endpoint}\n' f' - ProxyStore backend: {store_connector_name}\n' + f' - DataSpaces enabled: {use_dspaces}\n' f' - IPFS enabled: {use_ipfs}\n' f' - Task type: ping-pong\n' f' - Task repeat: {task_repeat}\n' @@ -245,9 +327,10 @@ def runner( f' - Task sleep time: {task_sleep} s', ) - if store is not None and use_ipfs: + if store is not None and (use_ipfs or use_dspaces): raise ValueError( - 'IPFS and ProxyStore cannot be used at the same time.', + f"""{"IPFS" if use_ipfs else "DataSpaces"} and ProxyStore + cannot be used at the same time.""", ) runner_start = time.perf_counter_ns() @@ -281,6 +364,13 @@ def runner( output_size=output_size, task_sleep=task_sleep, ) + elif use_dspaces: + stats = time_task_dspaces( + gce=gce, + input_size=input_size, + output_size=output_size, + task_sleep=task_sleep, + ) else: stats = time_task( gce=gce, @@ -360,6 +450,7 @@ def main(argv: Sequence[str] | None = None) -> int: add_logging_options(parser) add_proxystore_options(parser, required=False) add_ipfs_options(parser) + add_dspaces_options(parser) args = parser.parse_args(argv) init_logging(args.log_file, args.log_level, force=True) @@ -369,6 +460,7 @@ def main(argv: Sequence[str] | None = None) -> int: runner( globus_compute_endpoint=args.globus_compute_endpoint, store=store, + use_dspaces=args.dspaces, use_ipfs=args.ipfs, ipfs_local_dir=args.ipfs_local_dir, ipfs_remote_dir=args.ipfs_remote_dir, diff --git a/psbench/tasks/pong.py b/psbench/tasks/pong.py index 821084f..8f2fa36 100644 --- a/psbench/tasks/pong.py +++ b/psbench/tasks/pong.py @@ -71,6 +71,66 @@ def pong_ipfs( return None +def pong_dspaces( + path: str, + data_size: int, + rank: int, + size: int, + *, + version: int = 1, + result_size: int = 0, + sleep: float = 0, +) -> tuple[str, int] | None: + """Task that takes a DataSpace path and returns data via DataSpaces. + + Args: + client (ds.DSpaces):DataSpaces client + path (str): filename of the DataSpaces stored data. + data_size (int) : the size of the DataSpaces object. + rank (int) : MPI rank. + size (int): MPI communication size. + version (int): The version of the data to access (default: 1). + result_size (int): size of results byte array (default: 0). + sleep (float): seconds to sleep for to simulate work (default: 0). + + Returns: + Filename of return data or None. + """ + import time + import uuid + + import dspaces as ds + import numpy as np + + from psbench.utils import randbytes + + client = ds.dspaces.DSClient() + data = client.Get( + path, + version=version, + lb=((data_size * rank),), + ub=((data_size * rank + data_size - 1),), + dtype=bytes, + timeout=-1, + ).tobytes() + + assert isinstance(data, bytes) + time.sleep(sleep) + + if result_size > 0: + filepath = str(uuid.uuid4()) + return_data = bytearray(randbytes(result_size)) + client.Put( + np.array(return_data), + filepath, + version=version, + offset=((result_size * rank),), + ) + return (filepath, result_size) + else: + return None + + def pong_proxy( data: bytes, *, @@ -101,7 +161,6 @@ def pong_proxy( from proxystore.proxy import is_resolved from proxystore.proxy import Proxy from proxystore.store import get_store - from proxystore.store.utils import resolve_async from psbench.tasks.pong import ProxyStats from psbench.utils import randbytes @@ -110,7 +169,7 @@ def pong_proxy( assert not is_resolved(data) if sleep > 0.0: - resolve_async(data) + data.resolve_async() time.sleep(sleep) assert isinstance(data, bytes) diff --git a/tests/benchmarks/globus_compute_tasks/main_test.py b/tests/benchmarks/globus_compute_tasks/main_test.py index 49623ef..e74e4d1 100644 --- a/tests/benchmarks/globus_compute_tasks/main_test.py +++ b/tests/benchmarks/globus_compute_tasks/main_test.py @@ -109,13 +109,18 @@ def test_time_task_proxy() -> None: @pytest.mark.parametrize( - ('use_ipfs', 'use_proxystore', 'log_to_csv'), - ((False, True, False), (True, False, False), (False, False, True)), + ('use_ipfs', 'use_proxystore', 'use_dspaces', 'log_to_csv'), + ( + (False, True, False, False), + (True, False, False, False), + (False, False, False, True), + ), ) def test_runner( caplog, use_ipfs: bool, use_proxystore: bool, + use_dspaces: bool, log_to_csv: bool, tmp_path: pathlib.Path, ) -> None: @@ -145,6 +150,7 @@ def test_runner( runner( globus_compute_endpoint=str(uuid.uuid4()), store=store, + use_dspaces=use_dspaces, use_ipfs=use_ipfs, ipfs_local_dir=str(ipfs_local_dir), ipfs_remote_dir=str(ipfs_remote_dir), @@ -171,6 +177,7 @@ def test_runner_error() -> None: globus_compute_endpoint=str(uuid.uuid4()), store=store, use_ipfs=True, + use_dspaces=False, ipfs_local_dir='/tmp/local/', ipfs_remote_dir='/tmp/remote/', input_sizes=[0],