Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions docs/api.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
<!-- omit in toc -->
# API Documentation

> [!WARNING]
Expand All @@ -17,10 +18,10 @@
- [Member functions](#member-functions)
- [(constructor)](#constructor)
- [add](#add)
- [listen (`asio`)](#listen-asio)
- [listen](#listen)
- [run](#run)
- [Example (`asio`)](#example-asio)
- [Example (`libuv`)](#example-uv)
- [Example (`libuv`)](#example-libuv)
- [`grpcxx::context`](#grpcxxcontext)
- [Member types](#member-types)
- [Member functions](#member-functions-1)
Expand Down Expand Up @@ -153,33 +154,45 @@ Add a gRPC service implementation to the server.
1. Add a gRPC service implemented by `s`. `s` must stay in scope while the server in running. If two
services are added with the same name, the second will be ignored.

#### listen (`asio`)
#### listen

|||
--------------------------------------- | ---
`ASIO_NS::awaitable<void> listen(std::string_view ip, int port);` | (1)
`ASIO_NS::awaitable<void> listen(std::string_view ip, int port);` | (1) (`asio`)
`uv_loop_t *listen(std::string_view ip, int port);` | (2) (`libuv`)
`uv_loop_t *listen(uv_os_sock_t &&sock);` | (3) (`libuv`)

Listen and prepare to serve incoming gRPC requests.

1. Await on a coroutine executor and listen on `ip` and `port` for incoming gRPC connections. The returned
_awaitable_ must be passed on to an `io_context` executor to serve requests.
2. Listen on `ip` and `port` for incoming gRPC connections and returns a pointer to the event loop.
The returned event loop can be used to run the loop externally or add external events to the loop.
The loop will be stopped when the server instance is destroyed. No request will be served until the
loop is run, either externally or by calling `run(std::stop_token token = {})`.
3. Listen on an existing socket handler `sock` for incoming gRPC connections and returns a pointer
to the event loop. The socket handler must be already bound to a network address and will be closed
when the server stops. The event loop behaviour is same as (2).

#### run

|||
------------------------------------------------- | ---
`void run(const std::string_view &ip, int port);` | (1) (`asio`)
`void run(std::string_view ip, int port, std::stop_token stop_token = {});` | (2) (`libuv`)
`void run(uv_os_sock_t sock, std::stop_token stop_token = {});` | (3) (`libuv`)
`void run(const std::string_view &ip, int port);` | (1) (`asio`)
`void run(std::string_view ip, int port, std::stop_token token = {});` | (2) (`libuv`)
`void run(uv_os_sock_t &&sock, std::stop_token token = {});` | (3) (`libuv`)
`void run(std::stop_token token = {});` | (4) (`libuv`)

Listen and serve incoming gRPC requests.

1. Start listening on `ip` and `port` for incoming gRPC connections and serve requests.
2. Same as (1), but accepting an optional stop token to asynchronously signal the server to exit.
3. Start listening on the provided tcp socket `sock`, which needs to be already bound to a network
address. This is useful when the socket needs to have some additional properties set (such as
keep-alive) and/or reused from the outside run context (such as is the case of the
[systemd socket activation protocol](https://www.freedesktop.org/software/systemd/man/latest/sd_listen_fds.html)).
3. Start listening on the socket handler `sock` for incoming gRPC connections and serve requests.
The socket handler must be already bound to a network address and will be closed when the server stops.
This is useful when the socket needs to have some additional properties set (such as keep-alive) and/or
reused outside the run context (e.g. when using [systemd socket activation protocol](https://www.freedesktop.org/software/systemd/man/latest/sd_listen_fds.html)).
4. Run the server until a stop is requested or there aren't any active handlers in the event loop.
This can be called after calling `listen(...)`.

> [!IMPORTANT]
> If used with Asio, this will create and run an `io_context` executor on the main thread.
Expand Down
33 changes: 15 additions & 18 deletions lib/grpcxx/uv/loop.cpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,31 @@
#include "loop.h"

namespace grpcxx::uv::detail {

loop_t::loop_t() : _managed{true}, _loop{new uv_loop_t{}} {
uv_loop_init(_loop);
namespace grpcxx {
namespace uv {
namespace detail {
loop::loop() {
uv_loop_init(&_loop);
}

loop_t::loop_t(uv_loop_t &loop) : _managed(false), _loop{&loop} {}

loop_t::~loop_t() noexcept {
if (!is_managed()) {
return;
loop::~loop() noexcept {
if (uv_loop_alive(&_loop)) {
uv_stop(&_loop);
}

// Ensures all remaining handles are cleaned up (but without any
// special close callback)
// Ideally all handles should've been closed by now, do a cleanup just in case
uv_walk(
_loop,
&_loop,
[](uv_handle_t *handle, void *) {
if (!uv_is_closing(handle)) {
uv_close(handle, nullptr);
}
},
nullptr);

while (uv_loop_close(_loop) == UV_EBUSY) {
uv_run(_loop, UV_RUN_ONCE);
while (uv_loop_close(&_loop) == UV_EBUSY) {
uv_run(&_loop, UV_RUN_ONCE);
}

delete _loop;
}

} // namespace grpcxx::uv::detail
} // namespace detail
} // namespace uv
} // namespace grpcxx
26 changes: 12 additions & 14 deletions lib/grpcxx/uv/loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@

#include <uv.h>

namespace grpcxx::uv::detail {

class loop_t {
namespace grpcxx {
namespace uv {
namespace detail {
class loop {
public:
loop_t();
explicit loop_t(uv_loop_t &uv_loop);
loop_t(const loop_t &) = delete;
loop_t(loop_t &&) = default;
~loop_t() noexcept;
loop(const loop &) = delete;
loop();

operator uv_loop_t *() noexcept { return _loop; }
~loop() noexcept;

bool is_managed() const { return _managed; }
operator uv_loop_t *() noexcept { return &_loop; }

private:
bool _managed;
uv_loop_t *_loop;
uv_loop_t _loop;
};

} // namespace grpcxx::uv::detail
} // namespace detail
} // namespace uv
} // namespace grpcxx
5 changes: 5 additions & 0 deletions lib/grpcxx/uv/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ scheduler::~scheduler() {
t.join();
}
}

auto *handle = reinterpret_cast<uv_handle_t *>(&_async);
if (uv_is_active(handle)) {
uv_close(handle, nullptr);
}
}

bool scheduler::enqueue(std::coroutine_handle<> h) noexcept {
Expand Down
115 changes: 61 additions & 54 deletions lib/grpcxx/uv/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,27 @@

namespace grpcxx {
namespace uv {
server::server(std::size_t n) noexcept : _loop{}, _scheduler{_loop, n} {
server::server(std::size_t n) noexcept : _scheduler(_loop, n) {
uv_tcp_init(_loop, &_handle);
_handle.data = this;
}

server::server(uv_loop_t &loop, std::size_t n) noexcept : _loop{loop}, _scheduler{_loop, n} {
uv_tcp_init(_loop, &_handle);
_handle.data = this;
server::~server() noexcept {
auto *handle = reinterpret_cast<uv_handle_t *>(&_handle);
if (!uv_is_closing(handle)) {
uv_close(handle, nullptr);
}
}

server::~server() noexcept {
if (uv_loop_alive(_loop)) {
uv_stop(_loop);
void server::bind(std::string_view ip, int port) {
sockaddr_storage addr;
if (uv_ip4_addr(ip.data(), port, reinterpret_cast<sockaddr_in *>(&addr)) != 0 &&
uv_ip6_addr(ip.data(), port, reinterpret_cast<sockaddr_in6 *>(&addr)) != 0) {
throw std::runtime_error(std::string(ip) + " is not a valid IPv4 or IPv6 address");
}

if (auto r = uv_tcp_bind(&_handle, reinterpret_cast<const sockaddr *>(&addr), 0); r != 0) {
throw std::runtime_error(std::string("Failed to bind to tcp address: ") + uv_strerror(r));
}
}

Expand Down Expand Up @@ -54,21 +62,29 @@ void server::conn_cb(uv_stream_t *stream, int status) {
s->conn(stream);
}

void server::prepare(std::string_view ip, int port) {
sockaddr_storage addr;
if (uv_ip4_addr(ip.data(), port, reinterpret_cast<sockaddr_in *>(&addr)) != 0 &&
uv_ip6_addr(ip.data(), port, reinterpret_cast<sockaddr_in6 *>(&addr)) != 0) {
throw std::runtime_error(std::string(ip) + " is not a valid IPv4 or IPv6 address");
void server::listen() {
if (auto r = uv_listen(reinterpret_cast<uv_stream_t *>(&_handle), TCP_LISTEN_BACKLOG, conn_cb);
r != 0) {
throw std::runtime_error(
std::string("Failed to listen for connections: ") + uv_strerror(r));
}
}

if (auto r = uv_tcp_bind(&_handle, reinterpret_cast<const sockaddr *>(&addr), 0); r != 0) {
throw std::runtime_error(std::string("Failed to bind to tcp address: ") + uv_strerror(r));
}
uv_loop_t *server::listen(uv_os_sock_t &&sock) {
open(std::move(sock));
listen();

start_listening();
return _loop;
}

void server::prepare(uv_os_sock_t sock) {
uv_loop_t *server::listen(std::string_view ip, int port) {
bind(std::move(ip), port);
listen();

return _loop;
}

void server::open(uv_os_sock_t &&sock) {
if (auto r = uv_tcp_open(&_handle, sock); r != 0) {
throw std::runtime_error(
std::string("Failed to open socket as a tcp handle: ") + uv_strerror(r));
Expand All @@ -84,52 +100,43 @@ void server::prepare(uv_os_sock_t sock) {
std::string("Failed to retrieve bound address for socket: ") + uv_strerror(r));
}
#endif

start_listening();
}

void server::start_listening() {
if (auto r = uv_listen(reinterpret_cast<uv_stream_t *>(&_handle), TCP_LISTEN_BACKLOG, conn_cb);
r != 0) {
throw std::runtime_error(
std::string("Failed to listen for connections: ") + uv_strerror(r));
void server::run(std::stop_token token) {
if (token.stop_possible()) {
uv_timer_init(_loop, &_timer);

_token = std::move(token);
_timer.data = this;

uv_timer_start(
&_timer,
[](uv_timer_t *timer) {
auto *s = static_cast<server *>(timer->data);
if (s->_token.stop_requested()) {
uv_timer_stop(timer);
uv_close(reinterpret_cast<uv_handle_t *>(timer), nullptr);
uv_close(reinterpret_cast<uv_handle_t *>(&s->_handle), nullptr);
uv_stop(timer->loop);
}
},
SHUTDOWN_CHECK_INTERVAL_MS,
SHUTDOWN_CHECK_INTERVAL_MS);
}
}

void server::run(std::string_view ip, int port, std::stop_token stop_token) {
prepare(std::move(ip), port);
setup_stop_timer(std::move(stop_token));
uv_run(_loop, UV_RUN_DEFAULT);
}

void server::run(uv_os_sock_t sock, std::stop_token stop_token) {
prepare(sock);
setup_stop_timer(std::move(stop_token));
uv_run(_loop, UV_RUN_DEFAULT);
void server::run(uv_os_sock_t &&sock, std::stop_token token) {
open(std::move(sock));
listen();
run(std::move(token));
}

void server::setup_stop_timer(std::stop_token stop_token) noexcept {
// See https://docs.libuv.org/en/v1.x/guide/eventloops.html#stopping-an-event-loop
// for a rationale around the shutdown timer.

_stop_token = std::move(stop_token);
uv_timer_init(_loop, &_check_stop_timer);
_check_stop_timer.data = this;
uv_timer_start(
&_check_stop_timer,
[](uv_timer_t *handle) {
auto &self = *static_cast<server *>(handle->data);
if (self._stop_token.stop_requested()) {
uv_timer_stop(handle);
uv_close(reinterpret_cast<uv_handle_t *>(&self._handle), nullptr);
if (self._loop.is_managed()) {
uv_stop(handle->loop);
}
}
},
SHUTDOWN_CHECK_INTERVAL_MS,
SHUTDOWN_CHECK_INTERVAL_MS);
void server::run(std::string_view ip, int port, std::stop_token token) {
bind(std::move(ip), port);
listen();
run(std::move(token));
}

} // namespace uv
} // namespace grpcxx
Loading
Loading