
Async Ranges #449

Draft

PyXiion wants to merge 22 commits into jbaldwin:main from PyXiion:ranges

Conversation


@PyXiion PyXiion commented Mar 1, 2026

While working with std::ranges, I got this idea: libcoro already has nice coroutine primitives and a networking API, but when it comes to stream-style processing, everything still ends up as manual while (co_await ...) loops. That works, but it's not composable, and it doesn't scale nicely once you add filtering, logging, etc.

So I experimented with a small async range layer built on top of existing coroutine primitives.

Idea

An async stream is just a type that provides:

// std::nullopt means the end of stream
auto next() -> awaitable<std::optional<value_type>>;

How it looks:

auto result = co_await (
    client
    | coro::ranges::with_buffer(1024)
    | coro::ranges::join() // flatten chunks of data into a stream of bytes
    | coro::ranges::take_until([](auto f) -> bool { return static_cast<char>(f) == '0'; }) // take bytes until '0'
    | coro::ranges::to<std::vector<std::byte>> // and collect into a container
);

// Executing a bunch of tasks
auto task_stream = inputs | coro::ranges::transform(make_task);

auto result_task = task_stream | coro::ranges::await | coro::ranges::to<std::vector<int>>;

Everything remains fully lazy and is pleasant to use with coroutines.

Everything gets completely inlined with no overhead (verified on Clang).

My PR is a very early draft; I just wanted to show the idea.

I don't know yet whether this direction makes sense, so I'd like feedback on the overall design before taking it further. I haven't focused on buffering, error handling, or cancellation semantics yet.

How it could look (when finished)

// Proxying from one client to another
auto redirect_task = client1
    | coro::ranges::with_buffer(64_KiB)
    | coro::ranges::take(1024 * 1024) // Limit to 1 MiB
    | coro::ranges::inspect([&](auto& b) {
        bytes_transferred += b.size(); // count transferred bytes
    })
    | coro::ranges::to(client2);


auto task = client
    | coro::ranges::with_buffer() 
    | coro::ranges::inspect([](auto byte) { std::cout << (int)byte; }) // side effect
    | coro::ranges::to(...);
    
// Ratelimiting
auto throttled_task = co_await (
    client
    | coro::ranges::with_buffer(1024)
    | coro::ranges::throttle(100ms)   // one chunk per 100 ms
    | coro::ranges::to(file)
);

// Partial reads
auto buffered_stream = client | coro::ranges::with_buffer(512) | coro::ranges::join();

auto header = buffered_stream
    | as_chars
    | coro::ranges::take_until("\r\n\r\n")
    | coro::ranges::split("\r\n")
    | coro::ranges::transform(parse_header)
    | coro::ranges::to<std::vector<std::string>>();

auto body = buffered_stream
    | coro::ranges::to<std::vector<std::byte>>();

// Calculating a simple XOR checksum on the fly
uint8_t checksum = 0;
auto processing_task = client
    | coro::ranges::with_buffer()
    | coro::ranges::inspect([&](std::byte b) {
          checksum ^= static_cast<uint8_t>(b);
      })
    | coro::ranges::to<std::vector>();
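The inspect-based fold above is equivalent to this plain loop over the received bytes (sample data here, not a live socket):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// XOR of all bytes in the buffer, matching the inspect() side effect above.
std::uint8_t xor_checksum(const std::vector<std::byte>& bytes)
{
    std::uint8_t checksum = 0;
    for (std::byte b : bytes)
        checksum ^= static_cast<std::uint8_t>(b);
    return checksum;
}
```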
    
// Imagine reading a stream of URLs from a socket and fetching them concurrently
auto fetch_results = co_await (
    url_client
    | coro::ranges::with_buffer()
    | coro::ranges::split_by("\n")
    // Launch up to 4 concurrent coroutines to fetch data
    | coro::ranges::flat_map_concurrent(4, [](std::string_view url) -> coro::task<Result> {
        auto response = co_await http::get(url);
        co_return parse(response);
    })
    | coro::ranges::to<std::vector>()
);

// Download a large payload via HTTP and stream it directly to a file
auto download_task = co_await (
    http_client
    | coro::ranges::with_buffer()
    | coro::ranges::join() // byte by byte
    // Skip data until we find the end of the HTTP headers (\r\n\r\n)
    | coro::ranges::drop_until(is_start_of_body) 
    | coro::ranges::chunk(4096) 
    | coro::ranges::to(coro::ranges::write_to_file("cat.jpeg"))
);
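A hedged, synchronous sketch of what the `chunk(n)` stage would do to the flattened byte stream — regroup it into blocks of at most `n` bytes, with the last block possibly shorter (names illustrative, not the PR's API):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Regroup a flat byte buffer into fixed-size blocks of at most n bytes.
std::vector<std::vector<std::byte>> chunk(const std::vector<std::byte>& bytes, std::size_t n)
{
    std::vector<std::vector<std::byte>> out;
    for (std::size_t i = 0; i < bytes.size(); i += n)
    {
        auto end = std::min(i + n, bytes.size());
        out.emplace_back(bytes.begin() + i, bytes.begin() + end);
    }
    return out;
}
```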

// Simple teeing
auto stream = client | coro::ranges::with_buffer();

auto [logger, processor] = stream | coro::ranges::tee();

auto log_task = logger
    | coro::ranges::inspect([](auto b) { log_byte(b); })
    | coro::ranges::drain();

auto process_task = processor
    | coro::ranges::chunk(1024)
    | coro::ranges::to(coro::ranges::write_to(file));

co_await when_all(log_task, process_task);

// How to handle errors?
auto result = co_await (
    client
    | coro::ranges::with_buffer(1024)
    | coro::ranges::on_error([](auto const& err) {
          std::cerr << "stream error: " << err.message() << "\n";
      })
    | coro::ranges::to<std::vector>()
);

// How could cancellation work?
cancellation_source cancel;

auto task = co_await (
    client
    | coro::ranges::with_buffer(4096)
    | coro::ranges::with_cancellation(cancel.token())
    | coro::ranges::to(coro::ranges::write_to(file))
);

// somewhere else
cancel.request_cancellation();
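A hedged sketch of the cancellation primitive used in the snippet above; libcoro has no such type today, so every name here is illustrative. A shared atomic flag is the simplest possible model (the accessor is spelled `get_token()` here, since a member function cannot share its name with the nested `token` type):

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Minimal cancellation model: a source owns a shared flag, tokens observe it.
class cancellation_source
{
public:
    class token
    {
    public:
        explicit token(std::shared_ptr<std::atomic<bool>> flag) : m_flag(std::move(flag)) {}
        bool cancellation_requested() const { return m_flag->load(std::memory_order_acquire); }

    private:
        std::shared_ptr<std::atomic<bool>> m_flag;
    };

    auto get_token() const -> token { return token{m_flag}; }
    void request_cancellation() { m_flag->store(true, std::memory_order_release); }

private:
    std::shared_ptr<std::atomic<bool>> m_flag = std::make_shared<std::atomic<bool>>(false);
};
```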

@PyXiion changed the title from "Async Ranges API for networking" to "Async Ranges" on Mar 1, 2026
namespace coro::ranges
{
template<concepts::async_streamable previous_stream_t, typename value_t>
class await_view_base

Used a base class because coro::task needs different treatment.

template<concepts::async_streamable Rng>
auto operator()(Rng rng) const -> coro::task<void>
{
while (co_await rng.next()) {}

Drain basically just extracts values until they run out


// Should be safe, because socket_stream gets moved
// into further pipe objects
auto next() -> coro::task<std::optional<std::span<const std::byte>>>

Maybe change to std::optional<std::expected<std::span<const std::byte>, io_status>>? That looks too verbose, so maybe it's better to use std::expected everywhere.

else
{
return std::forward<Adaptor>(partial)(std::forward<Rng>(rng));
}

Not sure if this is the best way to handle references, but I don't know a better one.

}

template<typename Adaptor, typename... Args>
struct _partial : public concepts::_async_adaptor

I took this pattern from stdlib code: it stores the arguments and then just prepends the async stream as the first argument.
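An illustrative reduction of that pattern (a plain std::vector stands in for the async stream; all names are made up for the sketch): the partial object stores the adaptor's arguments, and operator| prepends the piped range as the first argument.

```cpp
#include <cassert>
#include <tuple>
#include <utility>
#include <vector>

// Stores the adaptor's trailing arguments; operator| supplies the range.
template<typename Fn, typename... Args>
struct partial
{
    Fn                  fn;
    std::tuple<Args...> args;

    template<typename Rng>
    friend auto operator|(Rng&& rng, partial&& p)
    {
        return std::apply(
            [&](auto&&... a) { return p.fn(std::forward<Rng>(rng), std::forward<decltype(a)>(a)...); },
            std::move(p.args));
    }
};

// Toy "adaptor": adds n to every element of a vector.
inline auto add_n(int n)
{
    auto fn = [](std::vector<int> v, int k)
    {
        for (auto& x : v)
            x += k;
        return v;
    };
    return partial<decltype(fn), int>{fn, {n}};
}
```

Usage: `std::vector<int>{1, 2} | add_n(10)` yields `{11, 12}`.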

constexpr auto unwrap_return_value(std::reference_wrapper<T> value) noexcept -> T&
{
return value.get();
}

Because std::optional doesn't support references, this is a workaround. But maybe forbidding references would be better.
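The workaround in action: a stream of T& carries std::reference_wrapper<T> inside std::optional, and the unwrap helper restores the reference at the edges so user code still sees T&.

```cpp
#include <cassert>
#include <functional>
#include <optional>

// Unwraps a reference_wrapper back into a plain reference.
template<typename T>
constexpr auto unwrap_return_value(std::reference_wrapper<T> value) noexcept -> T&
{
    return value.get();
}
```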


PyXiion commented Mar 7, 2026

I recently learned about sender/receiver (std::execution) in C++26; it's basically the same thing I'm doing here.

I think I should make the API compatible with it.
