Skip to content

Conversation

@nirandaperera
Copy link
Contributor

@nirandaperera nirandaperera commented Nov 5, 2025

This PR adds Fanout Node.

/**
 * @brief Fanout policy controlling how messages are propagated.
 */
enum class FanoutPolicy : uint8_t {
    /**
     * @brief Process messages as they arrive and immediately forward them.
     *
     * Messages are forwarded as soon as they are received from the input channel.
     * The next message is not processed until all output channels have completed
     * sending the current one, ensuring backpressure and synchronized flow.
     */
    BOUNDED,

    /**
     * @brief Forward messages without enforcing backpressure.
     *
     * In this mode, messages may be accumulated internally before being
     * broadcast, or they may be forwarded immediately depending on the
     * implementation and downstream consumption rate.
     *
     * This mode disables coordinated backpressure between outputs, allowing
     * consumers to process at independent rates, but can lead to unbounded
     * buffering and increased memory usage.
     *
     * @note Consumers might not receive any messages until *all* upstream
     * messages have been sent, depending on the implementation and buffering
     * strategy.
     */
    UNBOUNDED,
};

/**
 * @brief Broadcast messages from one input channel to multiple output channels.
 *
 * The node continuously receives messages from the input channel and forwards
 * them to all output channels according to the selected fanout policy, see
 * ::FanoutPolicy.
 *
 * Each output channel receives a shallow copy of the same message; no payload
 * data is duplicated. All copies share the same underlying payload, ensuring
 * zero-copy broadcast semantics.
 *
 * @param ctx The node context to use.
 * @param ch_in Input channel from which messages are received.
 * @param chs_out Output channels to which messages are broadcast.
 * @param policy The fanout strategy to use (see ::FanoutPolicy).
 *
 * @return Streaming node representing the fanout operation.
 *
 * @throws std::invalid_argument If an unknown fanout policy is specified.
 *
 * @note Since messages are shallow-copied, releasing a payload (`release<T>()`)
 * is only valid on messages that hold exclusive ownership of the payload.
 */
Node fanout(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::vector<std::shared_ptr<Channel>> chs_out,
    FanoutPolicy policy
);

Depends on #648

Closes #560

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
@copy-pr-bot
Copy link

copy-pr-bot bot commented Nov 5, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
@nirandaperera nirandaperera marked this pull request as ready for review November 7, 2025 20:13
@nirandaperera nirandaperera requested review from a team as code owners November 7, 2025 20:13
@nirandaperera nirandaperera added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Nov 7, 2025
@nirandaperera nirandaperera requested a review from a team as a code owner November 7, 2025 22:29
Signed-off-by: niranda perera <[email protected]>
Co-authored-by: Mads R. B. Kristensen <[email protected]>
Copy link
Contributor

@wence- wence- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a much simpler implementation lurking here.

Comment on lines 144 to 146
RAPIDSMPF_EXPECTS(
co_await ch_out->send(msg.copy(res)), "failed to send message"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be an error I think. Consider the case where the consumer has "seen enough". It wants to shut down the input channel to signal to the producer "I don't need any more inputs". The producer task should then exit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh okay. That's a good point. I didnt think about it. This could make purging messages a little complicated. But let me try this out

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do it in a followup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! I added this capability now.

Comment on lines 15 to 41
class FanoutPolicy(IntEnum):
"""
Fanout policy controlling how messages are propagated.
Attributes
----------
BOUNDED : int
Process messages as they arrive and immediately forward them.
Messages are forwarded as soon as they are received from the input channel.
The next message is not processed until all output channels have completed
sending the current one, ensuring backpressure and synchronized flow.
UNBOUNDED : int
Forward messages without enforcing backpressure.
In this mode, messages may be accumulated internally before being
broadcast, or they may be forwarded immediately depending on the
implementation and downstream consumption rate.
This mode disables coordinated backpressure between outputs, allowing
consumers to process at independent rates, but can lead to unbounded
buffering and increased memory usage.
Note: Consumers might not receive any messages until *all* upstream
messages have been sent, depending on the implementation and buffering
strategy.
"""
BOUNDED = <uint8_t>cpp_FanoutPolicy.BOUNDED
UNBOUNDED = <uint8_t>cpp_FanoutPolicy.UNBOUNDED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just reimport the C++ enum.

from rapidsmpf.streaming.core.fanout import FanoutPolicy

I think.

Comment on lines 92 to 94
# Validate policy
if not isinstance(policy, (FanoutPolicy, int)):
raise TypeError(f"policy must be a FanoutPolicy enum value, got {type(policy)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we only accept FanoutPolicy as the type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that, and cython was complaining that FanoutPolicy enum is not a type identifier

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to cimport it as well.

owner.append(ch_out)
_chs_out.push_back((<Channel>ch_out)._handle)

cdef cpp_FanoutPolicy _policy = <cpp_FanoutPolicy>(<int>policy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you8 use the re-exported cdef enum this is unnecessary. Check some of the pylibcudf cython code to see how it's done there.

Comment on lines 51 to 56
for (auto& ch_out : chs_out) {
// do a reservation for each copy, so that it will fallback to host memory if
// needed
// TODO: change this
auto res = ctx->br()->reserve_or_fail(msg.copy_cost(), try_memory_types(msg)[0]);
tasks.push_back(ch_out->send(msg.copy(res)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We own (or should own) the message, so one of these copies is redundant (the "last" output channel can just take ownership of the input message).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Let me move the msg to the last channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wence- I've addressed this now

// intentionally not locking the mtx here, because we only need to know a
// lower-bound on the last completed idx (ch_next_idx values are monotonically
// increasing)
size_t last_completed_idx = std::ranges::min(state.ch_next_idx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is UB, because another thread (one of the consumers) might be updating an entry in state.ch_next_idx simultaneously.

Copy link
Contributor Author

@nirandaperera nirandaperera Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but its strictly increasing. And we only need an approximate value here. So, a current running min may not be the exact min in std::ranges::min, but it will be strictly less than or equal. Consequence would be, not cleaning up all the finished messages. But I felt it was a better trade-off than trying to relock the mutex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Thinking about this again, maybe I can do a ranges::min_max during request_data.wait, and purge until the min value. That would eliminate this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed this now

Comment on lines 221 to 228
// start send tasks for each output channel
coro::task_container<coro::thread_pool> tasks(ctx->executor());
for (size_t i = 0; i < chs_out.size(); i++) {
RAPIDSMPF_EXPECTS(
tasks.start(unbounded_fo_send_task(*ctx, i, chs_out[i], state)),
"failed to start send task"
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need a task container here, or can in "process inputs" loop be a task as well, and we instead do:

std:vector<...> tasks = {process_inputs(), unbounded_fo_send_task(...), ...};
coro_results(co_await coro::when_all(std::move(tasks)));

With appropriate schedule of the tasks we're running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 235 to 237
return std::ranges::any_of(state.ch_next_idx, [&](size_t next_idx) {
return state.recv_messages.size() == next_idx;
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this more complicated than it needs to be?

I think this is "just" a boolean flag for "at least one of the consumers wants more data".

Could we have such a flag in the state struct that is updated when a consumer is ready and the flipped back to false here?

I think then the ch_next_idx of each consumer doesn't need to be part of the state of the struct, it's a local piece of information rather than a signalling mechanism.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a good point. The only issue would be, cleaning up finished messages. Currently I use the ch_next_idx to find the boundary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cost of this test is O(n_out_channels) which would be fairly small IMO.

* tasks are in an invalid state).
*/
auto wait_for_data_request(
UnboundedFanoutState& state, size_t& last_completed_idx, size_t& latest_processed_idx
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would strongly suggest to return a std::tuple instead of the output parameters

}

/**
* @brief State for the unbounded fanout.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good with a description of the algorithm here.

*/
Node unbounded_fo_send_task(
Context& ctx,
size_t idx,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to something like self

* @param state The state of the unbounded fanout.
* @return A coroutine representing the task.
*/
Node unbounded_fo_process_input_task(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to unbounded_recv_task ?

// here, because we only need to know a lower-bound on the last completed idx
// (ch_next_idx values are monotonically increasing)
while (purge_idx + 1 < last_completed_idx) {
state->recv_messages[purge_idx].reset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to purge here, could the last unbounded_fo_send_task not move the message out of recv_messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldve done that. But the difficulty there is, for a send task to know for sure that its the last one to consume a particular message, it should observe/ wait on ch_next_idx vector under the mutex. Currently send tasks are only dependent on the cached messages deque, and each task only accesses its own ch_nex_idx[i] value.

Copy link
Contributor Author

@nirandaperera nirandaperera Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a send task fails to determine this concretely, we can have cases where some completed messages not cleaned up until the fanout node completes.

coro::mutex mtx;
// notify send tasks to copy & send messages
coro::condition_variable data_ready;
// notify this task to receive more data from the input channel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// notify this task to receive more data from the input channel
// notify recv task to receive more data from the input channel

Comment on lines 214 to 215
// now next_idx can be updated to end_idx, and if !input_done, we need to request
// parent task for more data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// now next_idx can be updated to end_idx, and if !input_done, we need to request
// parent task for more data
// now next_idx can be updated to end_idx, and if !input_done, we need to request
// the recv task for more data

Comment on lines 414 to 418
// if there is only one output channel, both bounded and unbounded implementations are
// semantically equivalent. So we can use the bounded fanout implementation.
if (chs_out.size() == 1) {
return bounded_fanout(std::move(ctx), std::move(ch_in), std::move(chs_out));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should error if the output size is 1. In that case the user should just have directly used the input channel as the output channel on the other side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am enforcing chs_out.size() > 1 now.

@wence-
Copy link
Contributor

wence- commented Nov 19, 2025

I had another look at this, and think, and I am back to my position that using a queue instead of this indices + mutex + condition_variable approach would lead to code that is easier to understand/maintain.

Here's an implementation of that approach, it passes the test suite in this PR:

struct Fanner {
    std::shared_ptr<Channel> ch_in_;
    std::vector<std::shared_ptr<Channel>> chs_out_;
    std::vector<coro::queue<std::size_t>> queues_{};
    std::deque<std::pair<std::atomic<std::size_t>, Message>> messages_{};
    coro::semaphore<1> semaphore_{1};

    ~Fanner() {
        std::vector<coro::task<void>> tasks;
        tasks.push_back(ch_in_->shutdown());
        tasks.push_back(semaphore_.shutdown());
        for (auto& q : queues_) {
            tasks.push_back(q.shutdown());
        }
        for (auto ch_out : chs_out_) {
            tasks.push_back(ch_out->shutdown());
        }
        coro::sync_wait(coro::when_all(std::move(tasks)));
    }

    Fanner(std::shared_ptr<Channel> ch_in, std::vector<std::shared_ptr<Channel>> chs_out)
        : ch_in_(std::move(ch_in)),
          chs_out_(std::move(chs_out)),
          queues_{chs_out_.size()} {}

    Node sender(std::shared_ptr<Context> ctx) {
        auto send = [](coro::queue<std::size_t>& q, std::size_t idx) -> coro::task<bool> {
            auto r = co_await q.push(idx);
            co_return r == coro::queue_produce_result::stopped;
        };
        while (true) {
            // Suspend so that we don't read from the input until at least one output
            // wants more input.
            co_await semaphore_.acquire();
            auto msg = co_await ch_in_->receive();
            if (msg.empty()) {
                break;
            }
            messages_.emplace_back(chs_out_.size(), std::move(msg));
            std::vector<coro::task<bool>> sends;
            for (auto& q : queues_) {
                if (!q.is_shutdown()) {
                    sends.push_back(send(q, messages_.size() - 1));
                }
            }
            auto results = coro_results(co_await coro::when_all(std::move(sends)));
            if (std::ranges::all_of(results, std::identity{})) {
                break;
            }
        }
        std::vector<coro::task<void>> shutdown;
        shutdown.push_back(ch_in_->drain(ctx->executor()));
        for (auto& q : queues_) {
            shutdown.push_back(q.shutdown_drain(ctx->executor()));
        }
        coro_results(co_await coro::when_all(std::move(shutdown)));
    }

    Node receiver(std::shared_ptr<Context> ctx, std::size_t i) {
        auto& q = queues_[i];
        auto ch_out = chs_out_[i];
        while (true) {
            auto idx = co_await q.pop();
            if (!idx.has_value()) {
                break;
            }
            auto& [refcount, msg_ref] = messages_[*idx];
            Message msg;
            if (refcount.load(std::memory_order_acquire) == 1) {
                msg = std::move(msg_ref);
            } else {
                auto res = ctx->br()->reserve_or_fail(
                    msg_ref.copy_cost(), try_memory_types(msg_ref)
                );
                msg = msg_ref.copy(res);
                refcount.fetch_sub(1, std::memory_order_acq_rel);
            }
            auto sent = co_await ch_out->send(std::move(msg));
            if (!sent) {
                break;
            }
            // We're ready for more input
            co_await semaphore_.release();
        }

        auto results =
            co_await coro::when_all(q.shutdown(), ch_out->drain(ctx->executor()));
        // After shutting down our queue and draining the output channel, release again,
        // so that if we shutdown early and the sender is sitting at acquire they are
        // woken up. Can't release at the same time as the shutdown because we would wake
        // the sender and they might get all the way round again to the next acquire
        // before we shutdown our queue and then they will block forever waiting for a
        // release that will never arrive.
        co_await semaphore_.release();
        // Raise any exceptions after releasing the sender.
        coro_results(std::move(results));
    }
};

Node unbounded_fanout(
    std::shared_ptr<Context> ctx,
    std::shared_ptr<Channel> ch_in,
    std::vector<std::shared_ptr<Channel>> chs_out
) {
    ShutdownAtExit c{ch_in};
    ShutdownAtExit cs{chs_out};
    auto fanner = Fanner(ch_in, chs_out);
    std::vector<Node> tasks;
    tasks.push_back(fanner.sender(ctx));
    for (std::size_t i = 0; i < chs_out.size(); i++) {
        tasks.push_back(fanner.receiver(ctx, i));
    }
    coro_results(co_await coro::when_all(std::move(tasks)));
}

Although this maintains slightly more state per output channel (a queue rather than a pair of integers) to my mind the control flow is much easier to follow. We also don't have to do complicated dances for early shutdown of an output channel.

WDYT?

@wence-
Copy link
Contributor

wence- commented Nov 19, 2025

If we don't want to consume the inputs until at least one output channel needs another message, we can use a semaphore.

@madsbk
Copy link
Member

madsbk commented Nov 19, 2025

If we don't want to consume the inputs until at least one output channel needs another message, we can use a semaphore.

This is important

@wence-
Copy link
Contributor

wence- commented Nov 19, 2025

If we don't want to consume the inputs until at least one output channel needs another message, we can use a semaphore.

This is important

Updated my suggestion to do that.

@nirandaperera
Copy link
Contributor Author

If we don't want to consume the inputs until at least one output channel needs another message, we can use a semaphore.

This is important

Updated my suggestion to do that.

@wence- I think a semaphore over calculates the number of messages to pull even though its max is set to 1. For an example, say one output channel is being consumed a lot and then suddenly paused (input not finished). Then, consuming from the catching up channels would still trigger a pull from the input channel. We want the input channel pulling to pause until all output channels have caught up, or the fast channel resumes.
(I thought about using semaphores, but didnt use it for this reason. Couldnt think of a way to get around this)

@nirandaperera
Copy link
Contributor Author

@wence- Thanks for the sketch. I like the class structure, so, I copied that to this PR. 😬

However, I am still not sold on the coro queues. Still feel like its an overkill. As I said earlier, even though its easy to follow, the queues contain redundant information. We only really care about the head and tail of the queue. Say, if we had N input messages, and n_out output channels, a downstream node that consumes all out channels in channel order.
Then, by the time channel 1 is fully consumed, we have unnecessarily pushed O(N*n_out) amount of data into the n_out queues. But the only important info there is O(n_out). To push and pull from these queues we need to go through n_out coro::mutexes.

I added an explanation to the algorithm. I hope it clears things up. IMHO, I dont think this is any more complicated than our existing code 😇

Signed-off-by: niranda perera <[email protected]>
Comment on lines 109 to 134
* The implementation follows a pull-based model, where the send tasks request data from
* the recv task. There is one recv task that receives messages from the input channel,
* and there are N send tasks that send messages to the output channels.
*
* Main task operation:
* - There is a shared deque of cached messages, and a vector that indicates the next
* index of the message to be sent to each output channel.
* - Recv task awaits until the number of cached messages is equal to the latest sent
* message index by any of the send tasks. This notifies the recv task to pull a message
* from the input channel, cache it, and notify the send tasks about the new messages.
* recv task continues this process until the input channel is fully consumed.
* - Each send task awaits until there are more cached messages to send. Once notified, it
* determines the current end of the cached messages, and sends messages in the range
* [next_idx, end_idx). Once these messages have been sent, it updates the next index to
* end_idx and notifies the recv task.
*
* Additional considerations:
* - In the recv task loop, it also identifies the last completed message index by all
* send tasks. Message upto this index are no longer needed, and are purged from the
* cached messages.
* - When a send task fails to send a message, this means the channel may have been
* prematurely shut down. In this case, it sets its index to InvalidIdx. Recv task will
* filter out channels with InvalidIdx.
* - There two RAII helpers to ensure that the notification mechanisms are properly
* cleaned up when the unbounded fanout state goes out of scope/ encounters an error.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do some work on this explanation. It intermingles low-level implementation with algorithmic stuff. The latter is more important.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the description now

Comment on lines 206 to 208
// now we can copy & send messages in indices [next_idx, curr_recv_msg_sz)
// it is guaranteed that message purging will be done only on indices less
// than next_idx, so we can safely send messages without locking the mtx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunately not true by the standard, since deque::push_back modifies the container and deque::operator[] accesses the container it is not safe to concurrently access the deque and push into it.

If you grab references under a lock and then release it, they are guaranteed to not be invalidated by a push_back that runs without the lock while you hold the references, but you're not doing that here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Following are the invalidation notes.

Invalidation notes

  • When inserting at either end of the deque, references are not invalidated by insert and emplace.
    push_front, push_back, emplace_front and emplace_back do not invalidate any references to elements of the deque.
  • When erasing at either end of the deque, references to non-erased elements are not invalidated by erase, pop_front and pop_back.
  • A call to resize with a smaller size does not invalidate any references to non-erased elements.
  • A call to resize with a bigger size does not invalidate any references to elements of the deque.

It seems to me that, this reference auto const& msg = recv_messages[i] is not invalidated. Am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send_task is guaranteed that the recv_task would not touch [next_idx, curr_recv_msg_sz) indices.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you obtain the reference with the lock and then release the lock it cannot be invalidated by push operations.

The race is in operator[] since that accesses the container and could race with the modification

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If two threads are not modifying the same index of the deque, this is not an issue, isnt it? Currently the idea is multiple send tasks may read from an index i, but until all of send tasks have read ith index, it will not be updated. The last sent index vector is always updated under a mutex. So, I think the recv task and send tasks should not step on each others' toes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you have to obtain the reference with mutual exclusion. Once you have it, it will not be invalidated, and you can modify the element it references just fine.

Note that operator[] is equivalent to *(begin() + n) for some n, which shows that you might have iterator invalidation with a concurrent push_back even if you're not accessing that new element

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just hold a vector<reference_wrapper> that you fill with the lock when woken and use that to get the messages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the ref wrapper vector here 0fef621

I still have some questions regarding this. I'm running a simple tsan example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wence- You were right! 😇
tsan complaints about a data rance in the reference access in reader_thread_unsafe and emplace_back in writer_thread.

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::deque<int> d = {42};
std::mutex m;
std::atomic<bool> running = true;

void reader_thread_safe()
{
    std::vector<std::reference_wrapper<int>> ref_ptr{};

    {
        std::lock_guard<std::mutex> lock(m);
        ref_ptr.push_back(d[0]);
    }

    while (running)
    {
        int value = ref_ptr[0].get();
        std::printf("Reader saw: %d\n", value);
        std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

void reader_thread_unsafe()
{
    std::vector<std::reference_wrapper<int>> ref_ptr{};
    ref_ptr.push_back(d[0]);

    while (running)
    {
        int value = ref_ptr[0].get();
        std::printf("Reader saw: %d\n", value);
        std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
}

void writer_thread()
{
    for (int i = 0; i < 100000; ++i)
    {
        std::lock_guard<std::mutex> lock(m);
        d.emplace_back(i); // modifies deque structure
        std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
    running = false;
}

int main()
{
    std::thread t2(writer_thread);
    std::thread t1(reader_thread_unsafe);

    t2.join();
    t1.join();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the ref_ptr.push_back(d[0]); must be done with mutual exclusion with the d.emplace_back(i). But ref_ptr.get() is safe without it.

@nirandaperera
Copy link
Contributor Author

@wence- @madsbk I think I addressed all the comments now. Could you please take another look?

chs_out : list[Channel]
Output channels to which messages are broadcast.
policy : FanoutPolicy
The fanout strategy to use (see FanoutPolicy).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really documented in python I think. Can we have a sentence here describing the two options?

} else {
// request more data from the input channel
lock.unlock();
co_await request_data.notify_one();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reminded what was bugging me here. If there are multiple tasks running notify_one (or notify_all) on the same condvar, we can have lost wakeups unfortunately. I have tried to fix this in libcoro here: jbaldwin/libcoro#416

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! 🙁 I think your rationale in the jbaldwin/libcoro#416 makes sense. Should hold this PR off until jbaldwin/libcoro#416 merges? I did a bunch of repeat test runs, but didnt encounter a hang yet though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reason for that is that these tasks are typically not all firing at once. I think we are probably OK to merge now, but keep that in mind in case we see issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

improvement Improves an existing functionality non-breaking Introduces a non-breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEA] Support channel broadcast (streaming "fanout" node).

3 participants