Skip to content

Commit 0fef621

Browse files
committed
stashing messages using ref wrappers
Signed-off-by: niranda perera <[email protected]>
1 parent c7740c8 commit 0fef621

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

cpp/src/streaming/core/fanout.cpp

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ Node send_to_channels(
4848
) {
4949
RAPIDSMPF_EXPECTS(!chs_out.empty(), "output channels cannot be empty");
5050

51-
auto async_copy_and_send =
52-
[](Context& ctx_, Message const& msg_, size_t msg_sz_, Channel& ch_
53-
) -> coro::task<bool> {
51+
auto async_copy_and_send = [](Context& ctx_,
52+
Message const& msg_,
53+
size_t msg_sz_,
54+
Channel& ch_) -> coro::task<bool> {
5455
co_await ctx_.executor()->schedule();
5556
auto res = ctx_.br()->reserve_or_fail(msg_sz_, try_memory_types(msg_));
5657
co_return co_await ch_.send(msg_.copy(res));
@@ -204,6 +205,7 @@ struct UnboundedFanout {
204205
co_await ctx.executor()->schedule();
205206

206207
size_t n_available_messages = 0; // number of messages available to send
208+
std::vector<std::reference_wrapper<Message>> messages_to_send;
207209
while (true) {
208210
{
209211
auto lock = co_await mtx.scoped_lock();
@@ -217,25 +219,28 @@ struct UnboundedFanout {
217219
// no more messages will be received, and all messages have been sent
218220
break;
219221
}
222+
// stash msg references under the lock
223+
messages_to_send.reserve(n_available_messages - self_next_idx);
224+
for (size_t i = self_next_idx; i < n_available_messages; i++) {
225+
messages_to_send.emplace_back(recv_messages[i]);
226+
}
220227
}
221228

222-
// now we can copy & send messages in indices [next_idx, curr_recv_msg_sz)
223-
// it is guaranteed that message purging will be done only on indices less
224-
// than next_idx, so we can safely send messages without locking the mtx
225-
for (size_t i = self_next_idx; i < n_available_messages; i++) {
226-
auto const& msg = recv_messages[i];
227-
RAPIDSMPF_EXPECTS(!msg.empty(), "message cannot be empty");
229+
for (auto const& msg : messages_to_send) {
230+
RAPIDSMPF_EXPECTS(!msg.get().empty(), "message cannot be empty");
228231

229-
auto res =
230-
ctx.br()->reserve_or_fail(msg.copy_cost(), try_memory_types(msg));
231-
if (!co_await ch_out->send(msg.copy(res))) {
232+
auto res = ctx.br()->reserve_or_fail(
233+
msg.get().copy_cost(), try_memory_types(msg.get())
234+
);
235+
if (!co_await ch_out->send(msg.get().copy(res))) {
232236
// Failed to send message. Could be that the channel is shut down.
233237
// So we need to abort the send task, and notify the process input
234238
// task
235239
co_await set_ch_idx_invalid.set_channel_idx_invalid();
236240
co_return;
237241
}
238242
}
243+
messages_to_send.clear();
239244

240245
// now next_idx can be updated to end_idx, and if !no_more_input, we need to
241246
// request the recv task for more data

0 commit comments

Comments
 (0)