Conversation

@pentschev
Member

Implements an AllReduce collective on top of the AllGather implementation. This may or may not be an acceptable implementation given the performance characteristics it delivers (see the AllReduce class docs for details), but it certainly makes the implementation easier to maintain.

/**
 * @brief AllReduce collective.
 *
 * The current implementation is built using `coll::AllGather` and performs
 * the reduction locally after the allgather completes. Considering `R` is the
 * number of ranks and `N` is the number of bytes of data, this incurs `O(R * N)`
 * bytes of memory consumption and `O(R)` communication operations per rank.
 *
 * Semantics:
 *  - Each rank may call `insert` any number of times with a local sequence number.
 *  - Conceptually, the *k*-th insertion on each rank participates in a single
 *    global reduction. That is, insertions are paired across ranks by their
 *    local insertion order, not by sequence number values.
 *  - Once all ranks call `insert_finished`, `wait_and_extract` returns one
 *    globally-reduced `PackedData` per local insertion on this rank.
 *
 * The actual reduction is implemented via a type-erased `ReduceKernel` that is
 * supplied at construction time. Helper factories such as
 * `detail::make_reduce_kernel` can be used to build element-wise
 * reductions over contiguous arrays in device memory.
 */
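A minimal usage sketch under the documented semantics; the constructor arguments and the `make_reduce_kernel` factory call below are assumptions based on the discussion in this review, not the final signature:

// Sketch only: `comm`, `op_id`, and `br` are assumed to exist elsewhere; the
// factory template parameters follow the helpers mentioned in the class docs.
coll::AllReduce allreduce{
    comm, op_id, br, Statistics::disabled(),
    coll::detail::make_reduce_kernel<float, ReduceOp::SUM>()
};

// Each rank contributes local data; insertions are paired across ranks by
// local insertion order.
allreduce.insert(std::move(local_packed_data));
allreduce.insert_finished();

// One globally-reduced PackedData per local insertion on this rank.
std::vector<PackedData> results = allreduce.wait_and_extract();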

@pentschev pentschev self-assigned this Nov 20, 2025
@pentschev pentschev requested a review from a team as a code owner November 20, 2025 22:09
@pentschev pentschev added the feature request New feature or request label Nov 20, 2025
@pentschev pentschev requested review from a team as code owners November 20, 2025 22:09
@pentschev pentschev added the non-breaking Introduces a non-breaking change label Nov 20, 2025
Member

@KyleFromNVIDIA left a comment


Approved trivial CMake changes

Comment on lines 66 to 71
* - Each rank may call `insert` any number of times with a local sequence number.
* - Conceptually, the *k*-th insertion on each rank participates in a single
* global reduction. That is, insertions are paired across ranks by their
* local insertion order, not by sequence number values.
* - Once all ranks call `insert_finished`, `wait_and_extract` returns one
* globally-reduced `PackedData` per local insertion on this rank.
Contributor


Why is this the semantics we want?

To me, an allreduce takes "one input per rank" and produces "one output".

Contributor


So I think reasonable semantics are:

  1. AllReduce is "one shot" (not streamed), i.e. you only ever insert a single PackedData locally, and you only get a single PackedData out.
  2. All inputs are reduced over, i.e. you only ever get a single PackedData output.

So either:

// Effectively the semantics of MPI_Allreduce
auto allreduce = AllReduce(comm, op_id, reduce_kernel=SUM);
allreduce.insert(packed_data);
auto result = allreduce.wait_and_extract();

Or

auto allreduce = AllReduce(comm, op_id, reduce_kernel=SUM);
while (...) {
    allreduce.insert(packed_data[i]);
}
allreduce.insert_finished();
auto result = allreduce.wait_and_extract();

But in that latter case you inserted/communicated more data than necessary (unless we stage and combine locally first). So you should just have done the local combinations up front and inserted a single contribution to the global allreduce.

So that argues that (1) is the right option.

Member Author


This means that each PackedData entry is globally reduced with the PackedData entries at the same i-th position on other ranks, so if you have only one insertion per rank you get exactly what you're saying. This is how it can be used currently; for the following inputs:

# Rank A
insert(PackedData([1, 2, 3]))
insert(PackedData([4, 5, 6]))

# Rank B
insert(PackedData([10, 20, 30]))
insert(PackedData([40, 50, 60]))

We get this output:

std::vector<PackedData>{[11, 22, 33], [44, 55, 66]}

IIUC, what you're proposing is a simpler interface that instead of allowing inserting N PackedData entries and getting a vector containing N PackedData outputs, we should only allow one insertion per rank and get only a single PackedData output. Is that what you're proposing?

To clarify, I did this to match what AllGather currently does, where you can insert multiple entries and retrieve a vector with the same number of entries using wait_and_extract.

Contributor


It makes sense to be able to insert multiple entries in allgather because you want the concatenation of all the inputs.

In allreduce, you want the accumulation of all the inputs, and I think we should just simplify and say "you should combine your local inputs before inserting them".

Matching what allgather does just because we happen to be using an allgather to implement the reduction is the wrong way to think about this, I think.
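For example, a rank with several local contributions could fold them together up front and then make a single insertion (sketch; `local_inputs` and the direct `reduce_kernel` call are placeholders):

// Combine local contributions before the collective, then insert once.
PackedData local_accum = std::move(local_inputs.front());
for (std::size_t i = 1; i < local_inputs.size(); ++i) {
    reduce_kernel(local_accum, std::move(local_inputs[i]));
}
allreduce.insert(std::move(local_accum));  // single contribution per rank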

Member Author


Not everything is done yet, but the API has been updated to allow only one PackedData per rank in b353aa3.

Comment on lines 178 to 182
std::shared_ptr<Communicator> comm_; ///< Communicator
std::shared_ptr<ProgressThread>
progress_thread_; ///< Progress thread (unused directly).
BufferResource* br_; ///< Buffer resource
std::shared_ptr<Statistics> statistics_; ///< Statistics collection instance
Contributor


All of these things are managed by the allgather. Why are they replicated here?

Member Author


Sorry, these were all leftovers from a previous iteration, fixed in 70145ee.

Comment on lines 188 to 191
std::atomic<std::uint32_t> nlocal_insertions_{0}; ///< Number of local inserts
std::atomic<bool> reduced_computed_{
    false
}; ///< Whether the reduction has been computed
Contributor


Why do we need these at all? This object does no communication itself.

Member Author


Same as the previous iteration, sorry for so much leftover code. Also fixed in 70145ee.

std::atomic<bool> reduced_computed_{
    false
}; ///< Whether the reduction has been computed
std::vector<PackedData> reduced_results_; ///< Cached reduced results
Contributor


Why? This is, as far as I can tell, populated in wait_and_extract and then immediately returned.

Member Author


Removed in 70145ee as well.

Comment on lines 82 to 84
RAPIDSMPF_EXPECTS(
    nranks > 0, "AllReduce requires a positive number of ranks", std::runtime_error
);
Contributor


Always true

Member Author


Removed in 70145ee.

    n_local == 0 || n_local == n_per_rank,
    "AllReduce local insertion count does not match gathered contributions per rank",
    std::runtime_error
);
Contributor


This makes no sense. Either everyone has to insert exactly k entries and the ith result is computed by reducing over the PackedData pieces that are sliced from [i::k], or everyone inserts different numbers of entries (as is claimed to be allowed here) and then you can't do that.

Member Author


This is now removed in b353aa3 since we support one, and only one, PackedData per rank.

Comment on lines 106 to 117
// Conceptually, the k-th insertion on each rank participates in a single
// reduction. With ordered allgather results, entries are laid out as:
// [rank0:0..n_per_rank-1][rank1:0..n_per_rank-1]...[rankP-1:0..n_per_rank-1]
for (std::size_t k = 0; k < n_per_rank; ++k) {
    // Start from rank 0's contribution for this logical insertion.
    auto accum = std::move(gathered[k]);
    for (std::size_t r = 1; r < nranks; ++r) {
        auto idx = r * n_per_rank + k;
        reduce_kernel_(accum, std::move(gathered[idx]));
    }
    results.emplace_back(std::move(accum));
}
Contributor


Yes, this assumes that we always have k insertions on every rank and then the reduction for output i is sliced from gathered_results[i::k].

Member Author


Also removed in b353aa3.


thrust::transform(policy, acc_ptr, acc_ptr + count, in_ptr, acc_ptr, op);
});
}
Contributor


stream-ordering not respected.
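A sketch of what stream-ordered execution could look like here, assuming the accumulator buffer can report the CUDA stream it is ordered on (the `stream()` accessor below is hypothetical):

#include <rmm/exec_policy.hpp>
#include <thrust/transform.h>

// Enqueue the element-wise reduction on the buffer's own stream instead of a
// default/synchronizing policy; downstream consumers stay ordered on `stream`.
rmm::cuda_stream_view stream = acc_buf->stream();  // hypothetical accessor
thrust::transform(
    rmm::exec_policy(stream), acc_ptr, acc_ptr + count, in_ptr, acc_ptr, op
);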

}
results.emplace_back(std::move(accum));
}

Contributor


Yes, this assumes that we always have k insertions on every rank and then the reduction for output i is sliced from gathered_results[i::k].

Comment on lines +21 to +29
RAPIDSMPF_EXPECTS(
    acc_buf && in_buf, "Device reduction kernel requires non-null buffers"
);
RAPIDSMPF_EXPECTS(
    acc_buf->mem_type() == MemoryType::DEVICE
        && in_buf->mem_type() == MemoryType::DEVICE,
    "Device reduction kernel expects device memory"
);

Contributor


If we're not going to move the data to device for the user to do the reductions, then we shouldn't offer this interface. Because it is invoked directly from wait_and_extract, the user can't arrange that this condition holds.

* The kernel is responsible for interpreting `PackedData::metadata` and
* `PackedData::data` consistently across all ranks.
*/
using ReduceKernel = std::function<void(PackedData& accum, PackedData&& incoming)>;
Contributor


Question: MPI allreduce reduces on each index. But are we (or should we be) allowing list -> scalar reductions too?
E.g. taking the distributed sum of an array.
This reduce kernel definition seems to support both.
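Conceptually, both shapes fit the type-erased signature. Using plain vectors as stand-ins for the PackedData payloads (sketch, not the real types; uses <vector> and <numeric>):

// Element-wise reduction, MPI_Allreduce-style: accum[i] += incoming[i].
auto elementwise_sum = [](std::vector<float>& accum, std::vector<float>&& incoming) {
    for (std::size_t i = 0; i < accum.size(); ++i) accum[i] += incoming[i];
};

// List -> scalar reduction: whatever has been accumulated so far plus the
// incoming array collapses to a single value, i.e. a distributed sum of an array.
auto array_sum = [](std::vector<float>& accum, std::vector<float>&& incoming) {
    float total = std::accumulate(accum.begin(), accum.end(), 0.0f)
        + std::accumulate(incoming.begin(), incoming.end(), 0.0f);
    accum.assign(1, total);
};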

Comment on lines +66 to +67
* - Each rank calls `insert` exactly once to contribute data to the reduction.
* - Once all ranks call `insert_finished`, `wait_and_extract` returns the
Contributor


If insert is called exactly once, I think we can forgo insert_finished. It only makes sense for many insert calls.

std::function<void(void)> finished_callback
)
: reduce_kernel_{std::move(reduce_kernel)},
finished_callback_{std::move(finished_callback)},
Contributor


This callback is not called, is it? IMO, we might need to use this callback in the allgather finished callback. My suggestion is to add a cv and mutex to AllReduce, and the wait methods should wait on them, based on the allgather callback.

Contributor


That way, both callback-based calls and wait-based calls can be supported.
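Something along these lines (sketch; member names and the exact wiring are placeholders):

// Members guarding the "underlying allgather has finished" state.
std::mutex mutex_;
std::condition_variable cv_;
bool gather_finished_{false};

// Registered as the AllGather finished callback.
void on_gather_finished() {
    {
        std::lock_guard<std::mutex> lock(mutex_);
        gather_finished_ = true;
    }
    cv_.notify_all();
    if (finished_callback_) {
        finished_callback_();  // callback-based consumers
    }
}

// Wait-based consumers (e.g. wait_and_extract) block on the same state.
void wait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (timeout.count() < 0) {
        cv_.wait(lock, [this] { return gather_finished_; });
    } else {
        cv_.wait_for(lock, timeout, [this] { return gather_finished_; });
    }
}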

Comment on lines +60 to +64
// Block until the underlying allgather completes, then perform the reduction locally
// (exactly once).
auto gathered =
gatherer_.wait_and_extract(AllGather::Ordered::YES, std::move(timeout));
return reduce_all(std::move(gathered));
Contributor


Please refer to my earlier comment on callbacks.

BufferResource* br,
std::shared_ptr<Statistics> statistics = Statistics::disabled(),
ReduceKernel reduce_kernel = {},
std::function<void(void)> finished_callback = nullptr
Contributor


I think the finished callback should provide the extracted results, similar to wait_and_extract.

Contributor


It doesn't in the allgather impl.

Comment on lines +150 to +152
[[nodiscard]] PackedData wait_and_extract(
std::chrono::milliseconds timeout = std::chrono::milliseconds{-1}
);
Contributor


Since we can insert only once, we should only be able to extract once also, IMO.
Consequently, wait_and_extract should only be enabled if a finished callback is not provided.
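i.e. roughly (sketch; the guard is the new part, the rest mirrors the existing implementation):

PackedData AllReduce::wait_and_extract(std::chrono::milliseconds timeout) {
    // Sketch: extraction only makes sense when results are not being delivered
    // through a finished callback.
    RAPIDSMPF_EXPECTS(
        !finished_callback_,
        "wait_and_extract cannot be combined with a finished callback",
        std::runtime_error
    );
    auto gathered =
        gatherer_.wait_and_extract(AllGather::Ordered::YES, std::move(timeout));
    return reduce_all(std::move(gathered));
}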

ReduceKernel reduce_kernel_; ///< Type-erased reduction kernel
std::function<void(void)> finished_callback_; ///< Optional finished callback

Rank nranks_; ///< Number of ranks in the communicator
Contributor


Nit, to avoid the static_casts in the cpp file:

Suggested change
Rank nranks_; ///< Number of ranks in the communicator
size_t nranks_; ///< Number of ranks in the communicator

// Block until the underlying allgather completes, then perform the reduction locally
// (exactly once).
auto gathered =
gatherer_.wait_and_extract(AllGather::Ordered::YES, std::move(timeout));
Contributor


Do we need the Ordered results here? If the reduce kernel depends on the order, maybe we should pass the rank of the incoming data as an arg?
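For example, a rank-aware variant of the kernel signature might look like (sketch):

// Sketch: the kernel also receives the rank that produced `incoming`, so an
// order-sensitive reduction no longer needs AllGather::Ordered::YES.
using ReduceKernel =
    std::function<void(PackedData& accum, PackedData&& incoming, Rank incoming_rank)>;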

Comment on lines +55 to +56
template <typename T, ReduceOp Op>
ReduceKernel make_reduce_kernel_impl() {
Contributor


Let's change the signature to the following, and move the impl to the template specializations. This seems a bit out of place, and the if-else ladders are unnecessary.

Suggested change
template <typename T, ReduceOp Op>
ReduceKernel make_reduce_kernel_impl() {
template <typename T, typename Op>
ReduceKernel make_reduce_kernel_impl(Op reduce_op) {
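i.e. roughly (sketch; `elementwise_device_reduce` is a hypothetical helper standing in for the pointer/count/stream plumbing already in the existing device kernel):

// Parameterize on the binary functor so the per-op if-else dispatch goes away.
template <typename T, typename Op>
ReduceKernel make_reduce_kernel_impl(Op reduce_op) {
    return [reduce_op](PackedData& accum, PackedData&& incoming) {
        // Hypothetical helper: extracts device pointers, element count, and stream,
        // then runs thrust::transform(acc, acc + count, in, acc, reduce_op) on it.
        elementwise_device_reduce<T>(accum, std::move(incoming), reduce_op);
    };
}

// Callers then pass the functor explicitly, e.g.:
//   make_reduce_kernel_impl<float>(thrust::plus<float>{});
//   make_reduce_kernel_impl<std::int32_t>(thrust::maximum<std::int32_t>{});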

Contributor


Also, we might want to add an array of types we support and add a type-visitor?
