Skip to content
Open
1 change: 1 addition & 0 deletions python/rapidsmpf/rapidsmpf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rapids_cython_create_modules(

add_subdirectory(_detail)
add_subdirectory(allgather)
add_subdirectory(bootstrap)
add_subdirectory(memory)
add_subdirectory(communicator)
add_subdirectory(integrations/cudf)
Expand Down
22 changes: 15 additions & 7 deletions python/rapidsmpf/rapidsmpf/benchmarks/streaming_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,15 @@
from typing import TYPE_CHECKING

import cupy as cp
import ucxx._lib.libucxx as ucx_api
from mpi4py import MPI

import cudf
import rmm.mr
from pylibcudf.contiguous_split import pack
from rmm.pylibrmm.stream import DEFAULT_STREAM

import rapidsmpf.bootstrap
import rapidsmpf.communicator.mpi
from rapidsmpf.communicator.ucxx import (
barrier,
get_root_ucxx_address,
new_communicator,
)
from rapidsmpf.config import Options, get_environment_variables
from rapidsmpf.memory.buffer import MemoryType
from rapidsmpf.memory.buffer_resource import BufferResource, LimitAvailableMemory
Expand Down Expand Up @@ -200,6 +195,14 @@ def ucxx_mpi_setup(options: Options) -> Communicator:
options
Configuration options.
"""
import ucxx._lib.libucxx as ucx_api

from rapidsmpf.communicator.ucxx import (
barrier,
get_root_ucxx_address,
new_communicator,
)

if MPI.COMM_WORLD.Get_rank() == 0:
comm = new_communicator(MPI.COMM_WORLD.size, None, None, options)
root_address_str = get_root_ucxx_address(comm)
Expand Down Expand Up @@ -231,7 +234,12 @@ def setup_and_run(args: argparse.Namespace) -> None:
if args.comm == "mpi":
comm = rapidsmpf.communicator.mpi.new_communicator(MPI.COMM_WORLD, options)
elif args.comm == "ucxx":
comm = ucxx_mpi_setup(options)
if rapidsmpf.bootstrap.is_running_with_rrun():
raise ValueError(
"UCXX communicator is not supported with rrun yet, due to missing allreduce support"
)
else:
comm = ucxx_mpi_setup(options)

# Create a RMM stack with both a device pool and statistics.
mr = RmmResourceAdaptor(
Expand Down
16 changes: 16 additions & 0 deletions python/rapidsmpf/rapidsmpf/bootstrap/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# =================================================================================
# cmake-format: off
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
# cmake-format: on
# =================================================================================

set(modules_need_ucxx bootstrap.pyx)

if(RAPIDSMPF_HAVE_UCXX)
rapids_cython_create_modules(
CXX
SOURCE_FILES "${modules_need_ucxx}"
LINKED_LIBRARIES rapidsmpf::rapidsmpf maybe_asan
)
endif()
2 changes: 2 additions & 0 deletions python/rapidsmpf/rapidsmpf/bootstrap/__init__.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
// SPDX-License-Identifier: Apache-2.0
13 changes: 13 additions & 0 deletions python/rapidsmpf/rapidsmpf/bootstrap/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Bootstrap utilities for communicator creation."""

from __future__ import annotations

from rapidsmpf.bootstrap.bootstrap import (
Backend,
create_ucxx_comm,
is_running_with_rrun,
)

__all__ = ["Backend", "create_ucxx_comm", "is_running_with_rrun"]
17 changes: 17 additions & 0 deletions python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from enum import IntEnum

from rapidsmpf.communicator.communicator import Communicator
from rapidsmpf.config import Options

class Backend(IntEnum):
AUTO = ...
FILE = ...

def create_ucxx_comm(
backend: Backend = ...,
options: Options | None = ...,
) -> Communicator: ...
def is_running_with_rrun() -> bool: ...
97 changes: 97 additions & 0 deletions python/rapidsmpf/rapidsmpf/bootstrap/bootstrap.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from libcpp.memory cimport dynamic_pointer_cast, shared_ptr

from rapidsmpf.communicator.communicator cimport Communicator, cpp_Communicator
from rapidsmpf.config cimport Options, cpp_Options


cdef extern from "<rapidsmpf/bootstrap/bootstrap.hpp>" namespace \
"rapidsmpf::bootstrap" nogil:
cpdef enum class Backend(int):
AUTO
FILE


cdef extern from "<rapidsmpf/communicator/ucxx.hpp>" namespace \
"rapidsmpf::ucxx" nogil:
cdef cppclass cpp_UCXX_Communicator "rapidsmpf::ucxx::UCXX":
pass


cdef extern from "<rapidsmpf/bootstrap/ucxx.hpp>" nogil:
bint cpp_is_running_with_rrun \
"rapidsmpf::bootstrap::is_running_with_rrun"() except +

shared_ptr[cpp_UCXX_Communicator] cpp_create_ucxx_comm \
"rapidsmpf::bootstrap::create_ucxx_comm"(
Backend backend,
cpp_Options options,
) except +


def create_ucxx_comm(Backend backend = Backend.AUTO, options = None):
"""
Create a UCXX communicator using the bootstrap backend.

This function bootstraps a UCXX-based communicator using the selected
coordination backend (currently file-based), relying on environment
variables such as ``RAPIDSMPF_RANK``, ``RAPIDSMPF_NRANKS``, and
``RAPIDSMPF_COORD_DIR``.

Parameters
----------
backend
Backend to use for coordination. By default, ``Backend.AUTO`` is used,
which currently resolves to the file-based backend.
options
Configuration options for the UCXX communicator. If ``None``, a default
`rapidsmpf.config.Options` instance is used.

Returns
-------
rapidsmpf.communicator.communicator.Communicator
A new RapidsMPF-UCXX communicator instance.
"""
cdef Communicator ret = Communicator.__new__(Communicator)
cdef shared_ptr[cpp_UCXX_Communicator] ucxx_comm
cdef shared_ptr[cpp_Communicator] base_comm
cdef Options cpp_options

if options is None:
cpp_options = Options()
else:
if not isinstance(options, Options):
raise TypeError(
"options must be a rapidsmpf.config.Options instance or None"
)
cpp_options = <Options>options

with nogil:
ucxx_comm = cpp_create_ucxx_comm(backend, cpp_options._handle)
base_comm = dynamic_pointer_cast[cpp_Communicator, cpp_UCXX_Communicator](
ucxx_comm
)
ret._handle = base_comm

return ret


def is_running_with_rrun():
"""
Check whether the current process was launched via ``rrun``.

This helper inspects the bootstrap environment (e.g. the presence of
``RAPIDSMPF_RANK``) to determine if the process is running under
``rrun``-managed bootstrap mode.

Returns
-------
bool
``True`` if running under ``rrun`` bootstrap mode, ``False`` otherwise.
"""
cdef bint ret
with nogil:
ret = cpp_is_running_with_rrun()
return bool(ret)
29 changes: 26 additions & 3 deletions python/rapidsmpf/rapidsmpf/examples/bulk_mpi_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import rmm.mr
from rmm.pylibrmm.stream import DEFAULT_STREAM

import rapidsmpf.bootstrap
import rapidsmpf.communicator.mpi
from rapidsmpf.config import Options, get_environment_variables
from rapidsmpf.integrations.cudf.partition import (
Expand Down Expand Up @@ -45,6 +46,23 @@
from rapidsmpf.communicator.communicator import Communicator


def barrier(comm: Communicator) -> None:
"""
Blocks until all processes in the communicator have reached this point.

Parameters
----------
comm
The communicator to barrier.
"""
if rapidsmpf.bootstrap.is_running_with_rrun():
from rapidsmpf.communicator.ucxx import barrier as ucxx_barrier

ucxx_barrier(comm)
else:
MPI.COMM_WORLD.barrier()


def read_batch(paths: list[str]) -> tuple[plc.Table, list[str]]:
"""
Read a single batch of Parquet files.
Expand Down Expand Up @@ -279,7 +297,12 @@ def setup_and_run(args: argparse.Namespace) -> None:
if args.cluster_type == "mpi":
comm = rapidsmpf.communicator.mpi.new_communicator(MPI.COMM_WORLD, options)
elif args.cluster_type == "ucxx":
comm = ucxx_mpi_setup(options)
if rapidsmpf.bootstrap.is_running_with_rrun():
comm = rapidsmpf.bootstrap.create_ucxx_comm(
backend=rapidsmpf.bootstrap.Backend.AUTO, options=options
)
else:
comm = ucxx_mpi_setup(options)

# Create a RMM stack with both a device pool and statistics.
mr = RmmResourceAdaptor(
Expand Down Expand Up @@ -333,7 +356,7 @@ def setup_and_run(args: argparse.Namespace) -> None:
--spill-device: {spill_device}"""
)

MPI.COMM_WORLD.barrier()
barrier(comm)

if cupti_monitor is not None:
cupti_monitor.start_monitoring()
Expand All @@ -351,7 +374,7 @@ def setup_and_run(args: argparse.Namespace) -> None:
statistics=stats,
)
elapsed_time = MPI.Wtime() - start_time
MPI.COMM_WORLD.barrier()
barrier(comm)

if cupti_monitor is not None:
cupti_monitor.stop_monitoring()
Expand Down