
Commit 2c59ffd

Move AllGather/AllReduce to coll namespace
1 parent 0eba9ce commit 2c59ffd

14 files changed (+52, -52 lines)

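For code that consumes rapidsmpf, the rename only changes the include paths and the namespace qualifier; the class names and call signatures are unchanged. A minimal migration sketch follows (the compatibility aliases are a hypothetical shim a consumer could add while migrating, not something this commit provides):

#include <rapidsmpf/coll/allgather.hpp>  // was <rapidsmpf/allgather/allgather.hpp>
#include <rapidsmpf/coll/allreduce.hpp>  // was <rapidsmpf/allreduce/allreduce.hpp>

// Forward the old spellings to the new rapidsmpf::coll namespace so existing
// call sites (e.g. allgather::AllGather::Ordered::YES) keep compiling while
// they are migrated.
namespace rapidsmpf::allgather {
using AllGather = rapidsmpf::coll::AllGather;
}  // namespace rapidsmpf::allgather

namespace rapidsmpf::allreduce {
using AllReduce = rapidsmpf::coll::AllReduce;
}  // namespace rapidsmpf::allreduce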

cpp/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
@@ -156,9 +156,9 @@ target_link_options(maybe_asan INTERFACE "$<$<BOOL:${RAPIDSMPF_ASAN}>:-fsanitize
 
 add_library(
   rapidsmpf
-  src/allgather/allgather.cpp
-  src/allreduce/allreduce.cpp
-  src/allreduce/device_kernels.cu
+  src/coll/allgather.cpp
+  src/coll/allreduce.cpp
+  src/coll/device_kernels.cu
   src/bootstrap/bootstrap.cpp
   src/bootstrap/file_backend.cpp
   src/communicator/communicator.cpp

cpp/include/rapidsmpf/allgather/allgather.hpp renamed to cpp/include/rapidsmpf/coll/allgather.hpp

Lines changed: 4 additions & 4 deletions
@@ -27,13 +27,13 @@
 #include <rapidsmpf/statistics.hpp>
 
 /**
- * @namespace rapidsmpf::allgather
- * @brief Allgather communication interfaces.
+ * @namespace rapidsmpf::coll
+ * @brief Collective communication interfaces.
  *
  * An allgather service for distributed communication where all ranks collect
  * data from all other ranks.
  */
-namespace rapidsmpf::allgather {
+namespace rapidsmpf::coll {
 namespace detail {
 
 /// @brief Type alias for chunk identifiers.
@@ -559,4 +559,4 @@ class AllGather {
     std::vector<std::unique_ptr<Communicator::Future>> receive_futures_{};
 };
 
-} // namespace rapidsmpf::allgather
+} // namespace rapidsmpf::coll

cpp/include/rapidsmpf/allreduce/allreduce.hpp renamed to cpp/include/rapidsmpf/coll/allreduce.hpp

Lines changed: 6 additions & 6 deletions
@@ -15,7 +15,7 @@
 #include <utility>
 #include <vector>
 
-#include <rapidsmpf/allgather/allgather.hpp>
+#include <rapidsmpf/coll/allgather.hpp>
 #include <rapidsmpf/communicator/communicator.hpp>
 #include <rapidsmpf/error.hpp>
 #include <rapidsmpf/memory/buffer.hpp>
@@ -24,7 +24,7 @@
 #include <rapidsmpf/progress_thread.hpp>
 #include <rapidsmpf/statistics.hpp>
 
-namespace rapidsmpf::allreduce {
+namespace rapidsmpf::coll {
 
 /**
  * @brief Reduction operators supported by `AllReduce`.
@@ -57,7 +57,7 @@ using ReduceKernel = std::function<void(PackedData& accum, PackedData&& incoming
 /**
  * @brief AllReduce collective.
  *
- * The current implementation is built using `allgather::AllGather` and performs
+ * The current implementation is built using `coll::AllGather` and performs
  * the reduction locally after allgather completes. Considering `R` is the number of
  * ranks, and `N` is the number of bytes of data, per rank this incurs `O(R * N)` bytes of
  * memory consumption and `O(R)` communication operations.
@@ -89,7 +89,7 @@ class AllReduce {
      * @param finished_callback Optional callback run once locally when the allreduce
      * is finished and results are ready for extraction.
      *
-     * @note This constructor internally creates an `allgather::AllGather` instance
+     * @note This constructor internally creates an `AllGather` instance
      * that uses the same communicator, progress thread, and buffer resource.
      */
     AllReduce(
@@ -183,7 +183,7 @@ class AllReduce {
     ReduceKernel reduce_kernel_; ///< Type-erased reduction kernel
     std::function<void(void)> finished_callback_; ///< Optional finished callback
 
-    allgather::AllGather gatherer_; ///< Underlying allgather primitive
+    AllGather gatherer_; ///< Underlying allgather primitive
 
     std::atomic<std::uint32_t> nlocal_insertions_{0}; ///< Number of local inserts
    std::atomic<bool> reduced_computed_{
@@ -204,4 +204,4 @@ template <typename T, ReduceOp Op>
 ReduceKernel make_reduce_kernel();
 } // namespace detail
 
-} // namespace rapidsmpf::allreduce
+} // namespace rapidsmpf::coll
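The `O(R * N)` cost stated in the class comment of this header is easy to make concrete; a small illustrative calculation (rank count and message size are hypothetical, not from this commit):

#include <cstddef>

// With R ranks each contributing N bytes, the allgather-based allreduce holds
// roughly R * N bytes per rank before the local reduction runs.
constexpr std::size_t R = 8;               // ranks (hypothetical)
constexpr std::size_t N = 64ull << 20;     // 64 MiB contributed per rank (hypothetical)
constexpr std::size_t peak_bytes = R * N;  // ~512 MiB gathered per rank
static_assert(peak_bytes == 512ull << 20, "8 * 64 MiB = 512 MiB");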

cpp/include/rapidsmpf/memory/pinned_memory_resource.hpp

Lines changed: 3 additions & 1 deletion
@@ -31,8 +31,10 @@ namespace rapidsmpf {
 /**
  * @brief Checks if the PinnedMemoryResource is supported for the current CUDA version.
  *
- * Requires rapidsmpf to be build with cuda>=12.6.
+ * Requires rapidsmpf to be built with CUDA>=12.6.
  *
+ * @return True if the PinnedMemoryResource is supported for the current CUDA version,
+ * false otherwise.
  * @note The driver version check is cached and only performed once.
  */
 inline bool is_pinned_memory_resources_supported() {
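Since the new @return documentation spells out the boolean contract, a usage sketch may help (the fallback branch is illustrative, not part of this commit):

#include <rapidsmpf/memory/pinned_memory_resource.hpp>

void configure_host_memory() {
    // Cheap to call repeatedly: the underlying driver-version check is cached.
    if (rapidsmpf::is_pinned_memory_resources_supported()) {
        // rapidsmpf was built with CUDA >= 12.6 and the driver supports it;
        // a PinnedMemoryResource can be constructed here.
    } else {
        // Fall back to pageable host memory.
    }
}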

cpp/include/rapidsmpf/streaming/coll/allgather.hpp

Lines changed: 7 additions & 7 deletions
@@ -9,7 +9,7 @@
 #include <memory>
 #include <vector>
 
-#include <rapidsmpf/allgather/allgather.hpp>
+#include <rapidsmpf/coll/allgather.hpp>
 #include <rapidsmpf/communicator/communicator.hpp>
 #include <rapidsmpf/memory/packed_data.hpp>
 #include <rapidsmpf/streaming/chunks/packed_data.hpp>
@@ -22,7 +22,7 @@
 namespace rapidsmpf::streaming {
 
 /**
- * @brief Asynchronous (coroutine) interface to `allgather::AllGather`.
+ * @brief Asynchronous (coroutine) interface to `coll::AllGather`.
  *
  * Once the AllGather is created, many tasks may insert data into it. If multiple tasks
  * insert data, the user is responsible for arranging that `insert_finished` is only
@@ -31,8 +31,8 @@ namespace rapidsmpf::streaming {
  */
 class AllGather {
   public:
-    /// @copydoc allgather::AllGather::Ordered
-    using Ordered = rapidsmpf::allgather::AllGather::Ordered;
+    /// @copydoc coll::AllGather::Ordered
+    using Ordered = rapidsmpf::coll::AllGather::Ordered;
     /**
     * @brief Construct an asynchronous allgather.
     *
@@ -63,7 +63,7 @@ class AllGather {
     */
    void insert(std::uint64_t sequence_number, PackedDataChunk&& chunk);
 
-    /// @copydoc rapidsmpf::allgather::AllGather::insert_finished()
+    /// @copydoc rapidsmpf::coll::AllGather::insert_finished()
    void insert_finished();
 
    /**
@@ -82,15 +82,15 @@ class AllGather {
    coro::event
        event_{}; ///< Event tracking whether all data has arrived and can be extracted.
    std::shared_ptr<Context> ctx_; ///< Streaming context.
-    allgather::AllGather gatherer_; ///< Underlying collective allgather.
+    coll::AllGather gatherer_; ///< Underlying collective allgather.
 };
 
 namespace node {
 
 /**
  * @brief Create an allgather node for a single allgather operation.
  *
- * This is a streaming version of `rapidsmpf::allgather::AllGather` that operates on
+ * This is a streaming version of `rapidsmpf::coll::AllGather` that operates on
  * packed data received through `Channel`s.
  *
  * @param ctx The streaming context to use.

cpp/src/allgather/allgather.cpp renamed to cpp/src/coll/allgather.cpp

Lines changed: 3 additions & 3 deletions
@@ -12,13 +12,13 @@
 #include <mutex>
 #include <optional>
 
-#include <rapidsmpf/allgather/allgather.hpp>
+#include <rapidsmpf/coll/allgather.hpp>
 #include <rapidsmpf/communicator/communicator.hpp>
 #include <rapidsmpf/memory/buffer.hpp>
 #include <rapidsmpf/progress_thread.hpp>
 #include <rapidsmpf/utils.hpp>
 
-namespace rapidsmpf::allgather {
+namespace rapidsmpf::coll {
 namespace detail {
 
 Chunk::Chunk(
@@ -538,4 +538,4 @@ ProgressThread::ProgressState AllGather::event_loop() {
        : ProgressThread::ProgressState::InProgress;
 }
 
-} // namespace rapidsmpf::allgather
+} // namespace rapidsmpf::coll

cpp/src/allreduce/allreduce.cpp renamed to cpp/src/coll/allreduce.cpp

Lines changed: 5 additions & 6 deletions
@@ -10,10 +10,10 @@
 #include <utility>
 #include <vector>
 
-#include <rapidsmpf/allreduce/allreduce.hpp>
+#include <rapidsmpf/coll/allreduce.hpp>
 #include <rapidsmpf/error.hpp>
 
-namespace rapidsmpf::allreduce {
+namespace rapidsmpf::coll {
 
 AllReduce::AllReduce(
     std::shared_ptr<Communicator> comm,
@@ -56,9 +56,8 @@ std::vector<PackedData> AllReduce::wait_and_extract(std::chrono::milliseconds ti
     // Block until the underlying allgather completes, then perform the reduction locally
     // (exactly once).
     if (!reduced_computed_.load(std::memory_order_acquire)) {
-        auto gathered = gatherer_.wait_and_extract(
-            allgather::AllGather::Ordered::YES, std::move(timeout)
-        );
+        auto gathered =
+            gatherer_.wait_and_extract(AllGather::Ordered::YES, std::move(timeout));
         reduced_results_ = reduce_all(std::move(gathered));
         reduced_computed_.store(true, std::memory_order_release);
         if (finished_callback_) {
@@ -120,4 +119,4 @@ std::vector<PackedData> AllReduce::reduce_all(std::vector<PackedData>&& gathered
     return results;
 }
 
-} // namespace rapidsmpf::allreduce
+} // namespace rapidsmpf::coll

cpp/src/allreduce/device_kernels.cu renamed to cpp/src/coll/device_kernels.cu

Lines changed: 3 additions & 3 deletions
@@ -8,11 +8,11 @@
 
 #include <rmm/exec_policy.hpp>
 
-#include <rapidsmpf/allreduce/allreduce.hpp>
+#include <rapidsmpf/coll/allreduce.hpp>
 #include <rapidsmpf/error.hpp>
 #include <rapidsmpf/memory/buffer.hpp>
 
-namespace rapidsmpf::allreduce::detail {
+namespace rapidsmpf::coll::detail {
 
 namespace {
 
@@ -177,4 +177,4 @@ ReduceKernel make_reduce_kernel<unsigned long, ReduceOp::MAX>() {
     return make_reduce_kernel_impl<unsigned long, ReduceOp::MAX>();
 }
 
-} // namespace rapidsmpf::allreduce::detail
+} // namespace rapidsmpf::coll::detail

cpp/src/streaming/coll/allgather.cpp

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ namespace rapidsmpf::streaming {
 
 AllGather::AllGather(std::shared_ptr<Context> ctx, OpID op_id)
     : ctx_{std::move(ctx)},
-      gatherer_{allgather::AllGather(
+      gatherer_{coll::AllGather(
          ctx_->comm(),
          ctx_->progress_thread(),
          op_id,

cpp/tests/streaming/test_read_parquet.cpp

Lines changed: 3 additions & 4 deletions
@@ -32,7 +32,7 @@
 #include <rmm/cuda_stream_view.hpp>
 #include <rmm/mr/per_device_resource.hpp>
 
-#include <rapidsmpf/allgather/allgather.hpp>
+#include <rapidsmpf/coll/allgather.hpp>
 #include <rapidsmpf/integrations/cudf/partition.hpp>
 #include <rapidsmpf/memory/packed_data.hpp>
 #include <rapidsmpf/owning_wrapper.hpp>
@@ -228,7 +228,7 @@ TEST_P(StreamingReadParquetParams, ReadParquet) {
     }
     run_streaming_pipeline(std::move(nodes));
 
-    allgather::AllGather allgather(
+    coll::AllGather allgather(
        GlobalEnvironment->comm_,
        GlobalEnvironment->progress_thread_,
        /* op_id = */ 0,
@@ -254,8 +254,7 @@ TEST_P(StreamingReadParquetParams, ReadParquet) {
     allgather.insert_finished();
 
     // May as well check on all ranks, so we also mildly exercise the allgather.
-    auto gathered_packed_data =
-        allgather.wait_and_extract(allgather::AllGather::Ordered::YES);
+    auto gathered_packed_data = allgather.wait_and_extract(coll::AllGather::Ordered::YES);
     auto result = unpack_and_concat(
        std::move(gathered_packed_data), rmm::cuda_stream_default, br.get()
     );
